network.cc 5.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*!
 *  Copyright (c) 2018 by Contributors
 * \file graph/network.cc
 * \brief DGL networking related APIs
 */

#include "./network.h"
#include "./network/communicator.h"
#include "./network/socket_communicator.h"
#include "./network/serialize.h"

#include "../c_api_common.h"

using dgl::runtime::DGLArgs;
using dgl::runtime::DGLArgValue;
using dgl::runtime::DGLRetValue;
using dgl::runtime::PackedFunc;
using dgl::runtime::NDArray;

namespace dgl {
namespace network {

23
24
25
// File-local scratch buffer that outgoing subgraphs are serialized into
// before being handed to Sender::Send(). It is allocated (kMaxBufferSize
// bytes) in _CAPI_DGLSenderCreate and freed in _CAPI_DGLFinalizeSender.
static char* SEND_BUFFER = nullptr;
// File-local buffer that incoming messages are received into before being
// deserialized. It is allocated (kMaxBufferSize bytes) in
// _CAPI_DGLReceiverCreate and freed in _CAPI_DGLFinalizeReceiver.
static char* RECV_BUFFER = nullptr;

26
27
28
/*!
 * \brief Create a socket-based sender and return it as an opaque handle.
 *
 * Allocates the file-level SEND_BUFFER (kMaxBufferSize bytes) and then
 * constructs a network::SocketSender. The returned CommunicatorHandle is
 * produced from a network::Sender* and must be cast back to that type.
 *
 * NOTE(review): every call overwrites SEND_BUFFER with a fresh allocation
 * without releasing the previous one.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // Allocate the serialization buffer first; report allocation failure
    // through the logging facility.
    try {
      SEND_BUFFER = new char[kMaxBufferSize];
    } catch (const std::bad_alloc&) {
      LOG(FATAL) << "Not enough memory for sender buffer: " << kMaxBufferSize;
    }
    // Convert to the handle type via the Sender base pointer.
    network::Sender* const created = new network::SocketSender();
    CommunicatorHandle handle = static_cast<CommunicatorHandle>(created);
    *rv = handle;
  });

38
/*!
 * \brief Finalize a sender and release the shared send buffer.
 *
 * args[0]: CommunicatorHandle that wraps a network::Sender.
 *
 * Calls Sender::Finalize() and frees SEND_BUFFER. The sender object itself
 * is not deleted here.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeSender")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    network::Sender* sender = static_cast<network::Sender*>(chandle);
    sender->Finalize();
    delete [] SEND_BUFFER;
    // Clear the pointer so a repeated finalize is a harmless delete[] on
    // nullptr rather than a double free of the same allocation.
    SEND_BUFFER = nullptr;
  });

/*!
 * \brief Register a receiver address with a sender.
 *
 * args[0]: CommunicatorHandle that wraps a network::Sender.
 * args[1]: receiver IP address (string).
 * args[2]: receiver port.
 * args[3]: receiver ID.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderAddReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    const std::string address = args[1];
    const int port_num = args[2];
    const int receiver_id = args[3];
    static_cast<network::Sender*>(handle)->AddReceiver(
        address.c_str(), port_num, receiver_id);
  });

/*!
 * \brief Ask a sender to connect; a failed connection is logged as fatal.
 *
 * args[0]: CommunicatorHandle that wraps a network::Sender.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderConnect")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    network::Sender* const snd = static_cast<network::Sender*>(handle);
    const auto connected = snd->Connect();
    if (!connected) {
      LOG(FATAL) << "Sender connection failed.";
    }
  });

/*!
 * \brief Serialize a sampled subgraph into SEND_BUFFER and send it.
 *
 * args[0]: CommunicatorHandle that wraps a network::Sender.
 * args[1]: ID of the target receiver.
 * args[2]: GraphHandle that wraps an ImmutableGraph.
 * args[3]: node mapping array.
 * args[4]: edge mapping array.
 * args[5]: layer offsets array.
 * args[6]: flow offsets array.
 *
 * The serialized size must be positive (CHECK), and a non-positive result
 * from Sender::Send() is logged as fatal.
 *
 * NOTE(review): no buffer capacity is passed to SerializeSampledSubgraph
 * here; presumably it bounds itself by kMaxBufferSize -- confirm.
 */
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // Unpack arguments in positional order.
    CommunicatorHandle comm_handle = args[0];
    int target_id = args[1];
    GraphHandle graph_handle = args[2];
    const IdArray node_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[3]));
    const IdArray edge_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[4]));
    const IdArray layer_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[5]));
    const IdArray flow_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[6]));
    ImmutableGraph* graph = static_cast<ImmutableGraph*>(graph_handle);
    network::Sender* snd = static_cast<network::Sender*>(comm_handle);
    auto in_csr = graph->GetInCSR();
    // Pack the in-CSR structure plus the nodeflow arrays into the send buffer.
    int64_t payload_size = network::SerializeSampledSubgraph(
        SEND_BUFFER, in_csr, node_mapping, edge_mapping,
        layer_offsets, flow_offsets);
    CHECK_GT(payload_size, 0);
    // Ship the serialized bytes to the chosen receiver.
    int64_t size = snd->Send(SEND_BUFFER, payload_size, target_id);
    if (size <= 0) {
      LOG(FATAL) << "Send message error (size: " << size << ")";
    }
  });

