network.cc 6.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*!
 *  Copyright (c) 2018 by Contributors
 * \file graph/network.cc
 * \brief DGL networking related APIs
 */

#include "./network.h"
#include "./network/communicator.h"
#include "./network/socket_communicator.h"
#include "./network/serialize.h"

#include "../c_api_common.h"

using dgl::runtime::DGLArgs;
using dgl::runtime::DGLArgValue;
using dgl::runtime::DGLRetValue;
using dgl::runtime::PackedFunc;
using dgl::runtime::NDArray;

namespace dgl {
namespace network {

23
24
25
// File-local scratch buffer for outgoing messages. It is allocated with
// kMaxBufferSize bytes in _CAPI_DGLSenderCreate and released with delete[]
// in _CAPI_DGLFinalizeSender; it stays nullptr until a sender is created.
static char* SEND_BUFFER = nullptr;
// File-local scratch buffer for incoming messages. It is allocated with
// kMaxBufferSize bytes in _CAPI_DGLReceiverCreate and released with delete[]
// in _CAPI_DGLFinalizeReceiver; it stays nullptr until a receiver is created.
static char* RECV_BUFFER = nullptr;

26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/*!
 * \brief Thin wrapper around Sender::Send that treats failure as fatal.
 * \param sender sender used to transmit the message
 * \param data start of the bytes to send
 * \param size number of bytes to send
 * \param recv_id ID of the receiver the message is addressed to
 *
 * A return value of zero or less from Send() is reported via LOG(FATAL).
 */
static void SendData(network::Sender* sender,
                     const char* data,
                     int64_t size,
                     int recv_id) {
  const int64_t bytes_sent = sender->Send(data, size, recv_id);
  if (bytes_sent > 0) {
    return;
  }
  LOG(FATAL) << "Send error (size: " << bytes_sent << ")";
}

/*!
 * \brief Thin wrapper around Receiver::Recv that treats failure as fatal.
 * \param receiver receiver used to fetch the next message
 * \param dest destination buffer the message is written into
 * \param max_size capacity of dest, forwarded to Recv()
 *
 * A return value of zero or less from Recv() is reported via LOG(FATAL).
 */
static void RecvData(network::Receiver* receiver,
                     char* dest,
                     int64_t max_size) {
  const int64_t bytes_received = receiver->Recv(dest, max_size);
  if (bytes_received > 0) {
    return;
  }
  LOG(FATAL) << "Receive error (size: " << bytes_received << ")";
}

47
48
49
/*!
 * \brief Create a socket-based sender and allocate the shared send buffer.
 *
 * Takes no arguments. Returns the new sender as a CommunicatorHandle; the
 * handle stores a network::Sender* and must be cast back to that type.
 *
 * NOTE(review): SEND_BUFFER is overwritten unconditionally, so calling this
 * more than once replaces the previous allocation without releasing it.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // Allocate the scratch buffer first; running out of memory is fatal.
    try {
      SEND_BUFFER = new char[kMaxBufferSize];
    } catch (const std::bad_alloc&) {
      LOG(FATAL) << "Not enough memory for sender buffer: " << kMaxBufferSize;
    }
    // Go through the base-class pointer so the handle can be cast back to
    // network::Sender* by the other sender APIs.
    network::Sender* const new_sender = new network::SocketSender();
    *rv = static_cast<CommunicatorHandle>(new_sender);
  });

59
/*!
 * \brief Finalize a sender and release the shared send buffer.
 *
 * args[0]: CommunicatorHandle holding a network::Sender*.
 *
 * NOTE(review): the Sender object itself is not deleted here; whether
 * Finalize() is expected to release it is not visible from this file.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeSender")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    network::Sender* sender = static_cast<network::Sender*>(chandle);
    sender->Finalize();
    delete [] SEND_BUFFER;
    // Clear the pointer so that a repeated finalize is a harmless
    // delete[] on nullptr instead of a double free, and so later code can
    // detect that the buffer is gone.
    SEND_BUFFER = nullptr;
  });

/*!
 * \brief Register a receiver address with a sender.
 *
 * args[0]: CommunicatorHandle holding a network::Sender*
 * args[1]: receiver IP address (string)
 * args[2]: receiver port
 * args[3]: receiver ID
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderAddReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    const std::string receiver_ip = args[1];
    const int receiver_port = args[2];
    const int receiver_id = args[3];
    static_cast<network::Sender*>(handle)->AddReceiver(
        receiver_ip.c_str(), receiver_port, receiver_id);
  });

/*!
 * \brief Connect a sender to the receivers that were added to it.
 *
 * args[0]: CommunicatorHandle holding a network::Sender*.
 * A failed Connect() is reported via LOG(FATAL).
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderConnect")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    network::Sender* const sender = static_cast<network::Sender*>(handle);
    if (!sender->Connect()) {
      LOG(FATAL) << "Sender connection failed.";
    }
  });

/*!
 * \brief Serialize a sampled subgraph (NodeFlow) and send it to a receiver.
 *
 * args[0]: CommunicatorHandle holding a network::Sender*
 * args[1]: ID of the target receiver
 * args[2]: GraphHandle, cast to ImmutableGraph*
 * args[3]: node mapping array
 * args[4]: edge mapping array
 * args[5]: layer offsets array
 * args[6]: flow offsets array
 *
 * The message is a CONTROL_NODEFLOW header followed by the serialized
 * payload, both written into SEND_BUFFER.
 */
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    int recv_id = args[1];
    GraphHandle ghandle = args[2];
    const IdArray node_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[3]));
    const IdArray edge_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[4]));
    const IdArray layer_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[5]));
    const IdArray flow_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[6]));
    ImmutableGraph *ptr = static_cast<ImmutableGraph*>(ghandle);
    network::Sender* sender = static_cast<network::Sender*>(chandle);
    auto csr = ptr->GetInCSR();
    // The buffer only exists after _CAPI_DGLSenderCreate has run; fail with
    // a clear message instead of writing through a null pointer.
    CHECK(SEND_BUFFER != nullptr)
      << "Sender buffer is not allocated; call _CAPI_DGLSenderCreate first.";
    // Write control message
    // NOTE(review): this stores a single byte, while the payload starts at
    // offset sizeof(CONTROL_NODEFLOW). If that size is larger than one byte
    // the remaining header bytes are sent as-is -- confirm this is intended.
    *SEND_BUFFER = CONTROL_NODEFLOW;
    // Serialize nodeflow to data buffer
    int64_t data_size = network::SerializeSampledSubgraph(
                             SEND_BUFFER+sizeof(CONTROL_NODEFLOW),
                             csr,
                             node_mapping,
                             edge_mapping,
                             layer_offsets,
                             flow_offsets);
    CHECK_GT(data_size, 0);
    // Account for the control header that precedes the payload.
    data_size += sizeof(CONTROL_NODEFLOW);
    // Send msg via network
    SendData(sender, SEND_BUFFER, data_size, recv_id);
  });

