/**
 *  Copyright (c) 2019 by Contributors
 * @file communicator.h
 * @brief Communicator for DGL distributed training.
 */
#ifndef DGL_RPC_NETWORK_COMMUNICATOR_H_
#define DGL_RPC_NETWORK_COMMUNICATOR_H_

#include <dmlc/logging.h>

#include <cstdint>
#include <string>

#include "../net_type.h"
#include "msg_queue.h"

namespace dgl {
namespace network {

19
/**
20
 * @brief Network Sender for DGL distributed training.
21
 *
22
23
24
25
26
 * Sender is an abstract class that defines a set of APIs for sending binary
 * data message over network. It can be implemented by different underlying
 * networking libraries such TCP socket and MPI. One Sender can connect to
 * multiple receivers and it can send data to specified receiver via receiver's
 * ID.
27
 */
28
class Sender : public rpc::RPCSender {
29
 public:
30
  /**
31
32
33
   * @brief Sender constructor
   * @param queue_size size (bytes) of message queue.
   * @param max_thread_count size of thread pool. 0 for no limit
34
35
   * Note that, the queue_size parameter is optional.
   */
36
  explicit Sender(int64_t queue_size = 0, int max_thread_count = 0) {
37
    CHECK_GE(queue_size, 0);
38
    CHECK_GE(max_thread_count, 0);
39
    queue_size_ = queue_size;
40
    max_thread_count_ = max_thread_count;
41
42
  }

43
  virtual ~Sender() {}
44

45
  /**
46
47
48
49
   * @brief Send data to specified Receiver.
   * @param msg data message
   * @param recv_id receiver's ID
   * @return Status code
50
   *
51
52
53
54
55
56
   * (1) The send is non-blocking. There is no guarantee that the message has
   * been physically sent out when the function returns. (2) The communicator
   * will assume the responsibility of the given message. (3) The API is
   * multi-thread safe. (4) Messages sent to the same receiver are guaranteed to
   * be received in the same order. There is no guarantee for messages sent to
   * different receivers.
57
   */
58
  virtual STATUS Send(Message msg, int recv_id) = 0;
59

60
 protected:
61
  /**
62
   * @brief Size of message queue
63
   */
64
  int64_t queue_size_;
65
  /**
66
   * @brief Size of thread pool. 0 for no limit
67
68
   */
  int max_thread_count_;
69
70
};

71
/**
72
 * @brief Network Receiver for DGL distributed training.
73
 *
74
75
76
77
78
 * Receiver is an abstract class that defines a set of APIs for receiving binary
 * data message over network. It can be implemented by different underlying
 * networking libraries such as TCP socket and MPI. One Receiver can connect
 * with multiple Senders and it can receive data from multiple Senders
 * concurrently.
79
 */
80
class Receiver : public rpc::RPCReceiver {
81
 public:
82
  /**
83
84
85
   * @brief Receiver constructor
   * @param queue_size size of message queue.
   * @param max_thread_count size of thread pool. 0 for no limit
86
87
   * Note that, the queue_size parameter is optional.
   */
88
  explicit Receiver(int64_t queue_size = 0, int max_thread_count = 0) {
89
90
91
    if (queue_size < 0) {
      LOG(FATAL) << "queue_size cannot be a negative number.";
    }
92
    CHECK_GE(max_thread_count, 0);
93
    queue_size_ = queue_size;
94
    max_thread_count_ = max_thread_count;
95
96
  }

97
  virtual ~Receiver() {}
98

99
  /**
100
101
102
103
   * @brief Recv data from Sender
   * @param msg pointer of data message
   * @param send_id which sender current msg comes from
   * @param timeout The timeout value in milliseconds. If zero, wait
104
   * indefinitely.
105
   * @return Status code
106
   *
107
   * (1) The Recv() API is thread-safe.
108
109
   * (2) Memory allocated by communicator but will not own it after the function
   * returns.
110
   */
111
  virtual STATUS Recv(Message* msg, int* send_id, int timeout = 0) = 0;
112

113
  /**
114
115
116
117
   * @brief Recv data from a specified Sender
   * @param msg pointer of data message
   * @param send_id sender's ID
   * @param timeout The timeout value in milliseconds. If zero, wait
118
   * indefinitely.
119
   * @return Status code
120
   *
121
   * (1) The RecvFrom() API is thread-safe.
122
123
   * (2) Memory allocated by communicator but will not own it after the function
   * returns.
124
   */
125
  virtual STATUS RecvFrom(Message* msg, int send_id, int timeout = 0) = 0;
126

127
 protected:
128
  /**
129
   * @brief Size of message queue
130
   */
131
  int64_t queue_size_;
132
  /**
133
   * @brief Size of thread pool. 0 for no limit
134
135
   */
  int max_thread_count_;
136
137
138
139
140
};

}  // namespace network
}  // namespace dgl

141
#endif  // DGL_RPC_NETWORK_COMMUNICATOR_H_