/*!
 * \brief Create a socket-based receiver and return it as an opaque handle.
 *
 * Allocates the file-level RECV_BUFFER (kMaxBufferSize bytes) and then
 * constructs a network::SocketReceiver. The returned CommunicatorHandle is
 * produced from a network::Receiver*.
 *
 * NOTE(review): every call overwrites RECV_BUFFER with a fresh allocation
 * without releasing the previous one.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // Allocate the receive buffer first; report allocation failure through
    // the logging facility.
    try {
      RECV_BUFFER = new char[kMaxBufferSize];
    } catch (const std::bad_alloc&) {
      LOG(FATAL) << "Not enough memory for receiver buffer: " << kMaxBufferSize;
    }
    // Convert to the handle type via the Receiver base pointer.
    network::Receiver* const created = new network::SocketReceiver();
    CommunicatorHandle handle = static_cast<CommunicatorHandle>(created);
    *rv = handle;
  });

/*!
 * \brief Finalize a receiver and release the shared receive buffer.
 *
 * args[0]: CommunicatorHandle that wraps a network::Receiver.
 *
 * Calls Receiver::Finalize() and frees RECV_BUFFER. The receiver object
 * itself is not deleted here.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // The handle was created from a network::Receiver*, so it has to be
    // cast back to exactly that type (not to the derived SocketReceiver).
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Finalize();
    delete [] RECV_BUFFER;
    // Clear the pointer so a repeated finalize is a harmless delete[] on
    // nullptr rather than a double free of the same allocation.
    RECV_BUFFER = nullptr;
  });

/*!
 * \brief Make a receiver wait on the given address for its senders.
 *
 * args[0]: CommunicatorHandle that wraps a network::Receiver.
 * args[1]: IP address (string) passed to Receiver::Wait().
 * args[2]: port passed to Receiver::Wait().
 * args[3]: number of senders passed to Receiver::Wait().
 *
 * kQueueSize is forwarded as the last argument of Receiver::Wait().
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverWait")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    std::string ip = args[1];
    int port = args[2];
    int num_sender = args[3];
    // The handle was created from a network::Receiver*, so it has to be
    // cast back to exactly that type (not to the derived SocketReceiver).
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Wait(ip.c_str(), port, num_sender, kQueueSize);
  });

/*!
 * \brief Receive one serialized subgraph and rebuild it as a NodeFlow.
 *
 * args[0]: CommunicatorHandle that wraps a network::Receiver.
 *
 * Reads up to kMaxBufferSize bytes into RECV_BUFFER, deserializes the CSR
 * and the nodeflow arrays from it, and returns a one-element vector holding
 * the new NodeFlow via WrapVectorReturn(). A non-positive result from
 * Receiver::Recv() is logged as fatal.
 */
DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // The handle was created from a network::Receiver*, so it has to be
    // cast back to exactly that type (not to the derived SocketReceiver).
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    // Recv data from network
    int64_t size = receiver->Recv(RECV_BUFFER, kMaxBufferSize);
    if (size <= 0) {
      LOG(FATAL) << "Receive error: (size: " << size << ")";
    }
    NodeFlow* nf = new NodeFlow();
    ImmutableGraph::CSR::Ptr csr;
    // Deserialize nodeflow from recv_data_buffer
    network::DeserializeSampledSubgraph(RECV_BUFFER,
                                        &(csr),
                                        &(nf->node_mapping),
                                        &(nf->edge_mapping),
                                        &(nf->layer_offsets),
                                        &(nf->flow_offsets));
    // Only the received CSR is supplied; the second constructor argument
    // is left null.
    nf->graph = GraphPtr(new ImmutableGraph(csr, nullptr, false));
    std::vector<NodeFlow*> subgs(1);
    subgs[0] = nf;
    *rv = WrapVectorReturn(subgs);
  });

}  // namespace network
}  // namespace dgl