"src/vscode:/vscode.git/clone" did not exist on "bacf2ab457ef9aa6b2aabdde9fb55e5e3c967a9e"
msg_queue.h 4.46 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*!
 *  Copyright (c) 2019 by Contributors
 * \file msg_queue.h
 * \brief Message queue for DGL distributed training.
 */
#ifndef DGL_GRAPH_NETWORK_MSG_QUEUE_H_
#define DGL_GRAPH_NETWORK_MSG_QUEUE_H_

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <utility>  // for pair

namespace dgl {
namespace network {

/*!
 * \brief Status code type returned by MessageQueue::Add() and
 *        MessageQueue::Remove().
 */
typedef int STATUS;

/*!
 * \brief Status code of message queue
 */
#define  ADD_SUCCESS     3400   // Add message successfully
#define  MSG_GT_SIZE     3401   // Message size beyond queue size
#define  MSG_LE_ZERO     3402   // Message size is not a positive number
#define  QUEUE_CLOSE     3403   // Cannot add message when queue is closed
#define  QUEUE_FULL      3404   // Cannot add message when queue is full
#define  REMOVE_SUCCESS  3405   // Remove message successfully
#define  QUEUE_EMPTY     3406   // Cannot remove when queue is empty

/*!
 * \brief Message used by network communicator and message queue.
 *
 * All pointer members default to nullptr and the size defaults to 0, so a
 * default-constructed Message never carries indeterminate values.
 */
struct Message {
  /*!
   * \brief Construct an empty message (null data, zero size).
   */
  Message() { }

  /*!
   * \brief Construct a message referring to an existing buffer.
   * \param data_ptr pointer to the message payload
   * \param data_size payload size in bytes
   */
  Message(char* data_ptr, int64_t data_size)
  : data(data_ptr), size(data_size) { }

  /*!
   * \brief message data
   */
  char* data = nullptr;
  /*!
   * \brief message size in bytes
   */
  int64_t size = 0;
  /*!
   * \brief aux_data pointer handler
   */
  void* aux_handler = nullptr;
  /*!
   * \brief user-defined deallocator, which can be nullptr
   */
  std::function<void(Message*)> deallocator = nullptr;
};

67
/*!
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
 * \brief Free memory buffer of message
 */
inline void DefaultMessageDeleter(Message* msg) { delete [] msg->data; }

/*!
 * \brief Message Queue for network communication.
 *
 * MessageQueue is FIFO queue that adopts producer/consumer model for data message. 
 * It supports one or more producer threads and one or more consumer threads. 
 * Producers invokes Add() to push data message into the queue, and consumers 
 * invokes Remove() to pop data message from queue. Add() and Remove() use two condition
 * variables to synchronize producer threads and consumer threads. Each producer 
 * invokes SignalFinished(producer_id) to claim that it is about to finish, where 
 * producer_id is an integer uniquely identify a producer thread. This signaling mechanism 
 * prevents consumers from waiting after all producers have finished their jobs. 
83
 *
84
 * MessageQueue is thread-safe.
85
86
87
88
89
90
 * 
 */
class MessageQueue {
 public:
  /*!
   * \brief MessageQueue constructor
91
   * \param queue_size size (bytes) of message queue
92
93
94
95
96
97
98
99
   * \param num_producers number of producers, use 1 by default
   */
  MessageQueue(int64_t queue_size /* in bytes */,
               int num_producers = 1);

  /*!
   * \brief MessageQueue deconstructor
   */
100
  ~MessageQueue() {}
101
102

  /*!
103
104
105
106
   * \brief Add message to the queue
   * \param msg data message
   * \param is_blocking Blocking if cannot add, else return
   * \return Status code
107
   */
108
  STATUS Add(Message msg, bool is_blocking = true);
109
110
111

  /*!
   * \brief Remove message from the queue
112
113
114
   * \param msg pointer of data msg
   * \param is_blocking Blocking if cannot remove, else return
   * \return Status code
115
   */
116
  STATUS Remove(Message* msg, bool is_blocking = true);
117
118

  /*!
119
120
   * \brief Signal that producer producer_id will no longer produce anything
   * \param producer_id An integer uniquely to identify a producer thread
121
   */
122
  void SignalFinished(int producer_id);
123
124

  /*!
125
   * \return true if queue is empty.
126
   */
127
  bool Empty() const;
128
129
130
131
132
133
134
135

  /*!
   * \return true if queue is empty and all num_producers have signaled.
   */
  bool EmptyAndNoMoreAdd() const;

 protected:
  /*! 
136
   * \brief message queue 
137
   */
138
  std::queue<Message> queue_;
139
140
141
142
143
144
145
146
147
148
149
150
151
152

  /*! 
   * \brief Size of the queue in bytes 
   */
  int64_t queue_size_;

  /*! 
   * \brief Free size of the queue 
   */
  int64_t free_size_;

  /*! 
   * \brief Used to check all producers will no longer produce anything 
   */
Da Zheng's avatar
Da Zheng committed
153
  size_t num_producers_;
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

  /*! 
   * \brief Store finished producer id 
   */
  std::set<int /* producer_id */> finished_producers_;

  /*! 
   * \brief Condition when consumer should wait 
   */
  std::condition_variable cond_not_full_;

  /*! 
   * \brief Condition when producer should wait 
   */
  std::condition_variable cond_not_empty_;

  /*! 
   * \brief Signal for exit wait 
   */
  std::atomic<bool> exit_flag_{false};

  /*! 
   * \brief Protect all above data and conditions 
   */
  mutable std::mutex mutex_;
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_GRAPH_NETWORK_MSG_QUEUE_H_