/*!
 *  Copyright (c) 2020 by Contributors
 * \file graph/serialize/zerocopy_serializer.cc
 * \brief Zero-copy serializer implementation.
 */

#include <dgl/zerocopy_serializer.h>

#include <string>
#include <utility>
#include <vector>

#include "dgl/runtime/ndarray.h"
#include "dmlc/memory_io.h"

namespace dgl {

using dgl::runtime::NDArray;

/*!
 * \brief Owner of the metadata behind a DLManagedTensor built from raw data.
 *
 * The embedded tensor's shape/strides pointers are pointed into the two
 * vectors below, so this context must live as long as the tensor does; it is
 * released by the tensor's DLPack deleter.
 */
struct RawDataTensorCtx {
  /*! \brief Backing storage for tensor.dl_tensor.shape. */
  std::vector<int64_t> shape;
  /*! \brief Backing storage for tensor.dl_tensor.strides. */
  std::vector<int64_t> stride;
  /*! \brief The DLPack tensor that is handed to NDArray::FromDLPack. */
  DLManagedTensor tensor{};
};

/*!
 * \brief DLPack deleter for tensors created by CreateNDArrayFromRawData.
 *
 * Frees the tensor's raw data buffer and then the RawDataTensorCtx that owns
 * the shape/stride storage (and the DLManagedTensor itself).
 *
 * NOTE(review): assumes every non-null data pointer handed to
 * CreateNDArrayFromRawData was allocated as a byte array with `new char[]`;
 * confirm against whoever fills StreamWithBuffer's buffer list.
 *
 * \param tensor The managed tensor; its manager_ctx is a RawDataTensorCtx*.
 */
void RawDataTensoDLPackDeleter(DLManagedTensor* tensor) {
  auto ctx = static_cast<RawDataTensorCtx*>(tensor->manager_ctx);
  // `delete[]` through a `void*` is undefined behaviour (the element type is
  // required to release an array), so cast back to a byte pointer first.
  // Deleting a null pointer is a no-op, which covers null ndarrays.
  delete[] static_cast<char*>(ctx->tensor.dl_tensor.data);
  delete ctx;
}

/*!
 * \brief Wrap an existing raw buffer as a compact row-major NDArray.
 *
 * Ownership of \p raw passes to the returned NDArray: it is released by
 * RawDataTensoDLPackDeleter when the NDArray is destroyed.
 *
 * \param shape Tensor shape; moved into the context that backs
 *        dl_tensor.shape.
 * \param dtype Element data type.
 * \param ctx Device context recorded in the tensor.
 * \param raw Data pointer (nullptr is used for empty tensors).
 * \return An NDArray viewing \p raw with compact row-major strides.
 */
NDArray CreateNDArrayFromRawData(std::vector<int64_t> shape, DLDataType dtype,
                                 DLContext ctx, void* raw) {
  auto dlm_tensor_ctx = new RawDataTensorCtx();
  DLManagedTensor* dlm_tensor = &dlm_tensor_ctx->tensor;
  // `shape` is a by-value sink parameter: move it instead of copying again.
  dlm_tensor_ctx->shape = std::move(shape);
  const int ndim = static_cast<int>(dlm_tensor_ctx->shape.size());

  // Compact row-major strides: stride[i] = shape[i+1] * ... * shape[ndim-1].
  dlm_tensor_ctx->stride.assign(ndim, 1);
  for (int i = ndim - 2; i >= 0; --i) {
    dlm_tensor_ctx->stride[i] =
      dlm_tensor_ctx->shape[i + 1] * dlm_tensor_ctx->stride[i + 1];
  }

  dlm_tensor->manager_ctx = dlm_tensor_ctx;
  dlm_tensor->dl_tensor.data = raw;
  dlm_tensor->dl_tensor.ctx = ctx;
  dlm_tensor->dl_tensor.ndim = ndim;
  dlm_tensor->dl_tensor.dtype = dtype;
  dlm_tensor->dl_tensor.shape = dmlc::BeginPtr(dlm_tensor_ctx->shape);
  dlm_tensor->dl_tensor.strides = dmlc::BeginPtr(dlm_tensor_ctx->stride);
  dlm_tensor->deleter = RawDataTensoDLPackDeleter;
  return NDArray::FromDLPack(dlm_tensor);
}

void StreamWithBuffer::PushNDArray(const NDArray& tensor) {
51
#ifndef _WIN32
52
53
  this->Write(tensor->ndim);
  this->Write(tensor->dtype);
54
  int ndim = tensor->ndim;
55
  this->WriteArray(tensor->shape, ndim);
56
  CHECK(tensor.IsContiguous())
57
    << "StreamWithBuffer only supports contiguous tensor";
58
  CHECK_EQ(tensor->byte_offset, 0)
59
    << "StreamWithBuffer only supports zero byte offset tensor";
60
61
62
63
64
65
66
67
68
69
70
  int type_bytes = tensor->dtype.bits / 8;
  int64_t num_elems = 1;
  for (int i = 0; i < ndim; ++i) {
    num_elems *= tensor->shape[i];
  }
  int64_t data_byte_size = type_bytes * num_elems;

  auto mem = tensor.GetSharedMem();
  if (send_to_remote_ || !mem) {
    // If the stream is for remote communication or the data is not stored in
    // shared memory, serialize the data content as a buffer.
71
    this->Write<bool>(false);
72
73
74
75
    // If this is a null ndarray, we will not push it into the underlying buffer_list
    if (data_byte_size != 0) {
      buffer_list_.emplace_back(tensor, tensor->data, data_byte_size);
    }
76
77
  } else {
    CHECK(mem) << "Tried to send non-shared-memroy tensor to local "
78
                  "StreamWithBuffer";
79
    // Serialize only the shared memory name.
80
81
    this->Write<bool>(true);
    this->Write(mem->GetName());
82
83
  }
#else
84
  LOG(FATAL) << "StreamWithBuffer is not supported on windows";
85
86
87
88
#endif  // _WIN32
  return;
}

89
/*!
 * \brief Deserialize an NDArray previously written by PushNDArray.
 *
 * Reads ndim, dtype, shape and the shared-memory flag. A shared-memory tensor
 * is reopened by name; this is only valid when send_to_remote_ is false.
 * Otherwise (only valid when send_to_remote_ is true), the data pointer is
 * taken from the front of buffer_list_. The exception is a tensor with zero
 * elements: PushNDArray enqueued no buffer for it, so the result has null data.
 * Not supported on Windows.
 *
 * \return The deserialized NDArray (on CPU).
 */
NDArray StreamWithBuffer::PopNDArray() {
#ifndef _WIN32
  int ndim;
  DLDataType dtype;

  CHECK(this->Read(&ndim)) << "Invalid DLTensor file format";
  CHECK(this->Read(&dtype)) << "Invalid DLTensor file format";
  // Reject a corrupted ndim before using it to size the shape vector.
  CHECK_GE(ndim, 0) << "Invalid DLTensor file format";

  std::vector<int64_t> shape(ndim);
  if (ndim != 0) {
    CHECK(this->ReadArray(&shape[0], ndim)) << "Invalid DLTensor file format";
  }

  DLContext cpu_ctx;
  cpu_ctx.device_type = kDLCPU;
  cpu_ctx.device_id = 0;

  bool is_shared_mem;
  CHECK(this->Read(&is_shared_mem)) << "Invalid stream read";

  if (is_shared_mem) {
    CHECK(!send_to_remote_) << "Invalid attempt to deserialize from shared "
                               "memory with send_to_remote=true";
    std::string sharedmem_name;
    CHECK(this->Read(&sharedmem_name)) << "Invalid stream read";
    return NDArray::EmptyShared(sharedmem_name, shape, dtype, cpu_ctx, false);
  }

  CHECK(send_to_remote_) << "Invalid attempt to deserialize from raw data "
                            "pointer with send_to_remote=false";

  // PushNDArray enqueues a buffer iff the tensor's byte size is non-zero, i.e.
  // iff its element count is non-zero. Note a 0-dim (scalar) tensor has one
  // element and therefore DOES have a buffer; a shape such as {3, 0} has none.
  int64_t num_elems = 1;
  for (int64_t dim : shape) {
    num_elems *= dim;
  }
  if (num_elems == 0) {
    // Null ndarray: no buffer was pushed for it.
    return CreateNDArrayFromRawData(shape, dtype, cpu_ctx, nullptr);
  }

  CHECK(!buffer_list_.empty())
    << "StreamWithBuffer buffer list exhausted: stream out of sync";
  NDArray ret = CreateNDArrayFromRawData(shape, dtype, cpu_ctx,
                                         buffer_list_.front().data);
  buffer_list_.pop_front();
  return ret;
#else
  LOG(FATAL) << "StreamWithBuffer is not supported on windows";
  return NDArray();
#endif  // _WIN32
}

}  // namespace dgl