/*!
 *  Copyright (c) 2019 by Contributors
 * \file communicator.h
 * \brief Communicator for DGL distributed training.
 */

#ifndef DGL_RPC_NETWORK_COMMUNICATOR_H_
#define DGL_RPC_NETWORK_COMMUNICATOR_H_

#include <dmlc/logging.h>

#include <cstdint>
#include <string>

#include "../net_type.h"
#include "msg_queue.h"

namespace dgl {
namespace network {

/*!
20
 * \brief Network Sender for DGL distributed training.
21
 *
22
23
24
25
26
 * Sender is an abstract class that defines a set of APIs for sending binary
 * data message over network. It can be implemented by different underlying
 * networking libraries such TCP socket and MPI. One Sender can connect to
 * multiple receivers and it can send data to specified receiver via receiver's
 * ID.
27
 */
28
class Sender : public rpc::RPCSender {
29
 public:
30
31
  /*!
   * \brief Sender constructor
32
   * \param queue_size size (bytes) of message queue.
33
   * \param max_thread_count size of thread pool. 0 for no limit
34
35
   * Note that, the queue_size parameter is optional.
   */
36
  explicit Sender(int64_t queue_size = 0, int max_thread_count = 0) {
37
    CHECK_GE(queue_size, 0);
38
    CHECK_GE(max_thread_count, 0);
39
    queue_size_ = queue_size;
40
    max_thread_count_ = max_thread_count;
41
42
  }

43
  virtual ~Sender() {}
44
45

  /*!
46
47
   * \brief Send data to specified Receiver.
   * \param msg data message
48
   * \param recv_id receiver's ID
49
50
   * \return Status code
   *
51
52
53
54
55
56
   * (1) The send is non-blocking. There is no guarantee that the message has
   * been physically sent out when the function returns. (2) The communicator
   * will assume the responsibility of the given message. (3) The API is
   * multi-thread safe. (4) Messages sent to the same receiver are guaranteed to
   * be received in the same order. There is no guarantee for messages sent to
   * different receivers.
57
   */
58
  virtual STATUS Send(Message msg, int recv_id) = 0;
59

60
 protected:
61
  /*!
62
   * \brief Size of message queue
63
   */
64
  int64_t queue_size_;
65
66
67
68
  /*!
   * \brief Size of thread pool. 0 for no limit
   */
  int max_thread_count_;
69
70
71
72
73
};

/*!
 * \brief Network Receiver for DGL distributed training.
 *
74
75
76
77
78
 * Receiver is an abstract class that defines a set of APIs for receiving binary
 * data message over network. It can be implemented by different underlying
 * networking libraries such as TCP socket and MPI. One Receiver can connect
 * with multiple Senders and it can receive data from multiple Senders
 * concurrently.
79
 */
80
class Receiver : public rpc::RPCReceiver {
81
 public:
82
83
84
  /*!
   * \brief Receiver constructor
   * \param queue_size size of message queue.
85
   * \param max_thread_count size of thread pool. 0 for no limit
86
87
   * Note that, the queue_size parameter is optional.
   */
88
  explicit Receiver(int64_t queue_size = 0, int max_thread_count = 0) {
89
90
91
    if (queue_size < 0) {
      LOG(FATAL) << "queue_size cannot be a negative number.";
    }
92
    CHECK_GE(max_thread_count, 0);
93
    queue_size_ = queue_size;
94
    max_thread_count_ = max_thread_count;
95
96
  }

97
  virtual ~Receiver() {}
98

99
100
101
102
  /*!
   * \brief Recv data from Sender
   * \param msg pointer of data message
   * \param send_id which sender current msg comes from
103
104
   * \param timeout The timeout value in milliseconds. If zero, wait
   * indefinitely.
105
106
   * \return Status code
   *
107
   * (1) The Recv() API is thread-safe.
108
109
   * (2) Memory allocated by communicator but will not own it after the function
   * returns.
110
   */
111
  virtual STATUS Recv(Message* msg, int* send_id, int timeout = 0) = 0;
112
113

  /*!
114
115
116
   * \brief Recv data from a specified Sender
   * \param msg pointer of data message
   * \param send_id sender's ID
117
118
   * \param timeout The timeout value in milliseconds. If zero, wait
   * indefinitely.
119
120
   * \return Status code
   *
121
   * (1) The RecvFrom() API is thread-safe.
122
123
   * (2) Memory allocated by communicator but will not own it after the function
   * returns.
124
   */
125
  virtual STATUS RecvFrom(Message* msg, int send_id, int timeout = 0) = 0;
126

127
 protected:
128
  /*!
129
   * \brief Size of message queue
130
   */
131
  int64_t queue_size_;
132
133
134
135
  /*!
   * \brief Size of thread pool. 0 for no limit
   */
  int max_thread_count_;
136
137
138
139
140
};

}  // namespace network
}  // namespace dgl

141
#endif  // DGL_RPC_NETWORK_COMMUNICATOR_H_