/*!
 *  Copyright (c) 2019 by Contributors
 * \file communicator.h
 * \brief Communicator for DGL distributed training.
 */
#ifndef DGL_RPC_NETWORK_COMMUNICATOR_H_
#define DGL_RPC_NETWORK_COMMUNICATOR_H_

#include <dmlc/logging.h>

#include <cstdint>
#include <string>

#include "msg_queue.h"

namespace dgl {
namespace network {

/*!
19
 * \brief Network Sender for DGL distributed training.
20
 *
21
22
23
24
 * Sender is an abstract class that defines a set of APIs for sending binary 
 * data message over network. It can be implemented by different underlying 
 * networking libraries such TCP socket and MPI. One Sender can connect to 
 * multiple receivers and it can send data to specified receiver via receiver's ID.
25
 */
26
class Sender {
27
 public:
28
29
30
31
32
33
34
35
36
37
  /*!
   * \brief Sender constructor
   * \param queue_size size (bytes) of message queue. 
   * Note that, the queue_size parameter is optional.
   */
  explicit Sender(int64_t queue_size = 0) {
    CHECK_GE(queue_size, 0);
    queue_size_ = queue_size;
  }

38
  virtual ~Sender() {}
39
40

  /*!
41
42
   * \brief Add receiver's address and ID to the sender's namebook
   * \param addr Networking address, e.g., 'socket://127.0.0.1:50091', 'mpi://0'
43
   * \param id receiver's ID
44
45
   *
   * AddReceiver() is not thread-safe and only one thread can invoke this API.
46
   */
47
  virtual void AddReceiver(const char* addr, int id) = 0;
48

49
  /*!
50
   * \brief Connect with all the Receivers
51
52
53
   * \return True for success and False for fail
   *
   * Connect() is not thread-safe and only one thread can invoke this API.
54
   */
55
  virtual bool Connect() = 0;
56
57

  /*!
58
59
   * \brief Send data to specified Receiver.
   * \param msg data message
60
   * \param recv_id receiver's ID
61
62
63
64
65
66
67
68
   * \return Status code
   *
   * (1) The send is non-blocking. There is no guarantee that the message has been 
   *     physically sent out when the function returns.
   * (2) The communicator will assume the responsibility of the given message.
   * (3) The API is multi-thread safe.
   * (4) Messages sent to the same receiver are guaranteed to be received in the same order. 
   *     There is no guarantee for messages sent to different receivers.
69
   */
70
  virtual STATUS Send(Message msg, int recv_id) = 0;
71
72

  /*!
73
   * \brief Finalize Sender
74
75
   *
   * Finalize() is not thread-safe and only one thread can invoke this API.
76
77
   */
  virtual void Finalize() = 0;
78
79

  /*!
80
   * \brief Communicator type: 'socket', 'mpi', etc.
81
   */
82
  virtual std::string Type() const = 0;
83

84
 protected:
85
  /*!
86
   * \brief Size of message queue
87
   */
88
  int64_t queue_size_;
89
90
91
92
93
};

/*!
 * \brief Network Receiver for DGL distributed training.
 *
94
95
96
97
 * Receiver is an abstract class that defines a set of APIs for receiving binary data 
 * message over network. It can be implemented by different underlying networking 
 * libraries such as TCP socket and MPI. One Receiver can connect with multiple Senders 
 * and it can receive data from multiple Senders concurrently.
98
99
100
 */
class Receiver {
 public:
101
102
103
104
105
106
107
108
109
110
111
112
  /*!
   * \brief Receiver constructor
   * \param queue_size size of message queue.
   * Note that, the queue_size parameter is optional.
   */
  explicit Receiver(int64_t queue_size = 0) {
    if (queue_size < 0) {
      LOG(FATAL) << "queue_size cannot be a negative number.";
    }
    queue_size_ = queue_size;
  }

113
  virtual ~Receiver() {}
114
115

  /*!
116
117
   * \brief Wait for all the Senders to connect
   * \param addr Networking address, e.g., 'socket://127.0.0.1:50051', 'mpi://0'
118
   * \param num_sender total number of Senders
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
   * \return True for success and False for fail
   *
   * Wait() is not thread-safe and only one thread can invoke this API.
   */
  virtual bool Wait(const char* addr, int num_sender) = 0;

  /*!
   * \brief Recv data from Sender
   * \param msg pointer of data message
   * \param send_id which sender current msg comes from
   * \return Status code
   *
   * (1) The Recv() API is blocking, which will not 
   *     return until getting data from message queue.
   * (2) The Recv() API is thread-safe.
   * (3) Memory allocated by communicator but will not own it after the function returns.
135
   */
136
  virtual STATUS Recv(Message* msg, int* send_id) = 0;
137
138

  /*!
139
140
141
142
143
144
145
146
147
   * \brief Recv data from a specified Sender
   * \param msg pointer of data message
   * \param send_id sender's ID
   * \return Status code
   *
   * (1) The RecvFrom() API is blocking, which will not 
   *     return until getting data from message queue.
   * (2) The RecvFrom() API is thread-safe.
   * (3) Memory allocated by communicator but will not own it after the function returns.
148
   */
149
  virtual STATUS RecvFrom(Message* msg, int send_id) = 0;
150
151
152

  /*!
   * \brief Finalize Receiver
153
154
   *
   * Finalize() is not thread-safe and only one thread can invoke this API.
155
156
   */
  virtual void Finalize() = 0;
157
158

  /*!
159
   * \brief Communicator type: 'socket', 'mpi', etc
160
   */
161
  virtual std::string Type() const = 0;
162

163
 protected:
164
  /*!
165
   * \brief Size of message queue
166
   */
167
  int64_t queue_size_;
168
169
170
171
172
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_RPC_NETWORK_COMMUNICATOR_H_