"vscode:/vscode.git/clone" did not exist on "3118fb520d5c8f0d413241104b27848c46c2460e"
msg_queue.h 4.42 KB
Newer Older
1
2
3
4
5
/*!
 *  Copyright (c) 2019 by Contributors
 * \file msg_queue.h
 * \brief Message queue for DGL distributed training.
 */
6
7
#ifndef DGL_RPC_NETWORK_MSG_QUEUE_H_
#define DGL_RPC_NETWORK_MSG_QUEUE_H_
8

9
10
#include <dgl/runtime/ndarray.h>

11
12
13
14
15
16
17
#include <queue>
#include <set>
#include <string>
#include <utility>  // for pair
#include <mutex>
#include <condition_variable>
#include <atomic>
18
#include <functional>
19
20
21
22

namespace dgl {
namespace network {

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/*!
 * \brief Integer type used for the status codes returned by the message queue.
 */
using STATUS = int;

/*!
 * \brief Status codes of the message queue.
 *
 * 3400-3404 describe the outcome of adding a message,
 * 3405-3406 the outcome of removing one.
 */
#define ADD_SUCCESS 3400     // Message was added successfully
#define MSG_GT_SIZE 3401     // Message size exceeds the queue size
#define MSG_LE_ZERO 3402     // Message size is not a positive number
#define QUEUE_CLOSE 3403     // Cannot add a message: the queue is closed
#define QUEUE_FULL 3404      // Cannot add a message: the queue is full
#define REMOVE_SUCCESS 3405  // Message was removed successfully
#define QUEUE_EMPTY 3406     // Cannot remove a message: the queue is empty

/*!
 * \brief Message used by network communicator and message queue.
 *
 * A Message is a plain (pointer, size) view of a byte buffer plus an optional
 * callback for releasing that buffer. The struct itself never frees \c data;
 * nothing in this struct invokes \c deallocator automatically.
 */
struct Message {
  /*!
   * \brief Construct an empty message: null buffer, zero size, no deallocator.
   *
   * All members are value-initialized through their in-class initializers, so
   * a default-constructed Message never carries an indeterminate pointer or
   * size.
   */
  Message() = default;

  /*!
   * \brief Construct a message that refers to an existing buffer.
   * \param data_ptr pointer to the message bytes; stored as-is, not copied
   * \param data_size size of the buffer in bytes
   */
  Message(char* data_ptr, int64_t data_size)
      : data(data_ptr), size(data_size) { }

  /*!
   * \brief message data
   */
  char* data = nullptr;
  /*!
   * \brief message size in bytes
   */
  int64_t size = 0;
  /*!
   * \brief user-defined deallocator, which can be nullptr
   */
  std::function<void(Message*)> deallocator = nullptr;
};

65
/*!
 * \brief Free the memory buffer of a message.
 *
 * Releases msg->data with delete[], so it matches buffers that were
 * allocated with new[]. The pointer stored in the message is left unchanged.
 * \param msg message whose buffer is released; it is dereferenced
 *            unconditionally
 */
inline void DefaultMessageDeleter(Message* msg) {
  char* const buffer = msg->data;
  delete [] buffer;
}

/*!
 * \brief Message Queue for network communication.
 *
 * MessageQueue is a FIFO queue that adopts the producer/consumer model for
 * data messages. It supports one or more producer threads and one or more
 * consumer threads. Producers invoke Add() to push a data message into the
 * queue, and consumers invoke Remove() to pop a data message from the queue.
 * Add() and Remove() use two condition variables to synchronize producer
 * threads and consumer threads. Each producer invokes
 * SignalFinished(producer_id) to claim that it is about to finish, where
 * producer_id is an integer that uniquely identifies a producer thread. This
 * signaling mechanism prevents consumers from waiting after all producers
 * have finished their jobs.
 *
 * MessageQueue is thread-safe.
 */
class MessageQueue {
 public:
  /*!
   * \brief MessageQueue constructor
89
   * \param queue_size size (bytes) of message queue
90
91
92
93
94
95
96
97
   * \param num_producers number of producers, use 1 by default
   */
  MessageQueue(int64_t queue_size /* in bytes */,
               int num_producers = 1);

  /*!
   * \brief MessageQueue deconstructor
   */
98
  ~MessageQueue() {}
99
100

  /*!
101
102
103
104
   * \brief Add message to the queue
   * \param msg data message
   * \param is_blocking Blocking if cannot add, else return
   * \return Status code
105
   */
106
  STATUS Add(Message msg, bool is_blocking = true);
107
108
109

  /*!
   * \brief Remove message from the queue
110
111
112
   * \param msg pointer of data msg
   * \param is_blocking Blocking if cannot remove, else return
   * \return Status code
113
   */
114
  STATUS Remove(Message* msg, bool is_blocking = true);
115
116

  /*!
117
118
   * \brief Signal that producer producer_id will no longer produce anything
   * \param producer_id An integer uniquely to identify a producer thread
119
   */
120
  void SignalFinished(int producer_id);
121
122

  /*!
123
   * \return true if queue is empty.
124
   */
125
  bool Empty() const;
126
127
128
129
130
131
132
133

  /*!
   * \return true if queue is empty and all num_producers have signaled.
   */
  bool EmptyAndNoMoreAdd() const;

 protected:
  /*! 
134
   * \brief message queue 
135
   */
136
  std::queue<Message> queue_;
137
138
139
140
141
142
143
144
145
146
147
148
149
150

  /*! 
   * \brief Size of the queue in bytes 
   */
  int64_t queue_size_;

  /*! 
   * \brief Free size of the queue 
   */
  int64_t free_size_;

  /*! 
   * \brief Used to check all producers will no longer produce anything 
   */
Da Zheng's avatar
Da Zheng committed
151
  size_t num_producers_;
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181

  /*! 
   * \brief Store finished producer id 
   */
  std::set<int /* producer_id */> finished_producers_;

  /*! 
   * \brief Condition when consumer should wait 
   */
  std::condition_variable cond_not_full_;

  /*! 
   * \brief Condition when producer should wait 
   */
  std::condition_variable cond_not_empty_;

  /*! 
   * \brief Signal for exit wait 
   */
  std::atomic<bool> exit_flag_{false};

  /*! 
   * \brief Protect all above data and conditions 
   */
  mutable std::mutex mutex_;
};

}  // namespace network
}  // namespace dgl

182
#endif  // DGL_RPC_NETWORK_MSG_QUEUE_H_