/**
 *  Copyright (c) 2019 by Contributors
 * @file communicator.h
 * @brief Communicator for DGL distributed training.
 */
#ifndef DGL_RPC_NETWORK_COMMUNICATOR_H_
#define DGL_RPC_NETWORK_COMMUNICATOR_H_
#include <dmlc/logging.h>

#include <string>

#include "msg_queue.h"

namespace dgl {
namespace network {

/**
 * @brief Network Sender for DGL distributed training.
 *
 * Sender is an abstract class that defines a set of APIs for sending binary
 * data messages over the network. It can be implemented with different
 * underlying networking libraries, such as TCP sockets and MPI. One Sender can
 * connect to multiple Receivers and can send data to a specific Receiver via
 * that Receiver's ID.
 */
class Sender {
28
 public:
29
  /**
30
31
32
   * @brief Sender constructor
   * @param queue_size size (bytes) of message queue.
   * @param max_thread_count size of thread pool. 0 for no limit
33
34
   * Note that, the queue_size parameter is optional.
   */
35
  explicit Sender(int64_t queue_size = 0, int max_thread_count = 0) {
36
    CHECK_GE(queue_size, 0);
37
    CHECK_GE(max_thread_count, 0);
38
    queue_size_ = queue_size;
39
    max_thread_count_ = max_thread_count;
40
41
  }

42
  virtual ~Sender() {}
43

44
  /**
45
46
47
48
   * @brief Send data to specified Receiver.
   * @param msg data message
   * @param recv_id receiver's ID
   * @return Status code
49
   *
50
51
52
53
54
55
   * (1) The send is non-blocking. There is no guarantee that the message has
   * been physically sent out when the function returns. (2) The communicator
   * will assume the responsibility of the given message. (3) The API is
   * multi-thread safe. (4) Messages sent to the same receiver are guaranteed to
   * be received in the same order. There is no guarantee for messages sent to
   * different receivers.
56
   */
57
  virtual STATUS Send(Message msg, int recv_id) = 0;
58

59
 protected:
60
  /**
61
   * @brief Size of message queue
62
   */
63
  int64_t queue_size_;
64
  /**
65
   * @brief Size of thread pool. 0 for no limit
66
67
   */
  int max_thread_count_;
68
69
};

/**
 * @brief Network Receiver for DGL distributed training.
 *
 * Receiver is an abstract class that defines a set of APIs for receiving
 * binary data messages over the network. It can be implemented with different
 * underlying networking libraries, such as TCP sockets and MPI. One Receiver
 * can connect with multiple Senders and can receive data from multiple Senders
 * concurrently.
 */
class Receiver {
80
 public:
81
  /**
82
83
84
   * @brief Receiver constructor
   * @param queue_size size of message queue.
   * @param max_thread_count size of thread pool. 0 for no limit
85
86
   * Note that, the queue_size parameter is optional.
   */
87
  explicit Receiver(int64_t queue_size = 0, int max_thread_count = 0) {
88
89
90
    if (queue_size < 0) {
      LOG(FATAL) << "queue_size cannot be a negative number.";
    }
91
    CHECK_GE(max_thread_count, 0);
92
    queue_size_ = queue_size;
93
    max_thread_count_ = max_thread_count;
94
95
  }

96
  virtual ~Receiver() {}
97

98
  /**
99
100
101
102
   * @brief Recv data from Sender
   * @param msg pointer of data message
   * @param send_id which sender current msg comes from
   * @param timeout The timeout value in milliseconds. If zero, wait
103
   * indefinitely.
104
   * @return Status code
105
   *
106
   * (1) The Recv() API is thread-safe.
107
108
   * (2) Memory allocated by communicator but will not own it after the function
   * returns.
109
   */
110
  virtual STATUS Recv(Message* msg, int* send_id, int timeout = 0) = 0;
111

112
  /**
113
114
115
116
   * @brief Recv data from a specified Sender
   * @param msg pointer of data message
   * @param send_id sender's ID
   * @param timeout The timeout value in milliseconds. If zero, wait
117
   * indefinitely.
118
   * @return Status code
119
   *
120
   * (1) The RecvFrom() API is thread-safe.
121
122
   * (2) Memory allocated by communicator but will not own it after the function
   * returns.
123
   */
124
  virtual STATUS RecvFrom(Message* msg, int send_id, int timeout = 0) = 0;
125

126
 protected:
127
  /**
128
   * @brief Size of message queue
129
   */
130
  int64_t queue_size_;
131
  /**
132
   * @brief Size of thread pool. 0 for no limit
133
134
   */
  int max_thread_count_;
135
136
137
138
139
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_RPC_NETWORK_COMMUNICATOR_H_