"examples/sampling/graphbolt/sparse/graphsage.py" did not exist on "7af522db917be320e768bc83099d33519e6e9bb8"
socket_pool.h 2.08 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*!
 *  Copyright (c) 2021 by Contributors
 * \file socket_pool.h
 * \brief Socket pool of nonblocking sockets for DGL distributed training.
 */
#ifndef DGL_RPC_NETWORK_SOCKET_POOL_H_
#define DGL_RPC_NETWORK_SOCKET_POOL_H_

#include <unordered_map>
#include <queue>
#include <memory>

namespace dgl {
namespace network {

class TCPSocket;

/*!
 * \brief SocketPool maintains a group of nonblocking sockets, and can provide
 * active sockets.
 * Currently SocketPool is based on epoll, a scalable I/O event notification
 * mechanism in Linux operating system. 
 */
class SocketPool {
 public:
  /*!
   * \brief socket mode read/receive
   */
  static const int READ = 1;
  /*!
   * \brief socket mode write/send
   */
  static const int WRITE = 2;
  /*!
   * \brief SocketPool constructor
   */
  SocketPool();

  /*!
   * \brief Add a socket to SocketPool
   * \param socket tcp socket to add
   * \param socket_id receiver/sender id of the socket
   * \param events READ, WRITE or READ + WRITE
   */
  void AddSocket(std::shared_ptr<TCPSocket> socket, int socket_id,
    int events = READ);

  /*!
   * \brief Remove socket from SocketPool
   * \param socket tcp socket to remove
   * \return number of remaing sockets in the pool
   */
  size_t RemoveSocket(std::shared_ptr<TCPSocket> socket);

  /*!
   * \brief SocketPool destructor
   */
  ~SocketPool();

  /*!
   * \brief Get current active socket. This is a blocking method
   * \param socket_id output parameter of the socket_id of active socket
   * \return active TCPSocket
   */
  std::shared_ptr<TCPSocket> GetActiveSocket(int* socket_id);

 private:
  /*!
   * \brief Wait for event notification
   */
  void Wait();

  /*!
   * \brief map from fd to TCPSocket
   */
  std::unordered_map<int, std::shared_ptr<TCPSocket>> tcp_sockets_;

  /*!
   * \brief map from fd to socket_id
   */
  std::unordered_map<int, int> socket_ids_;

  /*!
   * \brief fd for epoll base
   */
  int epfd_;

  /*!
   * \brief queue for current active fds
   */
  std::queue<int> pending_fds_;
};

}  // namespace network
}  // namespace dgl

#endif  // DGL_RPC_NETWORK_SOCKET_POOL_H_