/*! * Copyright (c) 2019 by Contributors * \file communicator.h * \brief Communicator for DGL distributed training. */ #ifndef DGL_RPC_NETWORK_COMMUNICATOR_H_ #define DGL_RPC_NETWORK_COMMUNICATOR_H_ #include #include #include "../net_type.h" #include "msg_queue.h" namespace dgl { namespace network { /*! * \brief Network Sender for DGL distributed training. * * Sender is an abstract class that defines a set of APIs for sending binary * data message over network. It can be implemented by different underlying * networking libraries such TCP socket and MPI. One Sender can connect to * multiple receivers and it can send data to specified receiver via receiver's ID. */ class Sender : public rpc::RPCSender { public: /*! * \brief Sender constructor * \param queue_size size (bytes) of message queue. * \param max_thread_count size of thread pool. 0 for no limit * Note that, the queue_size parameter is optional. */ explicit Sender(int64_t queue_size = 0, int max_thread_count = 0) { CHECK_GE(queue_size, 0); CHECK_GE(max_thread_count, 0); queue_size_ = queue_size; max_thread_count_ = max_thread_count; } virtual ~Sender() {} /*! * \brief Send data to specified Receiver. * \param msg data message * \param recv_id receiver's ID * \return Status code * * (1) The send is non-blocking. There is no guarantee that the message has been * physically sent out when the function returns. * (2) The communicator will assume the responsibility of the given message. * (3) The API is multi-thread safe. * (4) Messages sent to the same receiver are guaranteed to be received in the same order. * There is no guarantee for messages sent to different receivers. */ virtual STATUS Send(Message msg, int recv_id) = 0; protected: /*! * \brief Size of message queue */ int64_t queue_size_; /*! * \brief Size of thread pool. 0 for no limit */ int max_thread_count_; }; /*! * \brief Network Receiver for DGL distributed training. * * Receiver is an abstract class that defines a set of APIs for receiving binary data * message over network. It can be implemented by different underlying networking * libraries such as TCP socket and MPI. One Receiver can connect with multiple Senders * and it can receive data from multiple Senders concurrently. */ class Receiver : public rpc::RPCReceiver { public: /*! * \brief Receiver constructor * \param queue_size size of message queue. * \param max_thread_count size of thread pool. 0 for no limit * Note that, the queue_size parameter is optional. */ explicit Receiver(int64_t queue_size = 0, int max_thread_count = 0) { if (queue_size < 0) { LOG(FATAL) << "queue_size cannot be a negative number."; } CHECK_GE(max_thread_count, 0); queue_size_ = queue_size; max_thread_count_ = max_thread_count; } virtual ~Receiver() {} /*! * \brief Recv data from Sender * \param msg pointer of data message * \param send_id which sender current msg comes from * \return Status code * * (1) The Recv() API is blocking, which will not * return until getting data from message queue. * (2) The Recv() API is thread-safe. * (3) Memory allocated by communicator but will not own it after the function returns. */ virtual STATUS Recv(Message* msg, int* send_id) = 0; /*! * \brief Recv data from a specified Sender * \param msg pointer of data message * \param send_id sender's ID * \return Status code * * (1) The RecvFrom() API is blocking, which will not * return until getting data from message queue. * (2) The RecvFrom() API is thread-safe. * (3) Memory allocated by communicator but will not own it after the function returns. */ virtual STATUS RecvFrom(Message* msg, int send_id) = 0; protected: /*! * \brief Size of message queue */ int64_t queue_size_; /*! * \brief Size of thread pool. 0 for no limit */ int max_thread_count_; }; } // namespace network } // namespace dgl #endif // DGL_RPC_NETWORK_COMMUNICATOR_H_