"docs/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "bb1f8850e5832309e910e967871cf099f83b37e9"
Unverified commit a3ce780d authored by Jinjing Zhou, committed by GitHub

[RPC] Use tensorpipe for rpc communication (#3335)

* doesn't know whether works

* add change

* fix

* fix

* fix

* remove

* revert

* lint

* lint

* fix

* revert

* lint

* fix

* only build rpc on linux

* lint

* lint

* fix build on windows

* fix windows

* remove old test

* fix cmake

* Revert "remove old test"

This reverts commit f1ea75c777c34cdc1f08c0589676ba6aee1feb29.

* fix windows

* fix

* fix

* fix indent

* fix indent

* address comment

* fix

* fix

* fix

* fix

* fix

* lint

* fix indent

* fix lint

* add introduction

* fix

* lint

* lint

* add more logs

* fix

* update xbyak for C++14 with gcc5

* Remove channels

* fix

* add test script

* fix

* remove unused file

* fix lint

* add timeout
parent 987db374
@@ -32,6 +32,9 @@
 [submodule "third_party/libxsmm"]
   path = third_party/libxsmm
   url = https://github.com/hfp/libxsmm.git
+[submodule "third_party/tensorpipe"]
+  path = third_party/tensorpipe
+  url = https://github.com/pytorch/tensorpipe
 [submodule "third_party/thrust"]
   path = third_party/thrust
   url = https://github.com/NVIDIA/thrust.git
@@ -166,11 +166,17 @@ file(GLOB_RECURSE DGL_SRC_1
   src/api/*.cc
   src/graph/*.cc
   src/scheduler/*.cc
-  src/rpc/*.cc
 )
 list(APPEND DGL_SRC ${DGL_SRC_1})
+if (NOT MSVC)
+  file(GLOB_RECURSE DGL_RPC_SRC src/rpc/*.cc)
+else()
+  file(GLOB_RECURSE DGL_RPC_SRC src/rpc/network/*.cc)
+endif()
+list(APPEND DGL_SRC ${DGL_RPC_SRC})
 # Configure cuda
 if(USE_CUDA)
   dgl_config_cuda(DGL_CUDA_SRC)
@@ -198,6 +204,8 @@ else(USE_CUDA)
   add_library(dgl SHARED ${DGL_SRC})
 endif(USE_CUDA)
+set_property(TARGET dgl PROPERTY CXX_STANDARD 14)
 # include directories
 target_include_directories(dgl PRIVATE "include")
 target_include_directories(dgl PRIVATE "third_party/dlpack/include")
@@ -209,6 +217,7 @@ target_include_directories(dgl PRIVATE "tensoradapter/include")
 target_include_directories(dgl PRIVATE "third_party/nanoflann/include")
 target_include_directories(dgl PRIVATE "third_party/libxsmm/include")
 # For serialization
 if (USE_HDFS)
   option(DMLC_HDFS_SHARED "dgl has to build with dynamic hdfs library" ON)
@@ -242,6 +251,16 @@ if((NOT MSVC) AND USE_LIBXSMM)
   list(APPEND DGL_LINKER_LIBS -L${CMAKE_SOURCE_DIR}/third_party/libxsmm/lib/ xsmm)
 endif((NOT MSVC) AND USE_LIBXSMM)
+if(NOT MSVC)
+  # Only build tensorpipe on linux
+  string(REPLACE "-pedantic" "" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
+  set(TP_BUILD_LIBUV ON)
+  set(TP_STATIC_OR_SHARED STATIC)
+  add_subdirectory(third_party/tensorpipe)
+  list(APPEND DGL_LINKER_LIBS tensorpipe)
+  target_include_directories(dgl PRIVATE third_party/tensorpipe)
+endif(NOT MSVC)
 # Compile TVM Runtime and Featgraph
 # (NOTE) We compile a dynamic library called featgraph_runtime, which the DGL library links to.
 # Kernels are packed in a separate dynamic library called featgraph_kernels, which DGL
...
@@ -65,7 +65,7 @@ def unit_test_linux(backend, dev) {
 def unit_test_win64(backend, dev) {
     init_git_win64()
     unpack_lib("dgl-${dev}-win64", dgl_win64_libs)
-    timeout(time: 10, unit: 'MINUTES') {
+    timeout(time: 20, unit: 'MINUTES') {
         bat "CALL tests\\scripts\\task_unit_test.bat ${backend}"
     }
 }
...
@@ -35,8 +35,8 @@ def start_server(server_id, ip_config, num_servers, num_clients, server_state, \
     """
     assert server_id >= 0, 'server_id (%d) cannot be a negative number.' % server_id
     assert num_servers > 0, 'num_servers (%d) must be a positive number.' % num_servers
-    assert num_clients >= 0, 'num_client (%d) cannot be a negative number.' % num_client
-    assert max_queue_size > 0, 'queue_size (%d) cannot be a negative number.' % queue_size
+    assert num_clients >= 0, 'num_client (%d) cannot be a negative number.' % num_clients
+    assert max_queue_size > 0, 'queue_size (%d) cannot be a negative number.' % max_queue_size
     assert net_type in ('socket'), 'net_type (%s) can only be \'socket\'' % net_type
     # Register signal handler.
     rpc.register_sig_handler()
...
@@ -21,7 +21,7 @@ struct RawDataTensorCtx {
 void RawDataTensoDLPackDeleter(DLManagedTensor* tensor) {
   auto ctx = static_cast<RawDataTensorCtx*>(tensor->manager_ctx);
-  free(ctx->tensor.dl_tensor.data);
+  delete[] ctx->tensor.dl_tensor.data;
   delete ctx;
 }
...
This diff is collapsed.
@@ -12,17 +12,21 @@
 #include <dmlc/thread_local.h>
 #include <cstdint>
 #include <memory>
+#include <deque>
 #include <vector>
 #include <string>
-#include "./network/communicator.h"
-#include "./network/socket_communicator.h"
-#include "./network/msg_queue.h"
+#include <mutex>
+#include "./rpc_msg.h"
+#include "./tensorpipe/tp_communicator.h"
 #include "./network/common.h"
 #include "./server_state.h"
 namespace dgl {
 namespace rpc {
+struct RPCContext;
 // Communicator handler type
 typedef void* CommunicatorHandle;
@@ -31,8 +35,8 @@ struct RPCContext {
   /*!
    * \brief Rank of this process.
    *
-   * If the process is a client, this is equal to client ID. Otherwise, the process
-   * is a server and this is equal to server ID.
+   * If the process is a client, this is equal to client ID. Otherwise, the
+   * process is a server and this is equal to server ID.
    */
   int32_t rank = -1;
@@ -49,7 +53,7 @@ struct RPCContext {
   /*!
    * \brief Message sequence number.
   */
-  int64_t msg_seq = 0;
+  std::atomic<int64_t> msg_seq{0};
   /*!
    * \brief Total number of server.
@@ -74,12 +78,17 @@ struct RPCContext {
   /*!
    * \brief Sender communicator.
   */
-  std::shared_ptr<network::Sender> sender;
+  std::shared_ptr<TPSender> sender;
   /*!
    * \brief Receiver communicator.
   */
-  std::shared_ptr<network::Receiver> receiver;
+  std::shared_ptr<TPReceiver> receiver;
+  /*!
+   * \brief Tensorpipe global context
+   */
+  std::shared_ptr<tensorpipe::Context> ctx;
   /*!
    * \brief Server state data.
@@ -92,74 +101,27 @@ struct RPCContext {
   */
   std::shared_ptr<ServerState> server_state;
-  /*! \brief Get the thread-local RPC context structure */
-  static RPCContext *ThreadLocal() {
-    return dmlc::ThreadLocalStore<RPCContext>::Get();
+  /*! \brief Get the RPC context singleton */
+  static RPCContext* getInstance() {
+    static RPCContext ctx;
+    return &ctx;
   }
   /*! \brief Reset the RPC context */
   static void Reset() {
-    auto* t = ThreadLocal();
+    auto* t = getInstance();
     t->rank = -1;
     t->machine_id = -1;
     t->num_machines = 0;
     t->num_clients = 0;
     t->barrier_count = 0;
     t->num_servers_per_machine = 0;
-    t->sender = std::shared_ptr<network::Sender>();
-    t->receiver = std::shared_ptr<network::Receiver>();
+    t->sender.reset();
+    t->receiver.reset();
+    t->ctx.reset();
   }
 };
-/*! \brief RPC message data structure
- *
- * This structure is exposed to Python and can be used as argument or return value
- * in C API.
- */
-struct RPCMessage : public runtime::Object {
-  /*! \brief Service ID */
-  int32_t service_id;
-  /*! \brief Sequence number of this message. */
-  int64_t msg_seq;
-  /*! \brief Client ID. */
-  int32_t client_id;
-  /*! \brief Server ID. */
-  int32_t server_id;
-  /*! \brief Payload buffer carried by this request.*/
-  std::string data;
-  /*! \brief Extra payloads in the form of tensors.*/
-  std::vector<runtime::NDArray> tensors;
-  bool Load(dmlc::Stream* stream) {
-    stream->Read(&service_id);
-    stream->Read(&msg_seq);
-    stream->Read(&client_id);
-    stream->Read(&server_id);
-    stream->Read(&data);
-    stream->Read(&tensors);
-    return true;
-  }
-  void Save(dmlc::Stream* stream) const {
-    stream->Write(service_id);
-    stream->Write(msg_seq);
-    stream->Write(client_id);
-    stream->Write(server_id);
-    stream->Write(data);
-    stream->Write(tensors);
-  }
-  static constexpr const char* _type_key = "rpc.RPCMessage";
-  DGL_DECLARE_OBJECT_TYPE_INFO(RPCMessage, runtime::Object);
-};
-DGL_DEFINE_OBJECT_REF(RPCMessageRef, RPCMessage);
 /*! \brief RPC status flag */
 enum RPCStatus {
   kRPCSuccess = 0,
...
/*!
* Copyright (c) 2020 by Contributors
* \file rpc/rpc_msg.h
* \brief Common headers for remote process call (RPC).
*/
#ifndef DGL_RPC_RPC_MSG_H_
#define DGL_RPC_RPC_MSG_H_
#include <dgl/runtime/object.h>
#include <string>
#include <vector>
namespace dgl {
namespace rpc {
/*! \brief RPC message data structure
*
* This structure is exposed to Python and can be used as argument or return
* value in C API.
*/
struct RPCMessage : public runtime::Object {
/*! \brief Service ID */
int32_t service_id;
/*! \brief Sequence number of this message. */
int64_t msg_seq;
/*! \brief Client ID. */
int32_t client_id;
/*! \brief Server ID. */
int32_t server_id;
/*! \brief Payload buffer carried by this request.*/
std::string data;
/*! \brief Extra payloads in the form of tensors.*/
std::vector<runtime::NDArray> tensors;
bool Load(dmlc::Stream* stream) {
stream->Read(&service_id);
stream->Read(&msg_seq);
stream->Read(&client_id);
stream->Read(&server_id);
stream->Read(&data);
stream->Read(&tensors);
return true;
}
void Save(dmlc::Stream* stream) const {
stream->Write(service_id);
stream->Write(msg_seq);
stream->Write(client_id);
stream->Write(server_id);
stream->Write(data);
stream->Write(tensors);
}
static constexpr const char* _type_key = "rpc.RPCMessage";
DGL_DECLARE_OBJECT_TYPE_INFO(RPCMessage, runtime::Object);
};
DGL_DEFINE_OBJECT_REF(RPCMessageRef, RPCMessage);
} // namespace rpc
} // namespace dgl
#endif // DGL_RPC_RPC_MSG_H_
# Introduction to tensorpipe
## Process of setting up communication
```cpp
auto context = std::make_shared<tensorpipe::Context>();
// For Receiver
// Create listener to accept join requests
auto listener = context->listen({addr});
// Accept join request and generate pipe
std::promise<std::shared_ptr<Pipe>> pipeProm;
listener->accept([&](const Error& error, std::shared_ptr<Pipe> pipe) {
if (error) {
LOG(WARNING) << error.what();
}
pipeProm.set_value(std::move(pipe));
});
std::shared_ptr<Pipe> pipe = pipeProm.get_future().get();
// For Sender
// Note that the pipe may not really be usable at this point.
// For example, if no listener is listening on the address, no error is raised
// here; the error only happens at the first write/read operation. Thus we
// probe the connection manually and retry until it succeeds.
for (;;) {
  pipe = context->connect(addr);
  std::promise<bool> done;
  tensorpipe::Message tpmsg;
  tpmsg.metadata = "dglconnect";
  pipe->write(tpmsg, [&done](const tensorpipe::Error& error) {
    if (error) {
      done.set_value(false);
    } else {
      done.set_value(true);
    }
  });
  if (done.get_future().get()) {
    break;
  } else {
    sleep(5);
    LOG(INFO) << "Cannot connect to remote server. Wait to retry";
  }
}
```
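This probe-and-retry loop is exactly what `TPSender::Connect` in `tp_communicator.cc` below does; on the receiving side, `TPReceiver::Wait` checks that the first message carries the `dglconnect` metadata before registering the pipe and posting the first real read.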
## Read and Write
Message structure: https://github.com/pytorch/tensorpipe/blob/master/tensorpipe/core/message.h
There are three concepts: Message, Descriptor and Allocation.
Message is the core struct for communication. A Message contains three major fields: metadata (a string), payloads (CPU memory buffers), and tensors (CPU/GPU memory buffers, with the device as an attribute).
Descriptor and Allocation are used on the read side. A typical read operation is as follows:
```cpp
pipe->readDescriptor(
    [pipe](const Error& error, Descriptor descriptor) {
      // The descriptor carries the metadata of the message, the data size of
      // each payload and the device information of the tensors -- everything
      // except the actual buffers.
      // The user should allocate memory accordingly and hand it back through
      // an Allocation object.
      Allocation allocation;
      allocation.tensors.resize(descriptor.tensors.size());
      for (size_t i = 0; i < descriptor.tensors.size(); i++) {
        tensorpipe::CpuBuffer cpu_buffer;
        cpu_buffer.ptr = new char[descriptor.tensors[i].length];
        allocation.tensors[i].buffer = cpu_buffer;
      }
      // Then call pipe->read to receive the actual buffers into the allocation.
      pipe->read(allocation, [](const Error& error) {});
    });
```
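This is the pattern `TPReceiver::ReceiveFromPipe` below follows: it allocates a `new char[...]` CPU buffer for every tensor listed in the descriptor, reads into those buffers, deserializes them into an `RPCMessage`, pushes the message onto a blocking queue, and immediately posts the next `readDescriptor` on the same pipe.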
Sending a message is much simpler:
```cpp
// Resource cleanup (e.g. releasing the buffers backing the message) should be
// handled in the callback.
pipe->write(message, callback_fn);
```
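For example, `TPSender::Send` below keeps the zero-copy `NDArray` buffers alive until tensorpipe is done with them by capturing an owning holder in the write callback. A minimal sketch of that pattern (the `holder` variable is illustrative):

```cpp
// Sketch: capture an owning handle in the write callback so that the buffers
// referenced by tp_msg stay valid until the write completes.
auto holder = std::make_shared<std::vector<NDArray>>();  // filled with the tensors backing tp_msg
pipe->write(tp_msg, [holder](const tensorpipe::Error& error) {
  if (error) {
    LOG(FATAL) << "Failed to send message. Details: " << error.what();
  }
  // `holder` is destroyed together with the callback, releasing the NDArrays.
});
```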
## Register the underlying communication channel
There are two concepts: transport and channel.
Transport is the basic component for communication, such as sockets, and only supports CPU buffers.
Channel is a higher abstraction over transport: it can support GPU buffers, or combine multiple transports to accelerate communication.
Tensorpipe will try to set up the channel based on priority.
```cpp
// Register transport
auto context = std::make_shared<tensorpipe::Context>();
// uv is short for libuv, which uses epoll with sockets to communicate
auto transportContext = tensorpipe::transport::uv::create();
context->registerTransport(0 /* priority */, "tcp", transportContext);
// The basic channel just uses the bare transport to communicate
auto basicChannel = tensorpipe::channel::basic::create();
context->registerChannel(0, "basic", basicChannel);
// Below is the mpt (multiplexed transport) channel, which can use multiple uv
// transports to increase throughput
std::vector<std::shared_ptr<tensorpipe::transport::Context>> contexts = {
tensorpipe::transport::uv::create(), tensorpipe::transport::uv::create(),
tensorpipe::transport::uv::create()};
std::vector<std::shared_ptr<tensorpipe::transport::Listener>> listeners = {
contexts[0]->listen("127.0.0.1"), contexts[1]->listen("127.0.0.1"),
contexts[2]->listen("127.0.0.1")};
auto mptChannel = tensorpipe::channel::mpt::create(
std::move(contexts), std::move(listeners));
context->registerChannel(10, "mpt", mptChannel);
```
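A note on the priorities above (our reading of tensorpipe's selection logic, not stated explicitly in this document): when both endpoints register several transports or channels, the highest-priority one supported by both sides is tried first, so the `mpt` channel registered with priority 10 would be preferred over `basic` registered with priority 0.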
There are more channels supported by tensorpipe, such as CUDA IPC (for CUDA communication on the same machine), CMA (cross-memory attach, for processes on the same machine), CUDA GDR (InfiniBand with CUDA GPUDirect for GPU buffers), and CUDA Basic (a socket plus a separate thread that copies buffers into CUDA memory).
Quote from tensorpipe:
> Backends come in two flavors:
>
> Transports are the connections used by the pipes to transfer control messages, and the (smallish) core payloads. They are meant to be lightweight and low-latency. The most basic transport is a simple TCP one, which should work in all scenarios. A more optimized one, for example, is based on a ring buffer allocated in shared memory, which two processes on the same machine can use to communicate by performing just a memory copy, without passing through the kernel.
>
> Channels are where the heavy lifting takes place, as they take care of copying the (larger) tensor data. High bandwidths are a requirement. Examples include multiplexing chunks of data across multiple TCP sockets and processes, so to saturate the NIC's bandwidth. Or using a CUDA memcpy call to transfer memory from one GPU to another using NVLink.
\ No newline at end of file
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#ifndef DGL_RPC_TENSORPIPE_QUEUE_H_
#define DGL_RPC_TENSORPIPE_QUEUE_H_
#include <condition_variable>
#include <deque>
#include <mutex>
namespace dgl {
namespace rpc {
template <typename T>
class Queue {
public:
// Capacity isn't used actually
explicit Queue(int capacity = 1) : capacity_(capacity) {}
void push(T t) {
std::unique_lock<std::mutex> lock(mutex_);
// while (items_.size() >= capacity_) {
// cv_.wait(lock);
// }
items_.push_back(std::move(t));
cv_.notify_all();
}
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
while (items_.size() == 0) {
cv_.wait(lock);
}
T t(std::move(items_.front()));
items_.pop_front();
cv_.notify_all();
return t;
}
private:
std::mutex mutex_;
std::condition_variable cv_;
const int capacity_;
std::deque<T> items_;
};
} // namespace rpc
} // namespace dgl
#endif // DGL_RPC_TENSORPIPE_QUEUE_H_
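A minimal, hypothetical usage sketch of this blocking queue (not part of the commit; the include path is illustrative): one thread pushes items while another blocks in `pop()`, which is how `ReceiveFromPipe` and `TPReceiver::Recv` use it below.

```cpp
// Hypothetical example of the blocking Queue<T> above.
#include <iostream>
#include <string>
#include <thread>
#include "queue.h"  // illustrative path to the header above

int main() {
  dgl::rpc::Queue<std::string> queue;
  // Producer thread: pushes one item, like ReceiveFromPipe pushing an RPCMessage.
  std::thread producer([&queue]() { queue.push("hello"); });
  // Consumer: pop() blocks until the item is available, like TPReceiver::Recv.
  std::cout << queue.pop() << std::endl;
  producer.join();
  return 0;
}
```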
/*!
* Copyright (c) 2019 by Contributors
* \file tp_communicator.cc
* \brief Tensorpipe Communicator for DGL distributed training.
*/
#include "tp_communicator.h"
#include <time.h>
#include <unistd.h>
#include <future>
#include <memory>
#include <utility>
#include "../rpc.h"
namespace dgl {
namespace rpc {
using namespace tensorpipe;
void TPSender::AddReceiver(const std::string& addr, int recv_id) {
receiver_addrs_[recv_id] = addr;
}
bool TPSender::Connect() {
for (const auto& kv : receiver_addrs_) {
std::shared_ptr<Pipe> pipe;
for (;;) {
pipe = context->connect(kv.second);
std::promise<bool> done;
tensorpipe::Message tpmsg;
tpmsg.metadata = "dglconnect";
pipe->write(tpmsg, [&done](const tensorpipe::Error& error) {
if (error) {
done.set_value(false);
} else {
done.set_value(true);
}
});
if (done.get_future().get()) {
break;
} else {
sleep(5);
LOG(INFO) << "Cannot connect to remove server " << kv.second
<< ". Wait to retry";
}
}
pipes_[kv.first] = pipe;
}
return true;
}
void TPSender::Send(const RPCMessage& msg, int recv_id) {
auto pipe = pipes_[recv_id];
tensorpipe::Message tp_msg;
std::string* zerocopy_blob_ptr = &tp_msg.metadata;
StreamWithBuffer zc_write_strm(zerocopy_blob_ptr, true);
zc_write_strm.Write(msg);
int32_t nonempty_ndarray_count = zc_write_strm.buffer_list().size();
zerocopy_blob_ptr->append(reinterpret_cast<char*>(&nonempty_ndarray_count),
sizeof(int32_t));
tp_msg.tensors.resize(nonempty_ndarray_count);
// Hold the NDArrays to ensure they stay valid until the write operation completes
auto ndarray_holder = std::make_shared<std::vector<NDArray>>();
ndarray_holder->resize(nonempty_ndarray_count);
auto& buffer_list = zc_write_strm.buffer_list();
for (int i = 0; i < buffer_list.size(); i++) {
auto& ptr = buffer_list[i];
(*ndarray_holder.get())[i] = ptr.tensor;
tensorpipe::CpuBuffer cpu_buffer;
cpu_buffer.ptr = ptr.data;
tp_msg.tensors[i].buffer = cpu_buffer;
tp_msg.tensors[i].length = ptr.size;
if (ptr.size == 0) {
LOG(FATAL) << "Cannot send a empty NDArray.";
}
}
pipe->write(tp_msg,
[ndarray_holder, recv_id](const tensorpipe::Error& error) {
if (error) {
LOG(FATAL) << "Failed to send message to " << recv_id
<< ". Details: " << error.what();
}
});
}
void TPSender::Finalize() {}
void TPReceiver::Finalize() {}
bool TPReceiver::Wait(const std::string& addr, int num_sender) {
listener = context->listen({addr});
for (int i = 0; i < num_sender; i++) {
std::promise<std::shared_ptr<Pipe>> pipeProm;
listener->accept([&](const Error& error, std::shared_ptr<Pipe> pipe) {
if (error) {
LOG(WARNING) << error.what();
}
pipeProm.set_value(std::move(pipe));
});
std::shared_ptr<Pipe> pipe = pipeProm.get_future().get();
std::promise<bool> checkConnect;
pipe->readDescriptor(
[pipe, &checkConnect](const Error& error, Descriptor descriptor) {
Allocation allocation;
checkConnect.set_value(descriptor.metadata == "dglconnect");
pipe->read(allocation, [](const Error& error) {});
});
CHECK(checkConnect.get_future().get()) << "Invalid connect message.";
pipes_[i] = pipe;
ReceiveFromPipe(pipe, queue_);
}
return true;
}
void TPReceiver::ReceiveFromPipe(std::shared_ptr<Pipe> pipe,
std::shared_ptr<RPCMessageQueue> queue) {
pipe->readDescriptor([pipe, queue = std::move(queue)](const Error& error,
Descriptor descriptor) {
if (error) {
// Error may happen when the pipe is closed
return;
}
Allocation allocation;
CHECK_EQ(descriptor.payloads.size(), 0) << "Invalid DGL RPC Message";
int tensorsize = descriptor.tensors.size();
if (tensorsize > 0) {
allocation.tensors.resize(tensorsize);
for (int i = 0; i < descriptor.tensors.size(); i++) {
tensorpipe::CpuBuffer cpu_buffer;
cpu_buffer.ptr = new char[descriptor.tensors[i].length];
allocation.tensors[i].buffer = cpu_buffer;
}
}
pipe->read(
allocation, [allocation, descriptor = std::move(descriptor),
queue = std::move(queue), pipe](const Error& error) {
if (error) {
// A read is always posted on the pipe, so when the pipe is closed an
// error is raised here; that error is expected. Other errors are not
// expected, but we currently cannot tell them apart, so all errors are
// skipped here.
return;
}
char* meta_msg_begin = const_cast<char*>(&descriptor.metadata[0]);
std::vector<void*> buffer_list(descriptor.tensors.size());
for (int i = 0; i < descriptor.tensors.size(); i++) {
buffer_list[i] = allocation.tensors[i].buffer.unwrap<CpuBuffer>().ptr;
}
StreamWithBuffer zc_read_strm(
meta_msg_begin, descriptor.metadata.size() - sizeof(int32_t),
buffer_list);
RPCMessage msg;
zc_read_strm.Read(&msg);
queue->push(msg);
TPReceiver::ReceiveFromPipe(pipe, queue);
});
});
}
void TPReceiver::Recv(RPCMessage* msg) { *msg = std::move(queue_->pop()); }
} // namespace rpc
} // namespace dgl
/*!
* Copyright (c) 2019 by Contributors
* \file tp_communicator.h
* \brief Tensorpipe Communicator for DGL distributed training.
*/
#ifndef DGL_RPC_TENSORPIPE_TP_COMMUNICATOR_H_
#define DGL_RPC_TENSORPIPE_TP_COMMUNICATOR_H_
#include <dmlc/logging.h>
#include <tensorpipe/tensorpipe.h>
#include <deque>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "./queue.h"
namespace dgl {
namespace rpc {
class RPCMessage;
typedef Queue<RPCMessage> RPCMessageQueue;
/*!
* \brief TPSender for DGL distributed training.
*
* TPSender is the communicator implemented on top of tensorpipe.
*/
class TPSender {
public:
/*!
* \brief Sender constructor
* \param ctx shared tensorpipe context
*/
explicit TPSender(std::shared_ptr<tensorpipe::Context> ctx) {
CHECK(ctx) << "Context is not initialized";
this->context = ctx;
}
/*!
* \brief Add receiver's address and ID to the sender's namebook
* \param addr Networking address, e.g., 'tcp://127.0.0.1:50091'
* \param id receiver's ID
*
* AddReceiver() is not thread-safe and only one thread can invoke this API.
*/
void AddReceiver(const std::string& addr, int recv_id);
/*!
* \brief Connect with all the Receivers
* \return True for success and False for fail
*
* Connect() is not thread-safe and only one thread can invoke this API.
*/
bool Connect();
/*!
* \brief Send RPCMessage to specified Receiver.
* \param msg data message
* \param recv_id receiver's ID
*/
void Send(const RPCMessage& msg, int recv_id);
/*!
* \brief Finalize TPSender
*/
void Finalize();
/*!
* \brief Communicator type: 'tp'
*/
inline std::string Type() const { return std::string("tp"); }
private:
/*!
* \brief global context of tensorpipe
*/
std::shared_ptr<tensorpipe::Context> context;
/*!
* \brief pipe for each connection of receiver
*/
std::unordered_map<int /* receiver ID */, std::shared_ptr<tensorpipe::Pipe>>
pipes_;
/*!
* \brief receivers' listening address
*/
std::unordered_map<int /* receiver ID */, std::string> receiver_addrs_;
};
/*!
* \brief TPReceiver for DGL distributed training.
*
* TPReceiver is the communicator implemented on top of tensorpipe.
*/
class TPReceiver {
public:
/*!
* \brief Receiver constructor
* \param ctx shared tensorpipe context
*/
explicit TPReceiver(std::shared_ptr<tensorpipe::Context> ctx) {
CHECK(ctx) << "Context is not initialized";
this->context = ctx;
queue_ = std::make_shared<RPCMessageQueue>();
}
/*!
* \brief Wait for all the Senders to connect
* \param addr Networking address, e.g., 'tcp://127.0.0.1:50051'
* \param num_sender total number of Senders
* \return True for success and False for fail
*
* Wait() is not thread-safe and only one thread can invoke this API.
*/
bool Wait(const std::string& addr, int num_sender);
/*!
* \brief Recv RPCMessage from Sender. Actually removes data from the queue.
* \param msg pointer to the RPCMessage to be filled
*
* (1) The Recv() API is blocking: it does not return until it gets data from
* the message queue.
* (2) The Recv() API is thread-safe.
* (3) The memory is allocated by the communicator, but the communicator does
* not own it after the function returns.
*/
void Recv(RPCMessage* msg);
/*!
* \brief Finalize TPReceiver
*
* Finalize() is not thread-safe and only one thread can invoke this API.
*/
void Finalize();
/*!
* \brief Communicator type: 'tp' (tensorpipe)
*/
inline std::string Type() const { return std::string("tp"); }
/*!
* \brief Issue a receive request on pipe, and push the result into queue
*/
static void ReceiveFromPipe(std::shared_ptr<tensorpipe::Pipe> pipe,
std::shared_ptr<RPCMessageQueue> queue);
private:
/*!
* \brief number of sender
*/
int num_sender_;
/*!
* \brief listener to build pipe
*/
std::shared_ptr<tensorpipe::Listener> listener;
/*!
* \brief global context of tensorpipe
*/
std::shared_ptr<tensorpipe::Context> context;
/*!
* \brief pipe for each client connections
*/
std::unordered_map<int /* Sender (virtual) ID */,
std::shared_ptr<tensorpipe::Pipe>>
pipes_;
/*!
* \brief RPCMessage queue
*/
std::shared_ptr<RPCMessageQueue> queue_;
};
} // namespace rpc
} // namespace dgl
#endif // DGL_RPC_TENSORPIPE_TP_COMMUNICATOR_H_
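Putting the two classes together, here is a hypothetical single-process sketch (not part of this commit): the address, priorities, and context registration mirror the introduction doc above, the include paths are illustrative, and the actual wiring in DGL lives in code not shown here.

```cpp
// Hypothetical end-to-end sketch of TPSender/TPReceiver.
#include <memory>
#include <thread>
#include <tensorpipe/tensorpipe.h>
#include "tp_communicator.h"  // illustrative path
#include "rpc_msg.h"          // illustrative path

int main() {
  // Shared tensorpipe context with one transport and one channel registered,
  // as shown in the introduction doc.
  auto ctx = std::make_shared<tensorpipe::Context>();
  ctx->registerTransport(0, "tcp", tensorpipe::transport::uv::create());
  ctx->registerChannel(0, "basic", tensorpipe::channel::basic::create());

  dgl::rpc::TPReceiver receiver(ctx);
  dgl::rpc::TPSender sender(ctx);

  // Receiver side: listen, block until one sender has connected, then block
  // in Recv() until a message is pushed onto the internal queue.
  std::thread server([&receiver]() {
    receiver.Wait("tcp://127.0.0.1:50051", /*num_sender=*/1);
    dgl::rpc::RPCMessage msg;
    receiver.Recv(&msg);
  });

  // Sender side: register the receiver, probe/retry until connected, send.
  sender.AddReceiver("tcp://127.0.0.1:50051", /*recv_id=*/0);
  sender.Connect();
  dgl::rpc::RPCMessage msg;
  msg.service_id = 0;
  msg.msg_seq = 0;
  msg.client_id = 0;
  msg.server_id = 0;
  msg.data = "ping";
  sender.Send(msg, /*recv_id=*/0);

  server.join();
  sender.Finalize();
  receiver.Finalize();
  return 0;
}
```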
@@ -107,12 +107,13 @@ class HelloRequest(dgl.distributed.Request):
         res = HelloResponse(self.hello_str, self.integer, new_tensor)
         return res
-def start_server(num_clients, ip_config):
+def start_server(num_clients, ip_config, server_id=0):
     print("Sleep 5 seconds to test client re-connect.")
     time.sleep(5)
     server_state = dgl.distributed.ServerState(None, local_g=None, partition_book=None)
     dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse)
-    dgl.distributed.start_server(server_id=0,
+    print("Start server {}".format(server_id))
+    dgl.distributed.start_server(server_id=server_id,
                                  ip_config=ip_config,
                                  num_servers=1,
                                  num_clients=num_clients,
@@ -224,8 +225,50 @@ def test_multi_client():
     pserver.join()
+@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
+def test_multi_thread_rpc():
+    os.environ['DGL_DIST_MODE'] = 'distributed'
+    ip_config = open("rpc_ip_config_multithread.txt", "w")
+    num_servers = 2
+    for _ in range(num_servers): # 3 servers
+        ip_config.write('{}\n'.format(get_local_usable_addr()))
+    ip_config.close()
+    ctx = mp.get_context('spawn')
+    pserver_list = []
+    for i in range(num_servers):
+        pserver = ctx.Process(target=start_server, args=(1, "rpc_ip_config_multithread.txt", i))
+        pserver.start()
+        pserver_list.append(pserver)
+    def start_client_multithread(ip_config):
+        import threading
+        dgl.distributed.connect_to_server(ip_config=ip_config, num_servers=1)
+        dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse)
+        req = HelloRequest(STR, INTEGER, TENSOR, simple_func)
+        dgl.distributed.send_request(0, req)
+        def subthread_call(server_id):
+            req = HelloRequest(STR, INTEGER, TENSOR+ server_id, simple_func)
+            dgl.distributed.send_request(server_id, req)
+        subthread = threading.Thread(target=subthread_call, args=(1,))
+        subthread.start()
+        subthread.join()
+        res0 = dgl.distributed.recv_response()
+        res1 = dgl.distributed.recv_response()
+        assert_array_equal(F.asnumpy(res0.tensor), F.asnumpy(TENSOR))
+        assert_array_equal(F.asnumpy(res1.tensor), F.asnumpy(TENSOR+1))
+        dgl.distributed.exit_client()
+    start_client_multithread("rpc_ip_config_multithread.txt")
+    pserver.join()
 if __name__ == '__main__':
     test_serialize()
     test_rpc_msg()
     test_rpc()
     test_multi_client()
+    test_multi_thread_rpc()
\ No newline at end of file
+Subproject commit 6042f1a4cbce8eef997f11ed0012de137b317361
-Subproject commit 0140eeff1fffcf5069dea3abb57095695320971c
+Subproject commit 757e4063f6464740b8ff4a2cae9136d2f8458020