/**
 *  Copyright (c) 2021 by Contributors
 * @file socket_pool.h
 * @brief Socket pool of nonblocking sockets for DGL distributed training.
 */
#ifndef DGL_RPC_NETWORK_SOCKET_POOL_H_
#define DGL_RPC_NETWORK_SOCKET_POOL_H_

#include <cstddef>
#include <memory>
#include <queue>
#include <unordered_map>

namespace dgl {
namespace network {

class TCPSocket;

/**
 * @brief SocketPool maintains a group of nonblocking sockets, and can provide
 * active sockets.
 *
 * Currently SocketPool is based on epoll, a scalable I/O event notification
 * mechanism in the Linux operating system.
 *
 * The pool holds a raw epoll file descriptor (epfd_), so it is non-copyable:
 * a copy would alias the same descriptor. (The destructor is defined outside
 * this header; it presumably releases epfd_ -- confirm in the implementation.)
 */
class SocketPool {
 public:
  /**
   * @brief socket mode read/receive
   */
  static const int READ = 1;

  /**
   * @brief socket mode write/send
   */
  static const int WRITE = 2;

  /**
   * @brief SocketPool constructor
   */
  SocketPool();

  /**
   * @brief Copying is disabled because the pool holds a raw descriptor.
   */
  SocketPool(const SocketPool&) = delete;
  SocketPool& operator=(const SocketPool&) = delete;

  /**
   * @brief Add a socket to SocketPool
   * @param socket tcp socket to add
   * @param socket_id receiver/sender id of the socket
   * @param events READ, WRITE or READ + WRITE
   */
  void AddSocket(
      std::shared_ptr<TCPSocket> socket, int socket_id, int events = READ);

  /**
   * @brief Remove socket from SocketPool
   * @param socket tcp socket to remove
   * @return number of remaining sockets in the pool
   */
  size_t RemoveSocket(std::shared_ptr<TCPSocket> socket);

  /**
   * @brief SocketPool destructor
   */
  ~SocketPool();

  /**
   * @brief Get current active socket. This is a blocking method
   * @param socket_id output parameter of the socket_id of active socket
   * @return active TCPSocket
   */
  std::shared_ptr<TCPSocket> GetActiveSocket(int* socket_id);

 private:
  /**
   * @brief Wait for event notification
   */
  void Wait();

  /**
   * @brief map from fd to TCPSocket
   */
  std::unordered_map<int, std::shared_ptr<TCPSocket>> tcp_sockets_;

  /**
   * @brief map from fd to socket_id
   */
  std::unordered_map<int, int> socket_ids_;

  /**
   * @brief fd for epoll base
   */
  int epfd_;

  /**
   * @brief queue for current active fds
   */
  std::queue<int> pending_fds_;
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_RPC_NETWORK_SOCKET_POOL_H_