/*!
 * \brief Send the end-of-stream control message to a receiver.
 *
 * args[0]: CommunicatorHandle holding a network::Sender*
 * args[1]: ID of the target receiver
 *
 * The message consists of sizeof(CONTROL_END_SIGNAL) bytes taken from
 * SEND_BUFFER, whose first byte is set to CONTROL_END_SIGNAL.
 */
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendEndSignal")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    int recv_id = args[1];
    network::Sender* sender = static_cast<network::Sender*>(chandle);
    // The buffer only exists after _CAPI_DGLSenderCreate has run; fail with
    // a clear message instead of writing through a null pointer.
    CHECK(SEND_BUFFER != nullptr)
      << "Sender buffer is not allocated; call _CAPI_DGLSenderCreate first.";
    *SEND_BUFFER = CONTROL_END_SIGNAL;
    // Send msg via network
    SendData(sender, SEND_BUFFER, sizeof(CONTROL_END_SIGNAL), recv_id);
  });

/*!
 * \brief Create a socket-based receiver and allocate the shared receive buffer.
 *
 * Takes no arguments. Returns the new receiver as a CommunicatorHandle; the
 * handle stores a network::Receiver*.
 *
 * NOTE(review): RECV_BUFFER is overwritten unconditionally, so calling this
 * more than once replaces the previous allocation without releasing it.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // Allocate the scratch buffer first; running out of memory is fatal.
    try {
      RECV_BUFFER = new char[kMaxBufferSize];
    } catch (const std::bad_alloc&) {
      LOG(FATAL) << "Not enough memory for receiver buffer: " << kMaxBufferSize;
    }
    // Convert through the base-class pointer before erasing the type.
    network::Receiver* const new_receiver = new network::SocketReceiver();
    *rv = static_cast<CommunicatorHandle>(new_receiver);
  });

/*!
 * \brief Finalize a receiver and release the shared receive buffer.
 *
 * args[0]: CommunicatorHandle holding a network::Receiver*.
 *
 * NOTE(review): the Receiver object itself is not deleted here; whether
 * Finalize() is expected to release it is not visible from this file.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // _CAPI_DGLReceiverCreate stores a network::Receiver* in the handle, so
    // cast back to exactly that type rather than to the derived class.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Finalize();
    delete [] RECV_BUFFER;
    // Clear the pointer so that a repeated finalize is a harmless
    // delete[] on nullptr instead of a double free.
    RECV_BUFFER = nullptr;
  });

/*!
 * \brief Start a receiver on the given address and wait for its senders.
 *
 * args[0]: CommunicatorHandle holding a network::Receiver*
 * args[1]: IP address (string) passed to Wait()
 * args[2]: port passed to Wait()
 * args[3]: number of senders passed to Wait()
 *
 * kQueueSize is forwarded as the last argument of Wait().
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverWait")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    std::string ip = args[1];
    int port = args[2];
    int num_sender = args[3];
    // _CAPI_DGLReceiverCreate stores a network::Receiver* in the handle, so
    // cast back to exactly that type rather than to the derived class.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Wait(ip.c_str(), port, num_sender, kQueueSize);
  });

/*!
 * \brief Receive one message and return a NodeFlow or the end signal.
 *
 * args[0]: CommunicatorHandle holding a network::Receiver*.
 *
 * The first byte of the received message selects the result:
 *  - CONTROL_NODEFLOW: the payload is deserialized into a new NodeFlow,
 *    which is returned wrapped in a one-element vector.
 *  - CONTROL_END_SIGNAL: CONTROL_END_SIGNAL itself is returned.
 *  - anything else is reported via LOG(FATAL).
 */
DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // _CAPI_DGLReceiverCreate stores a network::Receiver* in the handle, so
    // cast back to exactly that type rather than to the derived class.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    // The buffer only exists after _CAPI_DGLReceiverCreate has run; fail with
    // a clear message instead of reading through a null pointer.
    CHECK(RECV_BUFFER != nullptr)
      << "Receiver buffer is not allocated; call _CAPI_DGLReceiverCreate first.";
    // Recv data from network
    RecvData(receiver, RECV_BUFFER, kMaxBufferSize);
    // Only the first byte of the header carries the control code.
    int control = *RECV_BUFFER;
    if (control == CONTROL_NODEFLOW) {
      NodeFlow* nf = new NodeFlow();
      ImmutableGraph::CSR::Ptr csr;
      // Deserialize nodeflow from recv_data_buffer
      network::DeserializeSampledSubgraph(RECV_BUFFER+sizeof(CONTROL_NODEFLOW),
                                          &(csr),
                                          &(nf->node_mapping),
                                          &(nf->edge_mapping),
                                          &(nf->layer_offsets),
                                          &(nf->flow_offsets));
      nf->graph = GraphPtr(new ImmutableGraph(csr, nullptr, false));
      std::vector<NodeFlow*> subgs(1);
      subgs[0] = nf;
      *rv = WrapVectorReturn(subgs);
    } else if (control == CONTROL_END_SIGNAL) {
      *rv = CONTROL_END_SIGNAL;
    } else {
      LOG(FATAL) << "Unknow control number: " << control;
    }
  });

}  // namespace network
}  // namespace dgl