network.cc 6.52 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*!
 *  Copyright (c) 2018 by Contributors
 * \file graph/network.cc
 * \brief DGL networking related APIs
 */

#include "./network.h"
#include "./network/communicator.h"
#include "./network/socket_communicator.h"
#include "./network/serialize.h"

#include "../c_api_common.h"

using dgl::runtime::DGLArgs;
using dgl::runtime::DGLArgValue;
using dgl::runtime::DGLRetValue;
using dgl::runtime::PackedFunc;
using dgl::runtime::NDArray;

namespace dgl {
namespace network {

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/*!
 * \brief Forward a buffer to Sender::Send and abort on a non-positive result.
 * \param sender sender object whose Send() is invoked
 * \param data start of the bytes handed to Send()
 * \param size byte count handed to Send()
 * \param recv_id receiver ID handed to Send()
 */
static void SendData(network::Sender* sender,
                     const char* data,
                     int64_t size,
                     int recv_id) {
  // Send()'s return value is presumably the number of bytes sent; any value
  // that is not strictly positive is reported as a fatal error.
  const int64_t sent = sender->Send(data, size, recv_id);
  if (sent > 0) {
    return;
  }
  LOG(FATAL) << "Send error (size: " << sent << ")";
}

/*!
 * \brief Forward a destination buffer to Receiver::Recv and abort on a
 *        non-positive result.
 * \param receiver receiver object whose Recv() is invoked
 * \param dest destination buffer handed to Recv()
 * \param max_size size limit handed to Recv()
 */
static void RecvData(network::Receiver* receiver,
                     char* dest,
                     int64_t max_size) {
  // Recv()'s return value is presumably the number of bytes received; any
  // value that is not strictly positive is reported as a fatal error.
  const int64_t received = receiver->Recv(dest, max_size);
  if (received > 0) {
    return;
  }
  LOG(FATAL) << "Receive error (size: " << received << ")";
}

44
45
// Create a SocketSender, attach a freshly allocated buffer of kMaxBufferSize
// bytes to it, and return it to the caller as a CommunicatorHandle.
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // The handle is produced from the base-class pointer (network::Sender*).
    network::Sender* new_sender = new network::SocketSender();
    try {
      char* send_buffer = new char[kMaxBufferSize];
      new_sender->SetBuffer(send_buffer);
    } catch (const std::bad_alloc&) {
      // Allocation failure is reported as a fatal error.
      LOG(FATAL) << "Not enough memory for sender buffer: " << kMaxBufferSize;
    }
    CommunicatorHandle handle = static_cast<CommunicatorHandle>(new_sender);
    *rv = handle;
  });

57
// Finalize the sender referenced by the handle in args[0].
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeSender")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    // The handle is interpreted as a network::Sender*.
    static_cast<network::Sender*>(handle)->Finalize();
  });

// Register a receiver address with a sender.
// args: [0] sender handle, [1] receiver IP string, [2] receiver port,
//       [3] receiver ID.
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderAddReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    const std::string receiver_ip = args[1];
    const int receiver_port = args[2];
    const int receiver_id = args[3];
    // The handle is interpreted as a network::Sender*.
    static_cast<network::Sender*>(handle)->AddReceiver(
        receiver_ip.c_str(), receiver_port, receiver_id);
  });

// Call Connect() on the sender referenced by args[0]; a false result is
// reported as a fatal error.
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderConnect")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    network::Sender* target = static_cast<network::Sender*>(handle);
    const bool connected = !(target->Connect() == false);
    if (!connected) {
      LOG(FATAL) << "Sender connection failed.";
    }
  });

// Serialize a sampled subgraph (nodeflow) into the sender's buffer and send it.
// args: [0] sender handle, [1] receiver ID, [2] graph handle,
//       [3] node mapping, [4] edge mapping, [5] layer offsets,
//       [6] flow offsets.
// Buffer layout: the first byte holds CONTROL_NODEFLOW, and the serialized
// payload starts sizeof(CONTROL_NODEFLOW) bytes into the buffer.
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    int target_id = args[1];
    GraphHandle graph_handle = args[2];
    const IdArray node_map = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[3]));
    const IdArray edge_map = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[4]));
    const IdArray layer_off = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[5]));
    const IdArray flow_off = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[6]));
    // The graph handle is interpreted as an ImmutableGraph*, the communicator
    // handle as a network::Sender*.
    ImmutableGraph* graph = static_cast<ImmutableGraph*>(graph_handle);
    network::Sender* target = static_cast<network::Sender*>(handle);
    auto in_csr = graph->GetInCSR();
    // Control message goes into the first byte of the sender's buffer.
    char* msg = target->GetBuffer();
    msg[0] = CONTROL_NODEFLOW;
    // The nodeflow payload is serialized right after the control header.
    char* payload = msg + sizeof(CONTROL_NODEFLOW);
    int64_t msg_size = network::SerializeSampledSubgraph(
                             payload,
                             in_csr,
                             node_map,
                             edge_map,
                             layer_off,
                             flow_off);
    CHECK_GT(msg_size, 0);
    // Account for the control header in the total message size.
    msg_size += sizeof(CONTROL_NODEFLOW);
    SendData(target, msg, msg_size, target_id);
  });

// Send an end-of-stream control message to one receiver.
// args: [0] sender handle, [1] receiver ID.
// Only the first byte of the sender's buffer is written here;
// sizeof(CONTROL_END_SIGNAL) bytes are passed to SendData.
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendEndSignal")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle handle = args[0];
    int target_id = args[1];
    network::Sender* target = static_cast<network::Sender*>(handle);
    char* msg = target->GetBuffer();
    msg[0] = CONTROL_END_SIGNAL;
    SendData(target, msg, sizeof(CONTROL_END_SIGNAL), target_id);
  });

// Create a SocketReceiver, attach a freshly allocated buffer of kMaxBufferSize
// bytes to it, and return it to the caller as a CommunicatorHandle.
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // The handle is produced from the base-class pointer (network::Receiver*).
    network::Receiver* new_receiver = new network::SocketReceiver();
    try {
      char* recv_buffer = new char[kMaxBufferSize];
      new_receiver->SetBuffer(recv_buffer);
    } catch (const std::bad_alloc&) {
      // Allocation failure is reported as a fatal error.
      LOG(FATAL) << "Not enough memory for receiver buffer: " << kMaxBufferSize;
    }
    CommunicatorHandle handle = static_cast<CommunicatorHandle>(new_receiver);
    *rv = handle;
  });

// Finalize the receiver referenced by the handle in args[0].
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // The handle is created from a network::Receiver* in
    // _CAPI_DGLReceiverCreate, so it must be cast back to that same type
    // (not to the derived SocketReceiver) for the round trip to be valid.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Finalize();
  });

// Call Wait() on a receiver with the given address and sender count.
// args: [0] receiver handle, [1] IP string, [2] port, [3] number of senders.
// kQueueSize is forwarded as the last argument of Wait().
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverWait")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    std::string ip = args[1];
    int port = args[2];
    int num_sender = args[3];
    // The handle is created from a network::Receiver* in
    // _CAPI_DGLReceiverCreate, so it must be cast back to that same type
    // (not to the derived SocketReceiver) for the round trip to be valid.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    receiver->Wait(ip.c_str(), port, num_sender, kQueueSize);
  });

// Receive one message into the receiver's buffer and dispatch on its first
// byte (the control code).
// args: [0] receiver handle.
// Result written to *rv:
//   - CONTROL_NODEFLOW:   a one-element vector holding the deserialized
//                         NodeFlow, wrapped by WrapVectorReturn;
//   - CONTROL_END_SIGNAL: the value CONTROL_END_SIGNAL itself;
//   - anything else:      fatal error.
DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    CommunicatorHandle chandle = args[0];
    // The handle is created from a network::Receiver* in
    // _CAPI_DGLReceiverCreate, so it must be cast back to that same type
    // (not to the derived SocketReceiver) for the round trip to be valid.
    network::Receiver* receiver = static_cast<network::Receiver*>(chandle);
    // Recv data from network
    char* buffer = receiver->GetBuffer();
    RecvData(receiver, buffer, kMaxBufferSize);
    // The control code is read from the first byte of the message.
    int control = *buffer;
    if (control == CONTROL_NODEFLOW) {
      NodeFlow* nf = new NodeFlow();
      ImmutableGraph::CSR::Ptr csr;
      // Deserialize the nodeflow; the payload starts
      // sizeof(CONTROL_NODEFLOW) bytes into the buffer.
      network::DeserializeSampledSubgraph(buffer+sizeof(CONTROL_NODEFLOW),
                                          &(csr),
                                          &(nf->node_mapping),
                                          &(nf->edge_mapping),
                                          &(nf->layer_offsets),
                                          &(nf->flow_offsets));
      // Build the graph from the received CSR (passed as the first
      // constructor argument; the second is nullptr).
      nf->graph = GraphPtr(new ImmutableGraph(csr, nullptr, false));
      std::vector<NodeFlow*> subgs(1);
      subgs[0] = nf;
      *rv = WrapVectorReturn(subgs);
    } else if (control == CONTROL_END_SIGNAL) {
      *rv = CONTROL_END_SIGNAL;
    } else {
      LOG(FATAL) << "Unknow control number: " << control;
    }
  });

}  // namespace network
}  // namespace dgl