"vscode:/vscode.git/clone" did not exist on "f52fc3fcdf6cf22fd6a7c4db789f1a530a01db19"
msg_queue.h 4.43 KB
Newer Older
1
2
3
4
5
6
7
8
/*!
 *  Copyright (c) 2019 by Contributors
 * \file msg_queue.h
 * \brief Message queue for DGL distributed training.
 */
#ifndef DGL_GRAPH_NETWORK_MSG_QUEUE_H_
#define DGL_GRAPH_NETWORK_MSG_QUEUE_H_

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <utility>  // for pair

#include <dgl/runtime/ndarray.h>

namespace dgl {
namespace network {

/*!
 * \brief Integer status code returned by MessageQueue::Add() and MessageQueue::Remove().
 */
using STATUS = int;

/*!
 * \brief Status codes of the message queue.
 *
 * Kept as macros (not constexpr values) so existing callers, including any
 * preprocessor uses, continue to work unchanged.
 */
#define ADD_SUCCESS     3400  // Add message successfully
#define MSG_GT_SIZE     3401  // Message size beyond queue size
#define MSG_LE_ZERO     3402  // Message size is not a positive number
#define QUEUE_CLOSE     3403  // Cannot add message when queue is closed
#define QUEUE_FULL      3404  // Cannot add message when queue is full
#define REMOVE_SUCCESS  3405  // Remove message successfully
#define QUEUE_EMPTY     3406  // Cannot remove when queue is empty

/*!
 * \brief Message used by network communicator and message queue.
 *
 * A Message holds a raw pointer to a byte buffer, its size, and an optional
 * deallocator. Copying a Message copies the pointer, not the buffer.
 */
struct Message {
  /*!
   * \brief Construct an empty message: null data, zero size, no deallocator.
   *
   * Members have in-class initializers so a default-constructed Message
   * never exposes indeterminate values.
   */
  Message() = default;

  /*!
   * \brief Construct a message that refers to an existing buffer.
   * \param data_ptr pointer to the message bytes
   * \param data_size size of the buffer in bytes
   */
  Message(char* data_ptr, int64_t data_size)
  : data(data_ptr), size(data_size) { }

  /*!
   * \brief message data (nullptr for a default-constructed message)
   */
  char* data = nullptr;
  /*!
   * \brief message size in bytes (0 for a default-constructed message)
   */
  int64_t size = 0;
  /*!
   * \brief user-defined deallocator, which can be nullptr
   */
  std::function<void(Message*)> deallocator = nullptr;
};

65
/*!
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
 * \brief Free memory buffer of message
 */
inline void DefaultMessageDeleter(Message* msg) { delete [] msg->data; }

/*!
 * \brief Message Queue for network communication.
 *
 * MessageQueue is FIFO queue that adopts producer/consumer model for data message. 
 * It supports one or more producer threads and one or more consumer threads. 
 * Producers invokes Add() to push data message into the queue, and consumers 
 * invokes Remove() to pop data message from queue. Add() and Remove() use two condition
 * variables to synchronize producer threads and consumer threads. Each producer 
 * invokes SignalFinished(producer_id) to claim that it is about to finish, where 
 * producer_id is an integer uniquely identify a producer thread. This signaling mechanism 
 * prevents consumers from waiting after all producers have finished their jobs. 
81
 *
82
 * MessageQueue is thread-safe.
83
84
85
86
87
88
 * 
 */
class MessageQueue {
 public:
  /*!
   * \brief MessageQueue constructor
89
   * \param queue_size size (bytes) of message queue
90
91
92
93
94
95
96
97
   * \param num_producers number of producers, use 1 by default
   */
  MessageQueue(int64_t queue_size /* in bytes */,
               int num_producers = 1);

  /*!
   * \brief MessageQueue deconstructor
   */
98
  ~MessageQueue() {}
99
100

  /*!
101
102
103
104
   * \brief Add message to the queue
   * \param msg data message
   * \param is_blocking Blocking if cannot add, else return
   * \return Status code
105
   */
106
  STATUS Add(Message msg, bool is_blocking = true);
107
108
109

  /*!
   * \brief Remove message from the queue
110
111
112
   * \param msg pointer of data msg
   * \param is_blocking Blocking if cannot remove, else return
   * \return Status code
113
   */
114
  STATUS Remove(Message* msg, bool is_blocking = true);
115
116

  /*!
117
118
   * \brief Signal that producer producer_id will no longer produce anything
   * \param producer_id An integer uniquely to identify a producer thread
119
   */
120
  void SignalFinished(int producer_id);
121
122

  /*!
123
   * \return true if queue is empty.
124
   */
125
  bool Empty() const;
126
127
128
129
130
131
132
133

  /*!
   * \return true if queue is empty and all num_producers have signaled.
   */
  bool EmptyAndNoMoreAdd() const;

 protected:
  /*! 
134
   * \brief message queue 
135
   */
136
  std::queue<Message> queue_;
137
138
139
140
141
142
143
144
145
146
147
148
149
150

  /*! 
   * \brief Size of the queue in bytes 
   */
  int64_t queue_size_;

  /*! 
   * \brief Free size of the queue 
   */
  int64_t free_size_;

  /*! 
   * \brief Used to check all producers will no longer produce anything 
   */
Da Zheng's avatar
Da Zheng committed
151
  size_t num_producers_;
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182

  /*! 
   * \brief Store finished producer id 
   */
  std::set<int /* producer_id */> finished_producers_;

  /*! 
   * \brief Condition when consumer should wait 
   */
  std::condition_variable cond_not_full_;

  /*! 
   * \brief Condition when producer should wait 
   */
  std::condition_variable cond_not_empty_;

  /*! 
   * \brief Signal for exit wait 
   */
  std::atomic<bool> exit_flag_{false};

  /*! 
   * \brief Protect all above data and conditions 
   */
  mutable std::mutex mutex_;
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_GRAPH_NETWORK_MSG_QUEUE_H_