/*!
 *  Copyright (c) 2019 by Contributors
 * @file msg_queue.h
 * @brief Message queue for DGL distributed training.
 */
#ifndef DGL_RPC_NETWORK_MSG_QUEUE_H_
#define DGL_RPC_NETWORK_MSG_QUEUE_H_

// NOTE(review): the include targets were lost when this file was flattened.
// The list below covers every standard-library name used in this header;
// confirm against the upstream file whether a project-local header was also
// included here.
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <utility>  // for pair

namespace dgl {
namespace network {

/*!
 * @brief Return type of the queue operations; holds one of the status codes
 *        defined below.
 */
using STATUS = int;

/*!
 * @brief Status code of message queue
 */
#define ADD_SUCCESS 3400     // Add message successfully
#define MSG_GT_SIZE 3401     // Message size beyond queue size
#define MSG_LE_ZERO 3402     // Message size is not a positive number
#define QUEUE_CLOSE 3403     // Cannot add message when queue is closed
#define QUEUE_FULL 3404      // Cannot add message when queue is full
#define REMOVE_SUCCESS 3405  // Remove message successfully
#define QUEUE_EMPTY 3406     // Cannot remove when queue is empty

/*!
 * @brief Message used by network communicator and message queue.
 */
struct Message {
  /*!
   * @brief Default constructor: no buffer (data == nullptr, size == 0).
   */
  Message() {}

  /*!
   * @brief Construct a message that refers to an existing buffer.
   * @param data_ptr pointer to the message bytes
   * @param data_size size of the buffer in bytes
   */
  Message(char* data_ptr, int64_t data_size)
      : data(data_ptr), size(data_size) {}

  /*!
   * @brief message data
   *
   * Initialized to nullptr so that a default-constructed message never
   * carries an indeterminate pointer.
   */
  char* data = nullptr;
  /*!
   * @brief message size in bytes
   */
  int64_t size = 0;
  /*!
   * @brief message receiver id
   */
  int receiver_id = -1;
  /*!
   * @brief user-defined deallocator, which can be nullptr
   */
  std::function<void(Message*)> deallocator = nullptr;
};

/*!
 * @brief Free memory buffer of message
 * @param msg message whose data buffer is released with delete[]; the
 *        pointer itself must not be null
 */
inline void DefaultMessageDeleter(Message* msg) { delete[] msg->data; }

/*!
 * @brief Message Queue for network communication.
 *
 * MessageQueue is a FIFO queue that adopts the producer/consumer model for
 * data messages. It supports one or more producer threads and one or more
 * consumer threads. Producers invoke Add() to push a data message into the
 * queue, and consumers invoke Remove() to pop a data message from the queue.
 * Add() and Remove() use two condition variables to synchronize producer
 * threads and consumer threads. Each producer invokes
 * SignalFinished(producer_id) to claim that it is about to finish, where
 * producer_id is an integer that uniquely identifies a producer thread. This
 * signaling mechanism prevents consumers from waiting after all producers
 * have finished their jobs.
 *
 * MessageQueue is thread-safe.
 *
 */
class MessageQueue {
 public:
  /*!
   * @brief MessageQueue constructor
   * @param queue_size size (bytes) of message queue
   * @param num_producers number of producers, use 1 by default
   */
  explicit MessageQueue(
      int64_t queue_size /* in bytes */, int num_producers = 1);

  /*!
   * @brief MessageQueue destructor
   */
  ~MessageQueue() = default;

  /*!
   * @brief Add message to the queue
   * @param msg data message
   * @param is_blocking Blocking if cannot add, else return
   * @return Status code
   */
  STATUS Add(Message msg, bool is_blocking = true);

  /*!
   * @brief Remove message from the queue
   * @param msg pointer of data msg
   * @param is_blocking Blocking if cannot remove, else return
   * @return Status code
   */
  STATUS Remove(Message* msg, bool is_blocking = true);

  /*!
   * @brief Signal that producer producer_id will no longer produce anything
   * @param producer_id An integer that uniquely identifies a producer thread
   */
  void SignalFinished(int producer_id);

  /*!
   * @return true if queue is empty.
   */
  bool Empty() const;

  /*!
   * @return true if queue is empty and all num_producers have signaled.
   */
  bool EmptyAndNoMoreAdd() const;

 protected:
  /*!
   * @brief message queue
   */
  std::queue<Message> queue_;

  /*!
   * @brief Size of the queue in bytes
   */
  int64_t queue_size_;

  /*!
   * @brief Free size of the queue
   */
  int64_t free_size_;

  /*!
   * @brief Used to check all producers will no longer produce anything
   */
  size_t num_producers_;

  /*!
   * @brief Store finished producer id
   */
  std::set<int /* producer_id */> finished_producers_;

  /*!
   * @brief Condition variable associated with the queue not being full
   *
   * NOTE(review): the name suggests that producers blocked in Add() wait on
   * this; the previous comment said "consumer should wait". Confirm against
   * the implementation.
   */
  std::condition_variable cond_not_full_;

  /*!
   * @brief Condition variable associated with the queue not being empty
   *
   * NOTE(review): the name suggests that consumers blocked in Remove() wait
   * on this; the previous comment said "producer should wait". Confirm
   * against the implementation.
   */
  std::condition_variable cond_not_empty_;

  /*!
   * @brief Signal for exit wait
   */
  std::atomic<bool> exit_flag_{false};

  /*!
   * @brief Protect all above data and conditions
   */
  mutable std::mutex mutex_;
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_RPC_NETWORK_MSG_QUEUE_H_