Unverified Commit b89dcce1 authored by Chao Ma's avatar Chao Ma Committed by GitHub
Browse files

[RPC] Refactoring networking APIs (#496)

* Refactoring network API

* update demo

* update

* update demo

* update demo

* add num_sender

* update

* fix lint

* fix lint

* fix lint

* update
parent 688a9228
### Demo for Distributed Sampler ### Demo for Distributed Sampler
First we need to change the `--ip` and `--port` in `run_trainer.sh` and `run_sampler.sh` for your own environemnt. First we need to change the `--ip` in `run_trainer.sh` and `run_sampler.sh` for your own environemnt.
Then we need to start trainer node: Then we need to start trainer node:
......
...@@ -15,7 +15,8 @@ class MySamplerPool(SamplerPool): ...@@ -15,7 +15,8 @@ class MySamplerPool(SamplerPool):
"""User-defined worker function """User-defined worker function
""" """
# Start sender # Start sender
sender = dgl.contrib.sampling.SamplerSender(ip=args.ip, port=args.port) namebook = { 0:args.ip }
sender = dgl.contrib.sampling.SamplerSender(namebook)
# load and preprocess dataset # load and preprocess dataset
data = load_data(args) data = load_data(args)
...@@ -41,10 +42,9 @@ class MySamplerPool(SamplerPool): ...@@ -41,10 +42,9 @@ class MySamplerPool(SamplerPool):
num_hops=args.n_layers+1, num_hops=args.n_layers+1,
seed_nodes=train_nid): seed_nodes=train_nid):
print("send train nodeflow: %d" %(idx)) print("send train nodeflow: %d" %(idx))
sender.send(nf) sender.send(nf, 0)
idx += 1 idx += 1
def main(args): def main(args):
pool = MySamplerPool() pool = MySamplerPool()
pool.start(args.num_sender, args) pool.start(args.num_sender, args)
...@@ -64,12 +64,10 @@ if __name__ == '__main__': ...@@ -64,12 +64,10 @@ if __name__ == '__main__':
help="graph self-loop (default=False)") help="graph self-loop (default=False)")
parser.add_argument("--n-layers", type=int, default=1, parser.add_argument("--n-layers", type=int, default=1,
help="number of hidden gcn layers") help="number of hidden gcn layers")
parser.add_argument("--ip", type=str, default='127.0.0.1', parser.add_argument("--ip", type=str, default='127.0.0.1:50051',
help="ip address of remote trainer machine") help="IP address of remote trainer machine")
parser.add_argument("--port", type=int, default=2049,
help="listen port of remote trainer machine")
parser.add_argument("--num-sender", type=int, default=1, parser.add_argument("--num-sender", type=int, default=1,
help="total number of sampler sender") help="Number of sampler sender machine")
args = parser.parse_args() args = parser.parse_args()
print(args) print(args)
......
...@@ -123,7 +123,7 @@ def main(args): ...@@ -123,7 +123,7 @@ def main(args):
data.graph.add_edges_from([(i,i) for i in range(len(data.graph))]) data.graph.add_edges_from([(i,i) for i in range(len(data.graph))])
# Create sampler receiver # Create sampler receiver
receiver = dgl.contrib.sampling.SamplerReceiver(ip=args.ip, port=args.port, num_sender=args.num_sender) receiver = dgl.contrib.sampling.SamplerReceiver(addr=args.ip, num_sender=args.num_sender)
train_nid = mx.nd.array(np.nonzero(data.train_mask)[0]).astype(np.int64).as_in_context(ctx) train_nid = mx.nd.array(np.nonzero(data.train_mask)[0]).astype(np.int64).as_in_context(ctx)
test_nid = mx.nd.array(np.nonzero(data.test_mask)[0]).astype(np.int64).as_in_context(ctx) test_nid = mx.nd.array(np.nonzero(data.test_mask)[0]).astype(np.int64).as_in_context(ctx)
...@@ -255,10 +255,8 @@ if __name__ == '__main__': ...@@ -255,10 +255,8 @@ if __name__ == '__main__':
help="graph self-loop (default=False)") help="graph self-loop (default=False)")
parser.add_argument("--weight-decay", type=float, default=5e-4, parser.add_argument("--weight-decay", type=float, default=5e-4,
help="Weight for L2 loss") help="Weight for L2 loss")
parser.add_argument("--ip", type=str, default='127.0.0.1', parser.add_argument("--ip", type=str, default='127.0.0.1:50051',
help="IP address of sampler receiver machine") help="IP address of sampler receiver machine")
parser.add_argument("--port", type=int, default=2049,
help="Listening port of sampler receiver machine")
parser.add_argument("--num-sender", type=int, default=1, parser.add_argument("--num-sender", type=int, default=1,
help="Number of sampler sender machine") help="Number of sampler sender machine")
args = parser.parse_args() args = parser.parse_args()
......
DGLBACKEND=mxnet python3 gcn_ns_sampler.py --ip 127.0.0.1 --port 2049 --num-sender=5 --dataset reddit-self-loop --num-neighbors 2 --batch-size 1000 --test-batch-size 500 DGLBACKEND=mxnet python3 gcn_ns_sampler.py --ip 127.0.0.1:2049 --num-sender=1 --dataset reddit-self-loop --num-neighbors 2 --batch-size 1000 --test-batch-size 500
DGLBACKEND=mxnet python3 gcn_trainer.py --ip 127.0.0.1 --port 2049 --num-sender=5 --dataset reddit-self-loop --num-neighbors 2 --batch-size 1000 --test-batch-size 500 --n-hidden 64 DGLBACKEND=mxnet python3 gcn_trainer.py --ip 127.0.0.1:2049 --num-sender=1 --dataset reddit-self-loop --num-neighbors 2 --batch-size 1000 --test-batch-size 500 --n-hidden 64
# This file contains DGL distributed samplers APIs. # This file contains DGL distributed samplers APIs.
from ...network import _send_subgraph, _recv_subgraph from ...network import _send_nodeflow, _recv_nodeflow
from ...network import _create_sender, _create_receiver from ...network import _create_sender, _create_receiver
from ...network import _finalize_sender, _finalize_receiver from ...network import _finalize_sender, _finalize_receiver
from ...network import _add_receiver_addr, _sender_connect, _receiver_wait
from multiprocessing import Pool from multiprocessing import Pool
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
...@@ -11,7 +12,8 @@ class SamplerPool(object): ...@@ -11,7 +12,8 @@ class SamplerPool(object):
should be implemented by users. SamplerPool will fork() N (N = num_worker) should be implemented by users. SamplerPool will fork() N (N = num_worker)
child processes, and each process will perform worker() method independently. child processes, and each process will perform worker() method independently.
Note that, the fork() API will use shared memory for N process and the OS will Note that, the fork() API will use shared memory for N process and the OS will
perfrom copy-on-write only when developers write that piece of memory. perfrom copy-on-write only when developers write that piece of memory. So fork N
processes and load N copy of graph will not increase the memory overhead.
Users can use this class like this: Users can use this class like this:
...@@ -36,7 +38,7 @@ class SamplerPool(object): ...@@ -36,7 +38,7 @@ class SamplerPool(object):
num_worker : int num_worker : int
number of worker (number of child process) number of worker (number of child process)
args : arguments args : arguments
arguments passed by user any arguments passed by user
""" """
p = Pool() p = Pool()
for i in range(num_worker): for i in range(num_worker):
...@@ -48,74 +50,84 @@ class SamplerPool(object): ...@@ -48,74 +50,84 @@ class SamplerPool(object):
@abstractmethod @abstractmethod
def worker(self, args): def worker(self, args):
"""User-defined function
Parameters
----------
args : arguments
any arguments passed by user
"""
pass pass
class SamplerSender(object): class SamplerSender(object):
"""Sender of DGL distributed sampler. """SamplerSender for DGL distributed training.
Users use SamplerSender class to send sampled Users use SamplerSender to send sampled subgraph (NodeFlow)
subgraph (NodeFlow) to remote trainer. Note that, SamplerSender to remote SamplerReceiver. Note that a SamplerSender can connect
class will try to connect to SamplerReceiver in a loop until the to multiple SamplerReceiver.
SamplerReceiver started.
Parameters Parameters
---------- ----------
ip : str namebook : dict
ip address of remote trainer machine address namebook of SamplerReceiver, where
port : int key is recevier's ID and value is receiver's address, e.g.,
port of remote trainer machine
{ 0:'168.12.23.45:50051',
1:'168.12.23.21:50051',
2:'168.12.46.12:50051' }
""" """
def __init__(self, ip, port): def __init__(self, namebook):
self._ip = ip assert len(namebook) > 0, 'namebook cannot be empty.'
self._port = port self._namebook = namebook
self._sender = _create_sender(ip, port) self._sender = _create_sender()
for ID, addr in self._namebook.items():
vec = addr.split(':')
_add_receiver_addr(self._sender, vec[0], int(vec[1]), ID)
_sender_connect(self._sender)
def __del__(self): def __del__(self):
"""Finalize Sender """Finalize Sender
""" """
# _finalize_sender will send a special message
# to tell the remote trainer machine that it has finished its job.
_finalize_sender(self._sender) _finalize_sender(self._sender)
def send(self, nodeflow): def send(self, nodeflow, recv_id):
"""Send sampled subgraph (NodeFlow) to remote trainer. """Send sampled subgraph (NodeFlow) to remote trainer.
Parameters Parameters
---------- ----------
nodeflow : NodeFlow nodeflow : NodeFlow
sampled NodeFlow object sampled NodeFlow object
recv_id : int
receiver ID
""" """
_send_subgraph(self._sender, nodeflow) _send_nodeflow(self._sender, nodeflow, recv_id)
class SamplerReceiver(object): class SamplerReceiver(object):
"""Receiver of DGL distributed sampler. """SamplerReceiver for DGL distributed training.
Users use SamplerReceiver class to receive sampled Users use SamplerReceiver to receive sampled subgraph (NodeFlow)
subgraph (NodeFlow) from remote samplers. Note that SamplerReceiver from remote SamplerSender. Note that SamplerReceiver can receive messages
can receive messages from multiple senders concurrently, by given from multiple SamplerSenders concurrently by given the num_sender parameter.
the num_sender parameter, and only when all senders connect to SamplerReceiver, Note that, only when all SamplerSenders connect to SamplerReceiver, receiver
the SamplerReceiver can start its job. can start its job.
Parameters Parameters
---------- ----------
ip : str addr : str
ip address of current trainer machine address of SamplerReceiver, e.g., '127.0.0.1:50051'
port : int
port of current trainer machine
num_sender : int num_sender : int
total number of sampler nodes, use 1 by default total number of SamplerSender
""" """
def __init__(self, ip, port, num_sender=1): def __init__(self, addr, num_sender):
self._ip = ip self._addr = addr
self._port = port
self._num_sender = num_sender self._num_sender = num_sender
self._receiver = _create_receiver(ip, port, num_sender) self._receiver = _create_receiver()
vec = self._addr.split(':')
_receiver_wait(self._receiver, vec[0], int(vec[1]), self._num_sender);
def __del__(self): def __del__(self):
"""Finalize Receiver """Finalize Receiver
_finalize_sampler_receiver method will clean up the
back-end threads started by the SamplerReceiver.
""" """
_finalize_receiver(self._receiver) _finalize_receiver(self._receiver)
...@@ -132,4 +144,4 @@ class SamplerReceiver(object): ...@@ -132,4 +144,4 @@ class SamplerReceiver(object):
NodeFlow NodeFlow
received NodeFlow object received NodeFlow object
""" """
return _recv_subgraph(self._receiver, graph) return _recv_nodeflow(self._receiver, graph)
...@@ -8,62 +8,105 @@ from . import utils ...@@ -8,62 +8,105 @@ from . import utils
_init_api("dgl.network") _init_api("dgl.network")
def _create_sender(ip_addr, port): def _create_sender():
"""Create a sender communicator via C socket. """Create a Sender communicator via C api
"""
return _CAPI_DGLSenderCreate()
def _finalize_sender(sender):
"""Finalize Sender communicator
Parameters Parameters
---------- ----------
ip_addr : str sender : ctypes.c_void_p
ip address of remote trainer C Sender handle
port : int
port of remote trainer
""" """
return _CAPI_DGLSenderCreate(ip_addr, port) _CAPI_DGLFinalizeSender(sender)
def _create_receiver(ip_addr, port, num_sender): def _add_receiver_addr(sender, ip_addr, port, recv_id):
"""Create a receiver communicator via C socket. """Add Receiver IP address to namebook
Parameters Parameters
---------- ----------
sender : ctypes.c_void_p
C Sender handle
ip_addr : str ip_addr : str
ip address of remote trainer IP address of Receiver
port : int port : int
listen port of remote trainer listen of Receiver
num_sender : int recv_id : int
total number of sampler nodes Receiver ID
"""
_CAPI_DGLSenderAddReceiver(sender, ip_addr, port, recv_id)
def _sender_connect(sender):
"""Connect to all the Receiver
Parameters
----------
sender : ctypes.c_void_p
C Sender handle
""" """
return _CAPI_DGLReceiverCreate(ip_addr, port, num_sender) _CAPI_DGLSenderConnect(sender)
def _send_subgraph(sender, nodeflow): def _send_nodeflow(sender, nodeflow, recv_id):
"""Send sampled subgraph (Nodeflow) to remote trainer. """Send sampled subgraph (Nodeflow) to remote Receiver.
Parameters Parameters
---------- ----------
sender : ctypes.c_void_p sender : ctypes.c_void_p
C sender handle C Sender handle
nodeflow : NodeFlow nodeflow : NodeFlow
NodeFlow object NodeFlow object
recv_id : int
Receiver ID
""" """
graph_handle = nodeflow._graph._handle graph_handle = nodeflow._graph._handle
node_mapping = nodeflow._node_mapping.todgltensor() node_mapping = nodeflow._node_mapping.todgltensor()
edge_mapping = nodeflow._edge_mapping.todgltensor() edge_mapping = nodeflow._edge_mapping.todgltensor()
# Can we convert NDArray to tensor directly, instead of using toindex()?
layers_offsets = utils.toindex(nodeflow._layer_offsets).todgltensor() layers_offsets = utils.toindex(nodeflow._layer_offsets).todgltensor()
flows_offsets = utils.toindex(nodeflow._block_offsets).todgltensor() flows_offsets = utils.toindex(nodeflow._block_offsets).todgltensor()
_CAPI_SenderSendSubgraph(sender, _CAPI_SenderSendSubgraph(sender,
recv_id,
graph_handle, graph_handle,
node_mapping, node_mapping,
edge_mapping, edge_mapping,
layers_offsets, layers_offsets,
flows_offsets) flows_offsets)
def _recv_subgraph(receiver, graph): def _create_receiver():
"""Create a Receiver communicator via C api
"""
return _CAPI_DGLReceiverCreate()
def _finalize_receiver(receiver):
"""Finalize Receiver Communicator
"""
_CAPI_DGLFinalizeReceiver(receiver)
def _receiver_wait(receiver, ip_addr, port, num_sender):
"""Wait all Sender to connect..
Parameters
----------
receiver : ctypes.c_void_p
C Receiver handle
ip_addr : str
IP address of Receiver
port : int
port of Receiver
num_sender : int
total number of Sender
"""
_CAPI_DGLReceiverWait(receiver, ip_addr, port, num_sender)
def _recv_nodeflow(receiver, graph):
"""Receive sampled subgraph (NodeFlow) from remote sampler. """Receive sampled subgraph (NodeFlow) from remote sampler.
Parameters Parameters
---------- ----------
receiver : ctypes.c_void_p receiver : ctypes.c_void_p
C receiver handle C Receiver handle
graph : DGLGraph graph : DGLGraph
The parent graph The parent graph
...@@ -75,23 +118,3 @@ def _recv_subgraph(receiver, graph): ...@@ -75,23 +118,3 @@ def _recv_subgraph(receiver, graph):
# hdl is a list of ptr # hdl is a list of ptr
hdl = unwrap_to_ptr_list(_CAPI_ReceiverRecvSubgraph(receiver)) hdl = unwrap_to_ptr_list(_CAPI_ReceiverRecvSubgraph(receiver))
return NodeFlow(graph, hdl[0]) return NodeFlow(graph, hdl[0])
def _finalize_sender(sender):
"""Finalize Sender communicator
Parameters
----------
sender : ctypes.c_void_p
C sender handle
"""
_CAPI_DGLFinalizeCommunicator(sender)
def _finalize_receiver(receiver):
"""Finalize Receiver communicator
Parameters
----------
receiver : ctypes.c_void_p
C receiver handle
"""
_CAPI_DGLFinalizeCommunicator(receiver)
...@@ -20,55 +20,63 @@ using dgl::runtime::NDArray; ...@@ -20,55 +20,63 @@ using dgl::runtime::NDArray;
namespace dgl { namespace dgl {
namespace network { namespace network {
static char* SEND_BUFFER = nullptr;
static char* RECV_BUFFER = nullptr;
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderCreate") DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) { .set_body([] (DGLArgs args, DGLRetValue* rv) {
std::string ip = args[0];
int port = args[1];
network::Communicator* comm = new network::SocketCommunicator();
if (comm->Initialize(IS_SENDER, ip.c_str(), port) == false) {
LOG(FATAL) << "Initialize network communicator (sender) error.";
}
try { try {
comm->SetBuffer(new char[kMaxBufferSize]); SEND_BUFFER = new char[kMaxBufferSize];
} catch (const std::bad_alloc&) { } catch (const std::bad_alloc&) {
LOG(FATAL) << "Not enough memory for sender buffer: " << kMaxBufferSize; LOG(FATAL) << "Not enough memory for sender buffer: " << kMaxBufferSize;
} }
CommunicatorHandle chandle = static_cast<CommunicatorHandle>(comm); network::Sender* sender = new network::SocketSender();
CommunicatorHandle chandle = static_cast<CommunicatorHandle>(sender);
*rv = chandle; *rv = chandle;
}); });
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverCreate") DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeSender")
.set_body([] (DGLArgs args, DGLRetValue* rv) { .set_body([] (DGLArgs args, DGLRetValue* rv) {
std::string ip = args[0]; CommunicatorHandle chandle = args[0];
int port = args[1]; network::Sender* sender = static_cast<network::Sender*>(chandle);
int num_sender = args[2]; sender->Finalize();
network::Communicator* comm = new network::SocketCommunicator(); delete [] SEND_BUFFER;
if (comm->Initialize(IS_RECEIVER, ip.c_str(), port, num_sender, kQueueSize) == false) { });
LOG(FATAL) << "Initialize network communicator (receiver) error.";
} DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderAddReceiver")
try { .set_body([] (DGLArgs args, DGLRetValue* rv) {
comm->SetBuffer(new char[kMaxBufferSize]); CommunicatorHandle chandle = args[0];
} catch (const std::bad_alloc&) { std::string ip = args[1];
LOG(FATAL) << "Not enough memory for receiver buffer: " << kMaxBufferSize; int port = args[2];
int recv_id = args[3];
network::Sender* sender = static_cast<network::Sender*>(chandle);
sender->AddReceiver(ip.c_str(), port, recv_id);
});
DGL_REGISTER_GLOBAL("network._CAPI_DGLSenderConnect")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
CommunicatorHandle chandle = args[0];
network::Sender* sender = static_cast<network::Sender*>(chandle);
if (sender->Connect() == false) {
LOG(FATAL) << "Sender connection failed.";
} }
CommunicatorHandle chandle = static_cast<CommunicatorHandle>(comm);
*rv = chandle;
}); });
DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph") DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) { .set_body([] (DGLArgs args, DGLRetValue* rv) {
CommunicatorHandle chandle = args[0]; CommunicatorHandle chandle = args[0];
GraphHandle ghandle = args[1]; int recv_id = args[1];
const IdArray node_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[2])); GraphHandle ghandle = args[2];
const IdArray edge_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[3])); const IdArray node_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[3]));
const IdArray layer_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[4])); const IdArray edge_mapping = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[4]));
const IdArray flow_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[5])); const IdArray layer_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[5]));
const IdArray flow_offsets = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[6]));
ImmutableGraph *ptr = static_cast<ImmutableGraph*>(ghandle); ImmutableGraph *ptr = static_cast<ImmutableGraph*>(ghandle);
network::Communicator* comm = static_cast<network::Communicator*>(chandle); network::Sender* sender = static_cast<network::Sender*>(chandle);
auto csr = ptr->GetInCSR(); auto csr = ptr->GetInCSR();
// Serialize nodeflow to data buffer // Serialize nodeflow to data buffer
int64_t data_size = network::SerializeSampledSubgraph( int64_t data_size = network::SerializeSampledSubgraph(
comm->GetBuffer(), SEND_BUFFER,
csr, csr,
node_mapping, node_mapping,
edge_mapping, edge_mapping,
...@@ -76,25 +84,55 @@ DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph") ...@@ -76,25 +84,55 @@ DGL_REGISTER_GLOBAL("network._CAPI_SenderSendSubgraph")
flow_offsets); flow_offsets);
CHECK_GT(data_size, 0); CHECK_GT(data_size, 0);
// Send msg via network // Send msg via network
int64_t size = comm->Send(comm->GetBuffer(), data_size); int64_t size = sender->Send(SEND_BUFFER, data_size, recv_id);
if (size <= 0) { if (size <= 0) {
LOG(ERROR) << "Send message error (size: " << size << ")"; LOG(FATAL) << "Send message error (size: " << size << ")";
}
});
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
try {
RECV_BUFFER = new char[kMaxBufferSize];
} catch (const std::bad_alloc&) {
LOG(FATAL) << "Not enough memory for receiver buffer: " << kMaxBufferSize;
} }
network::Receiver* receiver = new network::SocketReceiver();
CommunicatorHandle chandle = static_cast<CommunicatorHandle>(receiver);
*rv = chandle;
});
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeReceiver")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
CommunicatorHandle chandle = args[0];
network::Receiver* receiver = static_cast<network::SocketReceiver*>(chandle);
receiver->Finalize();
delete [] RECV_BUFFER;
});
DGL_REGISTER_GLOBAL("network._CAPI_DGLReceiverWait")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
CommunicatorHandle chandle = args[0];
std::string ip = args[1];
int port = args[2];
int num_sender = args[3];
network::Receiver* receiver = static_cast<network::SocketReceiver*>(chandle);
receiver->Wait(ip.c_str(), port, num_sender, kQueueSize);
}); });
DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph") DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) { .set_body([] (DGLArgs args, DGLRetValue* rv) {
CommunicatorHandle chandle = args[0]; CommunicatorHandle chandle = args[0];
network::Communicator* comm = static_cast<network::Communicator*>(chandle); network::Receiver* receiver = static_cast<network::SocketReceiver*>(chandle);
// Recv data from network // Recv data from network
int64_t size = comm->Receive(comm->GetBuffer(), kMaxBufferSize); int64_t size = receiver->Recv(RECV_BUFFER, kMaxBufferSize);
if (size <= 0) { if (size <= 0) {
LOG(ERROR) << "Receive error: (size: " << size << ")"; LOG(FATAL) << "Receive error: (size: " << size << ")";
} }
NodeFlow* nf = new NodeFlow(); NodeFlow* nf = new NodeFlow();
ImmutableGraph::CSR::Ptr csr; ImmutableGraph::CSR::Ptr csr;
// Deserialize nodeflow from recv_data_buffer // Deserialize nodeflow from recv_data_buffer
network::DeserializeSampledSubgraph(comm->GetBuffer(), network::DeserializeSampledSubgraph(RECV_BUFFER,
&(csr), &(csr),
&(nf->node_mapping), &(nf->node_mapping),
&(nf->edge_mapping), &(nf->edge_mapping),
...@@ -106,12 +144,5 @@ DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph") ...@@ -106,12 +144,5 @@ DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph")
*rv = WrapVectorReturn(subgs); *rv = WrapVectorReturn(subgs);
}); });
DGL_REGISTER_GLOBAL("network._CAPI_DGLFinalizeCommunicator")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
CommunicatorHandle chandle = args[0];
network::Communicator* comm = static_cast<network::Communicator*>(chandle);
comm->Finalize();
});
} // namespace network } // namespace network
} // namespace dgl } // namespace dgl
...@@ -12,71 +12,84 @@ namespace dgl { ...@@ -12,71 +12,84 @@ namespace dgl {
namespace network { namespace network {
/*! /*!
* \brief Communicator for DGL distributed training. * \brief Network Sender for DGL distributed training.
* *
* Communicator is a set of interface for network communication, which * Sender is an abstract class that defines a set of APIs for sending
* can be implemented by real network libraries, such as grpc, mpi, as well * binary data over network. It can be implemented by different underlying
* as raw socket. There has two types of Communicator, one is Sender * networking libraries such TCP socket and ZMQ. One Sender can connect to
* (is_sender = true), and another is Receiver. For Sender, it can send binary * multiple receivers, and it can send data to specified receiver via receiver's ID.
* data to remote Receiver node. For Receiver, it can listen on a specified
* endpoint and receive the binary data sent from Sender node. Note that, a
* receiver node can recv messages from multiple senders concurrently.
*/ */
class Communicator { class Sender {
public: public:
virtual ~Communicator() {} virtual ~Sender() {}
/*! /*!
* \brief Initialize Communicator * \brief Add receiver address and it's ID to the namebook
* \param is_sender true for sender and false for receiver * \param ip receviver's IP address
* \param ip ip address * \param port receiver's port
* \param port end port * \param id receiver's ID
* (e.g. "168.123.2.43:50051"). For Receiver, this address identifies
* the local listening endpoint (e.g. "0.0.0.0:50051").
* \param num_sender number of senders, only used for receiver.
* \param queue_size the size of message queue, only for receiver.
* \return true for success and false for error
*/ */
virtual bool Initialize(bool is_sender, virtual void AddReceiver(const char* ip, int port, int id) = 0;
const char* ip,
int port,
int num_sender = 1,
int64_t queue_size = 5 * 1024 * 1024) = 0;
/*! /*!
* \brief Send message to receiver node * \brief Connect with all the Receivers
* \param src data pointer * \return True for sucess and False for fail
* \param size data size
* \return bytes send
* > 0 : bytes send
* - 1 : error
*/ */
virtual int64_t Send(char* src, int64_t size) = 0; virtual bool Connect() = 0;
/*! /*!
* \brief Receive mesesage from sender node, we * \brief Send data to specified Receiver
* actually reading data from local message queue. * \param data data buffer for sending
* \param dest destination data pointer * \param size data size for sending
* \param max_size maximal data size * \param recv_id receiver's ID
* \return bytes received * \return bytes we sent
* > 0 : bytes received * > 0 : bytes we sent
* - 1 : error * - 1 : error
*/ */
virtual int64_t Receive(char* dest, int64_t max_size) = 0; virtual int64_t Send(const char* data, int64_t size, int recv_id) = 0;
/*! /*!
* \brief Finalize the Communicator class * \brief Finalize Sender
*/ */
virtual void Finalize() = 0; virtual void Finalize() = 0;
};
/*!
* \brief Network Receiver for DGL distributed training.
*
* Receiver is an abstract class that defines a set of APIs for receiving binary
* data over network. It can be implemented by different underlying networking libraries
* such TCP socket and ZMQ. One Receiver can connect with multiple Senders, and it can receive
* data from these Senders concurrently via multi-threading and message queue.
*/
class Receiver {
public:
virtual ~Receiver() {}
/*! /*!
* \brief Set pointer of memory buffer allocated for Communicator * \brief Wait all of the Senders to connect
* \param ip Receiver's IP address
* \param port Receiver's port
* \param num_sender total number of Senders
* \param queue_size size of message queue
* \return True for sucess and False for fail
*/ */
virtual void SetBuffer(char* buffer) = 0; virtual bool Wait(const char* ip, int port, int num_sender, int queue_size) = 0;
/*! /*!
* \brief Get pointer of memory buffer allocated for Communicator * \brief Recv data from Sender (copy data from message queue)
* \param dest data buffer of destination
* \param max_size maximul size of data buffer
* \return bytes we received
* > 0 : bytes we received
* - 1 : error
*/ */
virtual char* GetBuffer() = 0; virtual int64_t Recv(char* dest, int64_t max_size) = 0;
/*!
* \brief Finalize Receiver
*/
virtual void Finalize() = 0;
}; };
} // namespace network } // namespace network
......
...@@ -21,72 +21,105 @@ namespace network { ...@@ -21,72 +21,105 @@ namespace network {
const int kTimeOut = 10; // 10 minutes for socket timeout const int kTimeOut = 10; // 10 minutes for socket timeout
const int kMaxConnection = 1024; // 1024 maximal socket connection const int kMaxConnection = 1024; // 1024 maximal socket connection
bool SocketCommunicator::Initialize(bool is_sender, void SocketSender::AddReceiver(const char* ip, int port, int recv_id) {
const char* ip, dgl::network::Addr addr;
int port, addr.ip_.assign(const_cast<char*>(ip));
int num_sender, addr.port_ = port;
int64_t queue_size) { receiver_addr_map_[recv_id] = addr;
if (is_sender) {
is_sender_ = true;
return InitSender(ip, port);
} else {
is_sender_ = false;
return InitReceiver(ip, port, num_sender, queue_size);
}
} }
bool SocketCommunicator::InitSender(const char* ip, int port) { bool SocketSender::Connect() {
// Sender only has a client socket // Create N sockets for Receiver
socket_.resize(1); for (const auto& r : receiver_addr_map_) {
socket_[0] = new TCPSocket(); int ID = r.first;
TCPSocket* client = socket_[0]; socket_map_[ID] = new TCPSocket();
bool bo = false; TCPSocket* client = socket_map_[ID];
int try_count = 0; bool bo = false;
// Connect to server int try_count = 0;
while (bo == false && try_count < kMaxTryCount) { const char* ip = r.second.ip_.c_str();
if (client->Connect(ip, port)) { int port = r.second.port_;
LOG(INFO) << "Connected to " << ip << ":" << port; while (bo == false && try_count < kMaxTryCount) {
return true; if (client->Connect(ip, port)) {
} else { LOG(INFO) << "Connected to Receiver: " << ip << ":" << port;
LOG(ERROR) << "Cannot connect to " << ip << ":" << port bo = true;
<< ", try again ..."; } else {
bo = false; LOG(ERROR) << "Cannot connect to Receiver: " << ip << ":" << port
try_count++; << ", try again ...";
bo = false;
try_count++;
#ifdef _WIN32 #ifdef _WIN32
Sleep(1); Sleep(1);
#else // !_WIN32 #else // !_WIN32
sleep(1); sleep(1);
#endif // _WIN32 #endif // _WIN32
}
}
if (bo == false) {
return bo;
}
}
return true;
}
int64_t SocketSender::Send(const char* data, int64_t size, int recv_id) {
TCPSocket* client = socket_map_[recv_id];
// First sent the size of data
int64_t sent_bytes = 0;
while (static_cast<size_t>(sent_bytes) < sizeof(int64_t)) {
int64_t max_len = sizeof(int64_t) - sent_bytes;
int64_t tmp = client->Send(
reinterpret_cast<char*>(&size)+sent_bytes,
max_len);
sent_bytes += tmp;
}
// Then send the data
sent_bytes = 0;
while (sent_bytes < size) {
int64_t max_len = size - sent_bytes;
int64_t tmp = client->Send(data+sent_bytes, max_len);
sent_bytes += tmp;
}
return size + sizeof(int64_t);
}
void SocketSender::Finalize() {
// Close all sockets
for (const auto& socket : socket_map_) {
TCPSocket* client = socket.second;
if (client != nullptr) {
client->Close();
delete client;
client = nullptr;
} }
} }
return false;
} }
bool SocketCommunicator::InitReceiver(const char* ip, bool SocketReceiver::Wait(const char* ip,
int port, int port,
int num_sender, int num_sender,
int64_t queue_size) { int queue_size) {
CHECK_GE(num_sender, 1); CHECK_GE(num_sender, 1);
CHECK_GT(queue_size, 0); CHECK_GT(queue_size, 0);
// Init message queue // Initialize message queue
num_sender_ = num_sender; num_sender_ = num_sender;
queue_size_ = queue_size; queue_size_ = queue_size;
queue_ = new MessageQueue(queue_size_, num_sender_); queue_ = new MessageQueue(queue_size_, num_sender_);
// Init socket, and socket_[0] is the server socket // Initialize socket, and socket_[0] is server socket
socket_.resize(num_sender+1); socket_.resize(num_sender_+1);
thread_.resize(num_sender); thread_.resize(num_sender_);
socket_[0] = new TCPSocket(); socket_[0] = new TCPSocket();
TCPSocket* server = socket_[0]; TCPSocket* server = socket_[0];
server->SetTimeout(kTimeOut * 60 * 1000); // millsec server->SetTimeout(kTimeOut * 60 * 1000); // millsec
// Bind socket // Bind socket
if (server->Bind(ip, port) == false) { if (server->Bind(ip, port) == false) {
LOG(ERROR) << "Cannot bind to " << ip << ":" << port; LOG(FATAL) << "Cannot bind to " << ip << ":" << port;
return false; return false;
} }
LOG(INFO) << "Bind to " << ip << ":" << port; LOG(INFO) << "Bind to " << ip << ":" << port;
// Listen // Listen
if (server->Listen(kMaxConnection) == false) { if (server->Listen(kMaxConnection) == false) {
LOG(ERROR) << "Cannot listen on " << ip << ":" << port; LOG(FATAL) << "Cannot listen on " << ip << ":" << port;
return false; return false;
} }
LOG(INFO) << "Listen on " << ip << ":" << port << ", wait sender connect ..."; LOG(INFO) << "Listen on " << ip << ":" << port << ", wait sender connect ...";
...@@ -96,18 +129,18 @@ bool SocketCommunicator::InitReceiver(const char* ip, ...@@ -96,18 +129,18 @@ bool SocketCommunicator::InitReceiver(const char* ip,
for (int i = 1; i <= num_sender_; ++i) { for (int i = 1; i <= num_sender_; ++i) {
socket_[i] = new TCPSocket(); socket_[i] = new TCPSocket();
if (server->Accept(socket_[i], &accept_ip, &accept_port) == false) { if (server->Accept(socket_[i], &accept_ip, &accept_port) == false) {
LOG(ERROR) << "Error on accept socket."; LOG(FATAL) << "Error on accept socket.";
return false; return false;
} }
// new thread for the socket // create new thread for each socket
thread_[i-1] = new std::thread(MsgHandler, socket_[i], queue_); thread_[i-1] = new std::thread(MsgHandler, socket_[i], queue_, i-1);
LOG(INFO) << "Accept new sender: " << accept_ip << ":" << accept_port; LOG(INFO) << "Accept new sender: " << accept_ip << ":" << accept_port;
} }
return true; return true;
} }
void SocketCommunicator::MsgHandler(TCPSocket* socket, MessageQueue* queue) { void SocketReceiver::MsgHandler(TCPSocket* socket, MessageQueue* queue, int id) {
char* buffer = new char[kMaxBufferSize]; char* buffer = new char[kMaxBufferSize];
for (;;) { for (;;) {
// First recv the size // First recv the size
...@@ -120,8 +153,10 @@ void SocketCommunicator::MsgHandler(TCPSocket* socket, MessageQueue* queue) { ...@@ -120,8 +153,10 @@ void SocketCommunicator::MsgHandler(TCPSocket* socket, MessageQueue* queue) {
max_len); max_len);
received_bytes += tmp; received_bytes += tmp;
} }
// Data_size ==-99 is a special signal to tell
// the MsgHandler to exit the loop
if (data_size <= 0) { if (data_size <= 0) {
LOG(INFO) << "Socket finish job"; queue->Signal(id);
break; break;
} }
// Then recv the data // Then recv the data
...@@ -136,39 +171,19 @@ void SocketCommunicator::MsgHandler(TCPSocket* socket, MessageQueue* queue) { ...@@ -136,39 +171,19 @@ void SocketCommunicator::MsgHandler(TCPSocket* socket, MessageQueue* queue) {
delete [] buffer; delete [] buffer;
} }
void SocketCommunicator::Finalize() { int64_t SocketReceiver::Recv(char* dest, int64_t max_size) {
if (is_sender_) { // Get message from message queue
FinalizeSender(); return queue_->Remove(dest, max_size);
} else {
FinalizeReceiver();
}
}
void SocketCommunicator::FinalizeSender() {
// We send a size = -1 signal to notify
// receiver to finish its job
if (socket_[0] != nullptr) {
int64_t size = -1;
int64_t sent_bytes = 0;
while (static_cast<size_t>(sent_bytes) < sizeof(int64_t)) {
int64_t max_len = sizeof(int64_t) - sent_bytes;
int64_t tmp = socket_[0]->Send(
reinterpret_cast<char*>(&size)+sent_bytes,
max_len);
sent_bytes += tmp;
}
socket_[0]->Close();
LOG(INFO) << "Close sender socket.";
delete socket_[0];
socket_[0] = nullptr;
}
if (buffer_ != nullptr) {
delete [] buffer_;
}
} }
void SocketCommunicator::FinalizeReceiver() { void SocketReceiver::Finalize() {
for (int i = 0; i <= num_sender_; ++i) { for (int i = 0; i <= num_sender_; ++i) {
if (i != 0) { // write -99 signal to exit loop
int64_t data_size = -99;
queue_->Add(
reinterpret_cast<char*>(&data_size),
sizeof(int64_t));
}
if (socket_[i] != nullptr) { if (socket_[i] != nullptr) {
socket_[i]->Close(); socket_[i]->Close();
delete socket_[i]; delete socket_[i];
...@@ -177,50 +192,5 @@ void SocketCommunicator::FinalizeReceiver() { ...@@ -177,50 +192,5 @@ void SocketCommunicator::FinalizeReceiver() {
} }
} }
int64_t SocketCommunicator::Send(char* src, int64_t size) {
if (!is_sender_) {
LOG(ERROR) << "Receiver cannot invoke send() API.";
return -1;
}
TCPSocket* client = socket_[0];
// First sent the size of data
int64_t sent_bytes = 0;
while (static_cast<size_t>(sent_bytes) < sizeof(int64_t)) {
int64_t max_len = sizeof(int64_t) - sent_bytes;
int64_t tmp = client->Send(
reinterpret_cast<char*>(&size)+sent_bytes,
max_len);
sent_bytes += tmp;
}
// Then send the data
sent_bytes = 0;
while (sent_bytes < size) {
int64_t max_len = size - sent_bytes;
int64_t tmp = client->Send(src+sent_bytes, max_len);
sent_bytes += tmp;
}
return size + sizeof(int64_t);
}
int64_t SocketCommunicator::Receive(char* dest, int64_t max_size) {
if (is_sender_) {
LOG(ERROR) << "Sender cannot invoke Receive() API.";
return -1;
}
// Get message from the message queue
return queue_->Remove(dest, max_size);
}
void SocketCommunicator::SetBuffer(char* buffer) {
// Set memory buffer allocated for current Communicator
buffer_ = buffer;
}
char* SocketCommunicator::GetBuffer() {
// Get memory buffer allocated for current Communicator
return buffer_;
}
} // namespace network } // namespace network
} // namespace dgl } // namespace dgl
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <string> #include <string>
#include <unordered_map>
#include "communicator.h" #include "communicator.h"
#include "msg_queue.h" #include "msg_queue.h"
...@@ -19,70 +20,105 @@ namespace network { ...@@ -19,70 +20,105 @@ namespace network {
using dgl::network::MessageQueue; using dgl::network::MessageQueue;
using dgl::network::TCPSocket; using dgl::network::TCPSocket;
using dgl::network::Sender;
using dgl::network::Receiver;
/*! /*!
* \brief Implementation of Communicator class with TCP socket. * \breif Networking address
*/ */
class SocketCommunicator : public Communicator { struct Addr {
std::string ip_;
int port_;
};
/*!
* \brief Network Sender for DGL distributed training.
*
* Sender is an abstract class that defines a set of APIs for sending
* binary data over network. It can be implemented by different underlying
* networking libraries such TCP socket and ZMQ. One Sender can connect to
* multiple receivers, and it can send data to specified receiver via receiver's ID.
*/
class SocketSender : public Sender {
public: public:
/*! /*!
* \brief Initialize Communicator * \brief Add receiver address and it's ID to the namebook
* \param is_sender true for sender and false for receiver * \param ip receviver's IP address
* \param ip ip address * \param port receiver's port
* \param port end port * \param id receiver's ID
* (e.g. "168.123.2.43:50051"). For Receiver, this address identifies
* the local listening endpoint (e.g. "0.0.0.0:50051").
* \param num_sender number of senders, only used for receiver.
* \param queue_size the size of message queue, only for receiver.
* \return true for success and false for error
*/ */
bool Initialize(bool is_sender, void AddReceiver(const char* ip, int port, int recv_id);
const char* ip,
int port,
int num_sender = 1,
int64_t queue_size = 5 * 1024 * 1024);
/*! /*!
* \brief Send message to receiver node * \brief Connect with all the Receivers
* \param src data pointer * \return True for sucess and False for fail
* \param size data size
* \return bytes send
* > 0 : bytes send
* - 1 : error
*/ */
int64_t Send(char* src, int64_t size); bool Connect();
/*! /*!
* \brief Receive mesesage from sender node, we * \brief Send data to specified Receiver
* actually reading data from local message queue. * \param data data buffer for sending
* \param dest destination data pointer * \param size data size for sending
* \param max_size maximal data size * \param recv_id receiver's ID
* \return bytes received * \return bytes we sent
* > 0 : bytes received * > 0 : bytes we sent
* - 1 : error * - 1 : error
*/ */
int64_t Receive(char* dest, int64_t max_size); int64_t Send(const char* data, int64_t size, int recv_id);
/*! /*!
* \brief Finalize the SocketCommunicator class * \brief Finalize Sender
*/ */
void Finalize(); void Finalize();
private:
/*!
* \brief socket map
*/
std::unordered_map<int, TCPSocket*> socket_map_;
/*! /*!
* \brief Set pointer of memory buffer allocated for Communicator * \brief receiver address map
*/
std::unordered_map<int, Addr> receiver_addr_map_;
};
/*!
* \brief Network Receiver for DGL distributed training.
*
* Receiver is an abstract class that defines a set of APIs for receiving binary
* data over network. It can be implemented by different underlying networking libraries
* such TCP socket and ZMQ. One Receiver can connect with multiple Senders, and it can receive
* data from these Senders concurrently via multi-threading and message queue.
*/
class SocketReceiver : public Receiver {
public:
/*!
* \brief Wait all of the Senders to connect
* \param ip Receiver's IP address
* \param port Receiver's port
* \param num_sender total number of Senders
* \param queue_size size of message queue
* \return True for sucess and False for fail
*/ */
void SetBuffer(char* buffer); bool Wait(const char* ip, int port, int num_sender, int queue_size);
/*! /*!
* \brief Get pointer of memory buffer allocated for Communicator * \brief Recv data from Sender (copy data from message queue)
* \param dest data buffer of destination
* \param max_size maximul size of data buffer
* \return bytes we received
* > 0 : bytes we received
* - 1 : error
*/ */
char* GetBuffer(); int64_t Recv(char* dest, int64_t max_size);
private:
/*! /*!
* \brief Is a sender or reciever node? * \brief Finalize Receiver
*/ */
bool is_sender_; void Finalize();
private:
/*! /*!
* \brief number of sender * \brief number of sender
*/ */
...@@ -108,48 +144,13 @@ class SocketCommunicator : public Communicator { ...@@ -108,48 +144,13 @@ class SocketCommunicator : public Communicator {
*/ */
MessageQueue* queue_; MessageQueue* queue_;
/*!
* \brief Memory buffer for communicator
*/
char* buffer_ = nullptr;
/*!
* \brief Initalize sender node
* \param ip receiver ip address
* \param port receiver port
* \return true for success and false for error
*/
bool InitSender(const char* ip, int port);
/*!
* \brief Initialize receiver node
* \param ip receiver ip address
* \param port receiver port
* \param num_sender number of sender
* \param queue_size size of message queue
* \return true for success and false for error
*/
bool InitReceiver(const char* ip,
int port,
int num_sender,
int64_t queue_size);
/*!
* \brief Finalize sender node
*/
void FinalizeSender();
/*!
* \brief Finalize receiver node
*/
void FinalizeReceiver();
/*! /*!
* \brief Process received message in independent threads * \brief Process received message in independent threads
* \param socket new accpeted socket * \param socket new accpeted socket
* \param queue message queue * \param queue message queue
* \param id producer_id
*/ */
static void MsgHandler(TCPSocket* socket, MessageQueue* queue); static void MsgHandler(TCPSocket* socket, MessageQueue* queue, int id);
}; };
} // namespace network } // namespace network
......
...@@ -13,7 +13,7 @@ def generate_rand_graph(n): ...@@ -13,7 +13,7 @@ def generate_rand_graph(n):
def start_trainer(): def start_trainer():
g = generate_rand_graph(100) g = generate_rand_graph(100)
recv = dgl.contrib.sampling.SamplerReceiver(ip="127.0.0.1", port=50051) recv = dgl.contrib.sampling.SamplerReceiver(addr='127.0.0.1:50051', num_sender=1)
subg = recv.recv(g) subg = recv.recv(g)
seed_ids = subg.layer_parent_nid(-1) seed_ids = subg.layer_parent_nid(-1)
assert len(seed_ids) == 1 assert len(seed_ids) == 1
...@@ -28,18 +28,15 @@ def start_trainer(): ...@@ -28,18 +28,15 @@ def start_trainer():
src1 = subg.map_to_parent_nid(child_src) src1 = subg.map_to_parent_nid(child_src)
assert F.array_equal(src1, src) assert F.array_equal(src1, src)
time.sleep(3) # wait all senders to finalize their jobs
def start_sampler(): def start_sampler():
g = generate_rand_graph(100) g = generate_rand_graph(100)
sender = dgl.contrib.sampling.SamplerSender(ip="127.0.0.1", port=50051) namebook = { 0:'127.0.0.1:50051' }
sender = dgl.contrib.sampling.SamplerSender(namebook)
for i, subg in enumerate(dgl.contrib.sampling.NeighborSampler( for i, subg in enumerate(dgl.contrib.sampling.NeighborSampler(
g, 1, 100, neighbor_type='in', num_workers=4)): g, 1, 100, neighbor_type='in', num_workers=4)):
sender.send(subg) sender.send(subg, 0)
break break
time.sleep(1)
if __name__ == '__main__': if __name__ == '__main__':
pid = os.fork() pid = os.fork()
if pid == 0: if pid == 0:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment