/*!
 *  Copyright (c) 2018 by Contributors
 * \file graph/network.cc
 * \brief DGL networking related APIs
 */

#include <dgl/runtime/container.h>
#include <dgl/packed_func_ext.h>

#include <memory>
#include <new>
#include <string>

#include "./network.h"
#include "./network/communicator.h"
#include "./network/socket_communicator.h"
#include "./network/serialize.h"

#include "../c_api_common.h"

using namespace dgl::runtime;

namespace dgl {
namespace network {

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Wrapper for Send api
static void SendData(network::Sender* sender,
                     const char* data,
                     int64_t size,
                     int recv_id) {
  int64_t send_size = sender->Send(data, size, recv_id);
  if (send_size <= 0) {
    LOG(FATAL) << "Send error (size: " << send_size << ")";
  }
}

/*!
 * \brief Receive one message through a receiver and fail hard if nothing
 *        was received.
 * \param receiver Receiver whose Recv() is invoked.
 * \param dest Destination buffer handed to Recv().
 * \param max_size Size limit forwarded unchanged to Recv().
 */
static void RecvData(network::Receiver* receiver,
                     char* dest,
                     int64_t max_size) {
  const int64_t received_size = receiver->Recv(dest, max_size);
  if (received_size > 0) {
    return;
  }
  // A non-positive result from Recv() is reported as a fatal error.
  LOG(FATAL) << "Receive error (size: " << received_size << ")";
}

42
43
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
44
    network::Sender* sender = new network::SocketSender();
45
    try {
46
47
      char* buffer = new char[kMaxBufferSize];
      sender->SetBuffer(buffer);
48
49
50
    } catch (const std::bad_alloc&) {
      LOG(FATAL) << "Not enough memory for sender buffer: " << kMaxBufferSize;
    }
51
    CommunicatorHandle chandle = static_cast<CommunicatorHandle>(sender);
52
53
54
    *rv = chandle;
  });

55
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeSender")
56
.set_body([] (DGLArgs args, DGLRetValue* rv) {
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
    CommunicatorHandle chandle = args[0];
    network::Sender* sender = static_cast<network::Sender*>(chandle);
    sender->Finalize();
  });

/*!
 * \brief Register a receiver address with a sender.
 *
 * args[0]: communicator handle, interpreted as network::Sender*.
 * args[1]: receiver IP address (string).
 * args[2]: receiver port.
 * args[3]: receiver ID.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderAddReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    const std::string receiver_ip = args[1];
    const int receiver_port = args[2];
    const int receiver_id = args[3];
    auto* sender = static_cast<network::Sender*>(handle);
    sender->AddReceiver(receiver_ip.c_str(), receiver_port, receiver_id);
  });

/*!
 * \brief Ask a sender to connect, and fail hard if Connect() reports failure.
 *
 * args[0]: communicator handle, interpreted as network::Sender*.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderConnect")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    auto* sender = static_cast<network::Sender*>(handle);
    const bool connected = sender->Connect();
    if (!connected) {
      LOG(FATAL) << "Sender connection failed.";
    }
  });

/*!
 * \brief Serialize a sampled subgraph into the sender's buffer and send it
 *        to one receiver.
 *
 * args[0]: communicator handle, interpreted as network::Sender*.
 * args[1]: ID of the target receiver.
 * args[2]: graph; must be backed by an ImmutableGraph.
 * args[3]: node mapping.
 * args[4]: edge mapping.
 * args[5]: layer offsets.
 * args[6]: flow offsets.
 *
 * Message layout in the buffer: the CONTROL_NODEFLOW marker at the start,
 * then the serialized payload beginning sizeof(CONTROL_NODEFLOW) bytes in.
 */
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    int target_id = args[1];
    // TODO(minjie): could simply use NodeFlow nf = args[2];
    GraphRef graph = args[2];
    const IdArray node_map = args[3];
    const IdArray edge_map = args[4];
    const IdArray layer_offs = args[5];
    const IdArray flow_offs = args[6];
    auto immutable_graph =
      std::dynamic_pointer_cast<ImmutableGraph>(graph.sptr());
    CHECK(immutable_graph) << "only immutable graph is allowed in send/recv";
    auto* sender = static_cast<network::Sender*>(handle);
    auto in_csr = immutable_graph->GetInCSR();
    // Control marker goes first.
    // NOTE(review): only the first byte of the control slot is written; if
    // sizeof(CONTROL_NODEFLOW) > 1 the remaining bytes are sent as they are
    // -- confirm against the definition of CONTROL_NODEFLOW.
    char* message = sender->GetBuffer();
    *message = CONTROL_NODEFLOW;
    // The serialized nodeflow follows the control slot.
    char* payload = message + sizeof(CONTROL_NODEFLOW);
    const int64_t payload_size = network::SerializeSampledSubgraph(
                                   payload,
                                   in_csr,
                                   node_map,
                                   edge_map,
                                   layer_offs,
                                   flow_offs);
    CHECK_GT(payload_size, 0);
    const int64_t message_size =
      payload_size + static_cast<int64_t>(sizeof(CONTROL_NODEFLOW));
    // Ship control slot + payload as one message.
    SendData(sender, message, message_size, target_id);
  });

/*!
 * \brief Send a CONTROL_END_SIGNAL message to one receiver.
 *
 * args[0]: communicator handle, interpreted as network::Sender*.
 * args[1]: ID of the target receiver.
 *
 * The message consists of the control slot only
 * (sizeof(CONTROL_END_SIGNAL) bytes taken from the sender's buffer).
 */
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendEndSignal")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    int target_id = args[1];
    auto* sender = static_cast<network::Sender*>(handle);
    // Write the control marker at the start of the sender's buffer.
    char* message = sender->GetBuffer();
    *message = CONTROL_END_SIGNAL;
    const int64_t message_size = sizeof(CONTROL_END_SIGNAL);
    SendData(sender, message, message_size, target_id);
  });

/*!
 * \brief Create a socket receiver with a message buffer of kMaxBufferSize
 *        bytes and return it to the caller as an opaque communicator handle.
 *
 * The handle is a network::Receiver* erased to CommunicatorHandle.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // Own the receiver until it is handed out, so that it is not leaked when
    // the buffer allocation fails and LOG(FATAL) leaves this function by
    // throwing (NOTE(review): whether LOG(FATAL) throws or aborts depends on
    // the logging configuration).
    std::unique_ptr<network::SocketReceiver> receiver(
      new network::SocketReceiver());
    try {
      char* buffer = new char[kMaxBufferSize];
      receiver->SetBuffer(buffer);
    } catch (const std::bad_alloc&) {
      LOG(FATAL) << "Not enough memory for receiver buffer: " << kMaxBufferSize;
    }
    // Convert to the base pointer before erasing the type, so the handle
    // holds a network::Receiver* exactly as before.
    network::Receiver* released = receiver.release();
    CommunicatorHandle chandle = static_cast<CommunicatorHandle>(released);
    *rv = chandle;
  });

/*!
 * \brief Finalize the receiver behind a communicator handle.
 *
 * args[0]: communicator handle, interpreted as network::Receiver*.
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // network._CAPI_DGLReceiverCreate builds the handle from a
    // network::Receiver*, so cast the void* back to that same type; only
    // the Receiver interface is used here.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Finalize();
  });

/*!
 * \brief Call Wait() on a receiver with the given address, sender count and
 *        a queue size of kQueueSize.
 *
 * args[0]: communicator handle, interpreted as network::Receiver*.
 * args[1]: IP address (string) passed to Wait().
 * args[2]: port passed to Wait().
 * args[3]: number of senders passed to Wait().
 */
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverWait")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    std::string ip = args[1];
    int port = args[2];
    int num_sender = args[3];
    // network._CAPI_DGLReceiverCreate builds the handle from a
    // network::Receiver*, so cast the void* back to that same type; only
    // the Receiver interface is used here.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Wait(ip.c_str(), port, num_sender, kQueueSize);
  });

/*!
 * \brief Receive one message and turn it into a return value.
 *
 * args[0]: communicator handle, interpreted as network::Receiver*.
 *
 * The first byte of the received message selects the result:
 *   - CONTROL_NODEFLOW: the payload that starts sizeof(CONTROL_NODEFLOW)
 *     bytes into the buffer is deserialized into a NodeFlow, and a
 *     List<NodeFlow> holding that single NodeFlow is returned.
 *   - CONTROL_END_SIGNAL: the value CONTROL_END_SIGNAL itself is returned.
 *   - anything else is a fatal error.
 */
DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // network._CAPI_DGLReceiverCreate builds the handle from a
    // network::Receiver*, so cast the void* back to that same type; only
    // the Receiver interface is used here.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    // Recv data from network
    char* buffer = receiver->GetBuffer();
    RecvData(receiver, buffer, kMaxBufferSize);
    // Only the first byte of the control slot carries the message type.
    int control = *buffer;
    if (control == CONTROL_NODEFLOW) {
      NodeFlow nf = NodeFlow::Create();
      CSRPtr csr;
      // Deserialize nodeflow from recv_data_buffer
      network::DeserializeSampledSubgraph(buffer+sizeof(CONTROL_NODEFLOW),
                                          &(csr),
                                          &(nf->node_mapping),
                                          &(nf->edge_mapping),
                                          &(nf->layer_offsets),
                                          &(nf->flow_offsets));
      // Build the graph from the received CSR; the second constructor
      // argument is left as nullptr.
      nf->graph = GraphPtr(new ImmutableGraph(csr, nullptr));
      List<NodeFlow> subgs;
      subgs.push_back(nf);
      *rv = subgs;
    } else if (control == CONTROL_END_SIGNAL) {
      *rv = CONTROL_END_SIGNAL;
    } else {
      LOG(FATAL) << "Unknow control number: " << control;
    }
  });

}  // namespace network
}  // namespace dgl