Unverified Commit 7639b5e7 authored by Jinjing Zhou's avatar Jinjing Zhou Committed by GitHub
Browse files

[Fix] New StreamWithBuffer interface (#1557)



* WIP: rpc components

* client & server

* move network package to rpc

* fix include

* fix compile

* c api

* wip: test

* add basic tests

* missing file

* [RPC] Zero copy serializer (#1517)

* zerocopy serialization

* add test for HeteroGraph

* fix lint

* remove unnecessary codes

* add comment

* lint

* lint

* disable pylint for now

* add include for win

* windows guard

* lint

* lint

* skip test on windows

* refactor

* add comment

* fix

* comment

* 1111

* fix

* Update Jenkinsfile

* [RPC] Implementation of RPC infra (#1544)

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* remove client.cc and server.cc

* fix lint

* update

* update

* fix linr

* update

* fix lint

* update

* update

* update

* update

* update

* update

* update test

* update

* update test

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update comment

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* fix lint

* fix lint

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* Refactor StreamWithBuffer (#1550)

* refactor

* fix with new interface

* remove copy

* fix

* remove comment
Co-authored-by: default avatarMinjie Wang <wmjlyjemaine@gmail.com>
Co-authored-by: default avatarChao Ma <mctt90@gmail.com>
parent cc07bd44
......@@ -22,14 +22,27 @@
namespace dgl {
// StringStreamWithBuffer is backed up by a string. This class supports
// serializing and deserializing NDArrays stored in shared memory. If the stream
// is created for sending/recving data through network, the data pointer of the
// NDArray will be transmitted directly without any copy. Otherwise, the stream
// is for sending/recving data to another process on the same machine, so if an
// NDArray is stored in shared memory, it will just record the shared memory
// name instead of the actual data buffer.
class StringStreamWithBuffer : public dmlc::MemoryStringStream {
/* StreamWithBuffer is backed up by dmlc::MemoryFixedSizeStream or
dmlc::MemoryStringStream. This class supports serializing and deserializing
NDArrays stored in shared memory. If the stream is created for
sending/recving data through network, the data pointer of the NDArray will be
transmitted directly without any copy. Otherwise, the stream is for
sending/recving data to another process on the same machine, so if an NDArray
is stored in shared memory, it will just record the shared memory name
instead of the actual data buffer.
For example:
std::string blob;
// Send to local
StreamWithBuffer strm(&blob, false);
// Send to remote
StreamWithBuffer strm(&blob, true);
// Receive from local
StreamWithBuffer strm(&blob, false);
// Receive from remote
std::vector<void*> ptr_list;
StreamWithBuffer strm(&blob, ptr_list);
*/
class StreamWithBuffer : public dmlc::SeekStream {
public:
// Buffer type. Storing NDArray to maintain the reference counting to ensure
// the liveness of data pointer
......@@ -47,46 +60,95 @@ class StringStreamWithBuffer : public dmlc::MemoryStringStream {
/*!
* \brief This constructor is for writing scenario or reading from local
* machine
* \param metadata_ptr The string to write/load from zerocopy write/load
* \param strm The backup stream to write/load from
* \param send_to_remote Whether this stream will be deserialized at remote
* machine or the local machine. If true, will record the data pointer into
* buffer list.
*
* For example:
* std::string blob;
* // Write to send to local
* StringStreamWithBuffer buf_strm(&blob, false)
* // Write to send to remote
* StringStreamWithBuffer buf_strm(&blob, true)
* // Or
* StringStreamWithBuffer buf_strm(&blob)
* // Read from local
* StringStreamWithBuffer buf_strm(&blob, false)
*/
explicit StringStreamWithBuffer(std::string* metadata_ptr,
bool send_to_remote = true)
: MemoryStringStream(metadata_ptr),
StreamWithBuffer(std::unique_ptr<dmlc::SeekStream> strm, bool send_to_remote)
: strm_(std::move(strm)),
buffer_list_(),
send_to_remote_(send_to_remote) {}
/*!
* \brief This constructor is for reading from remote
* \param metadata_ptr The string to write/load from zerocopy write/load
* \param strm The stream to write/load from zerocopy write/load
* \param data_ptr_list list of pointer to reconstruct NDArray
*
* For example:
* std::string blob;
* std::vector<void*> data_ptr_list;
* // Read using the pointer list sent from the remote side
* StringStreamWithBuffer buf_strm(&blob, data_ptr_list)
* StreamWithBuffer buf_strm(&blob, data_ptr_list)
*/
StreamWithBuffer(std::unique_ptr<dmlc::SeekStream> strm,
const std::vector<void*>& data_ptr_list)
: strm_(std::move(strm)), send_to_remote_(true) {
for (void* data : data_ptr_list) {
buffer_list_.emplace_back(data);
}
}
/*!
* \brief Construct stream backed up by string
* \param blob The string to write/load from zerocopy write/load
* \param send_to_remote Whether this stream will be deserialized at remote
* machine or the local machine. If true, will record the data pointer into
* buffer list.
*/
StreamWithBuffer(std::string* blob, bool send_to_remote)
: strm_(new dmlc::MemoryStringStream(blob)),
send_to_remote_(send_to_remote) {}
/*!
* \brief Construct stream backed up by a fixed-size memory buffer
* \param p_buffer buffer pointer
* \param size buffer size
* \param send_to_remote Whether this stream will be deserialized at remote
* machine or the local machine. If true, will record the data pointer into
* buffer list.
*/
StringStreamWithBuffer(std::string* metadata_ptr,
StreamWithBuffer(char* p_buffer, size_t size, bool send_to_remote)
: strm_(new dmlc::MemoryFixedSizeStream(p_buffer, size)),
send_to_remote_(send_to_remote) {}
/*!
* \brief Construct stream backed up by string, and reconstruct NDArray
* from data_ptr_list
* \param blob The string to write/load from zerocopy write/load
* \param data_ptr_list pointer list for NDArrays to reconstruct from
*/
StreamWithBuffer(std::string* blob, const std::vector<void*>& data_ptr_list)
: strm_(new dmlc::MemoryStringStream(blob)), send_to_remote_(true) {
for (void* data : data_ptr_list) {
buffer_list_.emplace_back(data);
}
}
/*!
* \brief Construct stream backed up by a fixed-size memory buffer, and
* reconstruct NDArray from data_ptr_list
* \param p_buffer buffer pointer
* \param size buffer size
* \param data_ptr_list pointer list for NDArrays to reconstruct from
*/
StreamWithBuffer(char* p_buffer, size_t size,
const std::vector<void*>& data_ptr_list)
: MemoryStringStream(metadata_ptr), send_to_remote_(true) {
: strm_(new dmlc::MemoryFixedSizeStream(p_buffer, size)),
send_to_remote_(true) {
for (void* data : data_ptr_list) {
buffer_list_.emplace_back(data);
}
}
// delegate methods to strm_
virtual size_t Read(void* ptr, size_t size) { return strm_->Read(ptr, size); }
virtual void Write(const void* ptr, size_t size) { strm_->Write(ptr, size); }
virtual void Seek(size_t pos) { strm_->Seek(pos); }
virtual size_t Tell(void) { return strm_->Tell(); }
using dmlc::Stream::Read;
using dmlc::Stream::Write;
/*!
* \brief push NDArray into stream
* If send_to_remote=true, the NDArray will be saved to the buffer list
......@@ -113,6 +175,7 @@ class StringStreamWithBuffer : public dmlc::MemoryStringStream {
const std::deque<Buffer>& buffer_list() const { return buffer_list_; }
private:
std::unique_ptr<dmlc::SeekStream> strm_;
std::deque<Buffer> buffer_list_;
bool send_to_remote_;
}; // namespace dgl
......
......@@ -7,6 +7,7 @@
#include <dgl/zerocopy_serializer.h>
#include "dgl/runtime/ndarray.h"
#include "dmlc/memory_io.h"
namespace dgl {
......@@ -46,17 +47,16 @@ NDArray CreateNDArrayFromRawData(std::vector<int64_t> shape, DLDataType dtype,
return NDArray::FromDLPack(dlm_tensor);
}
void StringStreamWithBuffer::PushNDArray(const NDArray& tensor) {
void StreamWithBuffer::PushNDArray(const NDArray& tensor) {
#ifndef _WIN32
auto strm = static_cast<dmlc::Stream*>(this);
strm->Write(tensor->ndim);
strm->Write(tensor->dtype);
this->Write(tensor->ndim);
this->Write(tensor->dtype);
int ndim = tensor->ndim;
strm->WriteArray(tensor->shape, ndim);
this->WriteArray(tensor->shape, ndim);
CHECK(tensor.IsContiguous())
<< "StringStreamWithBuffer only supports contiguous tensor";
<< "StreamWithBuffer only supports contiguous tensor";
CHECK_EQ(tensor->byte_offset, 0)
<< "StringStreamWithBuffer only supports zero byte offset tensor";
<< "StreamWithBuffer only supports zero byte offset tensor";
int type_bytes = tensor->dtype.bits / 8;
int64_t num_elems = 1;
for (int i = 0; i < ndim; ++i) {
......@@ -68,33 +68,32 @@ void StringStreamWithBuffer::PushNDArray(const NDArray& tensor) {
if (send_to_remote_ || !mem) {
// If the stream is for remote communication or the data is not stored in
// shared memory, serialize the data content as a buffer.
strm->Write<bool>(false);
this->Write<bool>(false);
buffer_list_.emplace_back(tensor, tensor->data, data_byte_size);
} else {
CHECK(mem) << "Tried to send non-shared-memroy tensor to local "
"StringStreamWithBuffer";
"StreamWithBuffer";
// Serialize only the shared memory name.
strm->Write<bool>(true);
strm->Write(mem->GetName());
this->Write<bool>(true);
this->Write(mem->GetName());
}
#else
LOG(FATAL) << "StringStreamWithBuffer is not supported on windows";
LOG(FATAL) << "StreamWithBuffer is not supported on windows";
#endif // _WIN32
return;
}
NDArray StringStreamWithBuffer::PopNDArray() {
NDArray StreamWithBuffer::PopNDArray() {
#ifndef _WIN32
auto strm = static_cast<dmlc::Stream*>(this);
int ndim;
DLDataType dtype;
CHECK(strm->Read(&ndim)) << "Invalid DLTensor file format";
CHECK(strm->Read(&dtype)) << "Invalid DLTensor file format";
CHECK(this->Read(&ndim)) << "Invalid DLTensor file format";
CHECK(this->Read(&dtype)) << "Invalid DLTensor file format";
std::vector<int64_t> shape(ndim);
if (ndim != 0) {
CHECK(strm->ReadArray(&shape[0], ndim)) << "Invalid DLTensor file format";
CHECK(this->ReadArray(&shape[0], ndim)) << "Invalid DLTensor file format";
}
DLContext cpu_ctx;
......@@ -102,12 +101,12 @@ NDArray StringStreamWithBuffer::PopNDArray() {
cpu_ctx.device_id = 0;
bool is_shared_mem;
CHECK(strm->Read(&is_shared_mem)) << "Invalid stream read";
CHECK(this->Read(&is_shared_mem)) << "Invalid stream read";
std::string sharedmem_name;
if (is_shared_mem) {
CHECK(!send_to_remote_) << "Invalid attempt to deserialize from shared "
"memory with send_to_remote=true";
CHECK(strm->Read(&sharedmem_name)) << "Invalid stream read";
CHECK(this->Read(&sharedmem_name)) << "Invalid stream read";
return NDArray::EmptyShared(sharedmem_name, shape, dtype, cpu_ctx, false);
} else {
CHECK(send_to_remote_) << "Invalid attempt to deserialize from raw data "
......@@ -118,7 +117,7 @@ NDArray StringStreamWithBuffer::PopNDArray() {
return ret;
}
#else
LOG(FATAL) << "StringStreamWithBuffer is not supported on windows";
LOG(FATAL) << "StreamWithBuffer is not supported on windows";
return NDArray();
#endif // _WIN32
}
......
......@@ -18,8 +18,8 @@ namespace rpc {
RPCStatus SendRPCMessage(const RPCMessage& msg) {
std::shared_ptr<std::string> zerocopy_blob(new std::string());
StringStreamWithBuffer zc_write_strm(zerocopy_blob.get());
static_cast<dmlc::Stream *>(&zc_write_strm)->Write(msg);
StreamWithBuffer zc_write_strm(zerocopy_blob.get(), true);
zc_write_strm.Write(msg);
int32_t ndarray_count = msg.tensors.size();
zerocopy_blob->append(
reinterpret_cast<char*>(&ndarray_count),
......@@ -50,13 +50,8 @@ RPCStatus RecvRPCMessage(RPCMessage* msg, int32_t timeout) {
int send_id;
CHECK_EQ(RPCContext::ThreadLocal()->receiver->Recv(
&rpc_meta_msg, &send_id), REMOVE_SUCCESS);
// Copy the data for now, can be optimized later
std::string zerocopy_blob(
rpc_meta_msg.data,
rpc_meta_msg.size-sizeof(int32_t));
char* count_ptr = rpc_meta_msg.data+rpc_meta_msg.size-sizeof(int32_t);
int32_t ndarray_count = *(reinterpret_cast<int32_t*>(count_ptr));
rpc_meta_msg.deallocator(&rpc_meta_msg);
// Recv real ndarray data
std::vector<void* > buffer_list(ndarray_count);
for (int i = 0; i < ndarray_count; ++i) {
......@@ -65,8 +60,9 @@ RPCStatus RecvRPCMessage(RPCMessage* msg, int32_t timeout) {
&ndarray_data_msg, send_id), REMOVE_SUCCESS);
buffer_list[i] = ndarray_data_msg.data;
}
StringStreamWithBuffer zc_read_strm(&zerocopy_blob, buffer_list);
static_cast<dmlc::Stream *>(&zc_read_strm)->Read(msg);
StreamWithBuffer zc_read_strm(rpc_meta_msg.data, rpc_meta_msg.size-sizeof(int32_t), buffer_list);
zc_read_strm.Read(msg);
rpc_meta_msg.deallocator(&rpc_meta_msg);
return kRPCSuccess;
}
......
......@@ -288,7 +288,7 @@ std::shared_ptr<SharedMemory> NDArray::GetSharedMem() const {
void NDArray::Save(dmlc::Stream* strm) const {
auto zc_strm = dynamic_cast<StringStreamWithBuffer*>(strm);
auto zc_strm = dynamic_cast<StreamWithBuffer*>(strm);
if (zc_strm) {
zc_strm->PushNDArray(*this);
return;
......@@ -297,7 +297,7 @@ void NDArray::Save(dmlc::Stream* strm) const {
}
bool NDArray::Load(dmlc::Stream* strm) {
auto zc_strm = dynamic_cast<StringStreamWithBuffer*>(strm);
auto zc_strm = dynamic_cast<StreamWithBuffer*>(strm);
if (zc_strm) {
*this = zc_strm->PopNDArray();
return true;
......
......@@ -48,9 +48,9 @@ TEST(ZeroCopySerialize, NDArray) {
static_cast<dmlc::Stream *>(&ifs)->Write(tensor2);
std::string zerocopy_blob;
StringStreamWithBuffer zc_write_strm(&zerocopy_blob);
static_cast<dmlc::Stream *>(&zc_write_strm)->Write(tensor1);
static_cast<dmlc::Stream *>(&zc_write_strm)->Write(tensor2);
StreamWithBuffer zc_write_strm(&zerocopy_blob, true);
zc_write_strm.Write(tensor1);
zc_write_strm.Write(tensor2);
EXPECT_EQ(nonzerocopy_blob.size() - zerocopy_blob.size(), 126)
<< "Invalid save";
......@@ -64,9 +64,9 @@ TEST(ZeroCopySerialize, NDArray) {
}
NDArray loadtensor1, loadtensor2;
StringStreamWithBuffer zc_read_strm(&zerocopy_blob, new_ptr_list);
static_cast<dmlc::Stream *>(&zc_read_strm)->Read(&loadtensor1);
static_cast<dmlc::Stream *>(&zc_read_strm)->Read(&loadtensor2);
StreamWithBuffer zc_read_strm(&zerocopy_blob, new_ptr_list);
zc_read_strm.Read(&loadtensor1);
zc_read_strm.Read(&loadtensor2);
}
TEST(ZeroCopySerialize, SharedMem) {
......@@ -83,15 +83,15 @@ TEST(ZeroCopySerialize, SharedMem) {
static_cast<dmlc::Stream *>(&ifs)->Write(shared_tensor);
std::string zerocopy_blob;
StringStreamWithBuffer zc_write_strm(&zerocopy_blob, false);
static_cast<dmlc::Stream *>(&zc_write_strm)->Write(shared_tensor);
StreamWithBuffer zc_write_strm(&zerocopy_blob, false);
zc_write_strm.Write(shared_tensor);
EXPECT_EQ(nonzerocopy_blob.size() - zerocopy_blob.size(), 51)
<< "Invalid save";
NDArray loadtensor1;
NDArray loadtensor1, loadtensor2;
StringStreamWithBuffer zc_read_strm(&zerocopy_blob, false);
static_cast<dmlc::Stream *>(&zc_read_strm)->Read(&loadtensor1);
StreamWithBuffer zc_read_strm = StreamWithBuffer(&zerocopy_blob, false);
zc_read_strm.Read(&loadtensor1);
}
TEST(ZeroCopySerialize, HeteroGraph) {
......@@ -114,8 +114,8 @@ TEST(ZeroCopySerialize, HeteroGraph) {
static_cast<dmlc::Stream *>(&ifs)->Write(hrptr);
std::string zerocopy_blob;
StringStreamWithBuffer zc_write_strm(&zerocopy_blob, true);
static_cast<dmlc::Stream *>(&zc_write_strm)->Write(hrptr);
StreamWithBuffer zc_write_strm(&zerocopy_blob, true);
zc_write_strm.Write(hrptr);
EXPECT_EQ(nonzerocopy_blob.size() - zerocopy_blob.size(), 745)
<< "Invalid save";
......@@ -129,8 +129,8 @@ TEST(ZeroCopySerialize, HeteroGraph) {
}
auto gptr = dgl::Serializer::make_shared<HeteroGraph>();
StringStreamWithBuffer zc_read_strm(&zerocopy_blob, new_ptr_list);
static_cast<dmlc::Stream *>(&zc_read_strm)->Read(&gptr);
StreamWithBuffer zc_read_strm(&zerocopy_blob, new_ptr_list);
zc_read_strm.Read(&gptr);
EXPECT_EQ(gptr->NumVertices(0), 9);
EXPECT_EQ(gptr->NumVertices(1), 8);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment