socket_communicator.h 4.22 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
/*!
 *  Copyright (c) 2019 by Contributors
 * \file socket_communicator.h
 * \brief SocketCommunicator for DGL distributed training.
 */
#ifndef DGL_GRAPH_NETWORK_SOCKET_COMMUNICATOR_H_
#define DGL_GRAPH_NETWORK_SOCKET_COMMUNICATOR_H_

#include <thread>
#include <vector>
#include <string>
12
#include <unordered_map>
13
14
15
16
17
18
19
20
21
22

#include "communicator.h"
#include "msg_queue.h"
#include "tcp_socket.h"

namespace dgl {
namespace network {

using dgl::network::MessageQueue;
using dgl::network::TCPSocket;
23
24
using dgl::network::Sender;
using dgl::network::Receiver;
25
26

/*!
27
 * \brief Networking address
28
 */
29
30
31
32
33
34
35
36
37
38
39
40
41
42
struct Addr {
  /*! \brief IP address, stored as a string */
  std::string ip_;
  /*! \brief port number */
  int port_;
};

/*!
 * \brief Network Sender for DGL distributed training.
 *
 * Sender is an abstract class that defines a set of APIs for sending
 * binary data over network. It can be implemented by different underlying
 * networking libraries such as TCP socket and ZMQ. One Sender can connect to
 * multiple receivers, and it can send data to a specified receiver via the receiver's ID.
 */
class SocketSender : public Sender {
43
44
 public:
  /*!
45
46
47
48
   * \brief Add receiver address and it's ID to the namebook
   * \param ip receviver's IP address
   * \param port receiver's port
   * \param id receiver's ID
49
   */
50
51
  void AddReceiver(const char* ip, int port, int recv_id);

52
  /*!
53
54
   * \brief Connect with all the Receivers
   * \return True for sucess and False for fail
55
   */
56
  bool Connect();
57
58

  /*!
59
60
61
62
63
64
   * \brief Send data to specified Receiver
   * \param data data buffer for sending
   * \param size data size for sending
   * \param recv_id receiver's ID
   * \return bytes we sent
   *   > 0 : bytes we sent
65
66
   *   - 1 : error
   */
67
  int64_t Send(const char* data, int64_t size, int recv_id);
68
69

  /*!
70
   * \brief Finalize Sender
71
72
73
   */
  void Finalize();

74
75
76
77
78
79
80
81
82
83
84
  /*!
   * \brief Get data buffer
   * \return buffer pointer
   */
  char* GetBuffer();

  /*!
   * \brief Set data buffer
   */
  void SetBuffer(char* buffer);

85
86
87
88
89
90
 private:
  /*!
   * \brief socket map
   */ 
  std::unordered_map<int, TCPSocket*> socket_map_;

91
  /*!
92
93
94
   * \brief receiver address map
   */ 
  std::unordered_map<int, Addr> receiver_addr_map_;
95
96
97
98
99

  /*!
   * \brief data buffer
   */ 
  char* buffer_;
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
};

/*!
 * \brief Network Receiver for DGL distributed training.
 *
 * Receiver is an abstract class that defines a set of APIs for receiving binary
 * data over network. It can be implemented by different underlying networking libraries
 * such as TCP socket and ZMQ. One Receiver can connect with multiple Senders, and it can receive
 * data from these Senders concurrently via multi-threading and message queue.
 */
class SocketReceiver : public Receiver {
 public:
  /*!
   * \brief Wait all of the Senders to connect
   * \param ip Receiver's IP address
   * \param port Receiver's port
   * \param num_sender total number of Senders
   * \param queue_size size of message queue
   * \return True for sucess and False for fail
119
   */
120
  bool Wait(const char* ip, int port, int num_sender, int queue_size);
121
122

  /*!
123
124
125
126
127
128
   * \brief Recv data from Sender (copy data from message queue)
   * \param dest data buffer of destination
   * \param max_size maximul size of data buffer
   * \return bytes we received
   *   > 0 : bytes we received
   *   - 1 : error
129
   */
130
  int64_t Recv(char* dest, int64_t max_size);
131

132
  /*!
133
   * \brief Finalize Receiver
134
   */
135
  void Finalize();
136

137
138
139
140
141
142
143
144
145
146
147
  /*!
   * \brief Get data buffer
   * \return buffer pointer
   */
  char* GetBuffer();

  /*!
   * \brief Set data buffer
   */
  void SetBuffer(char* buffer);

148
 private:
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
  /*!
   * \brief number of sender
   */
  int num_sender_;

  /*!
   * \brief maximal size of message queue
   */ 
  int64_t queue_size_;

  /*!
   * \brief socket list
   */ 
  std::vector<TCPSocket*> socket_;

  /*!
   * \brief Thread pool for socket connection
   */ 
  std::vector<std::thread*> thread_;

  /*!
   * \brief Message queue for communicator
   */ 
  MessageQueue* queue_;

174
175
176
177
178
  /*!
   * \brief data buffer
   */ 
  char* buffer_;

179
180
181
182
  /*!
   * \brief Process received message in independent threads
   * \param socket new accpeted socket
   * \param queue message queue
183
   * \param id producer_id
184
   */ 
185
  static void MsgHandler(TCPSocket* socket, MessageQueue* queue, int id);
186
187
188
189
190
191
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_GRAPH_NETWORK_SOCKET_COMMUNICATOR_H_