zerocopy_serializer.cc 3.23 KB
Newer Older
1
/*!
2
 *  Copyright (c) 2020-2022 by Contributors
3
4
5
6
7
8
9
 * \file graph/serailize/zerocopy_serializer.cc
 * \brief serializer implementation.
 */

#include <dgl/zerocopy_serializer.h>

#include "dgl/runtime/ndarray.h"
10
#include "dmlc/memory_io.h"
11
12
13
14
15

namespace dgl {

using dgl::runtime::NDArray;

16
17
NDArray CreateNDArrayFromRawData(
    std::vector<int64_t> shape, DGLDataType dtype, DGLContext ctx, void* raw) {
18
  return NDArray::CreateFromRaw(shape, dtype, ctx, raw, true);
19
20
}

21
void StreamWithBuffer::PushNDArray(const NDArray& tensor) {
22
#ifndef _WIN32
23
24
  this->Write(tensor->ndim);
  this->Write(tensor->dtype);
25
  int ndim = tensor->ndim;
26
  this->WriteArray(tensor->shape, ndim);
27
  CHECK(tensor.IsContiguous())
28
      << "StreamWithBuffer only supports contiguous tensor";
29
  CHECK_EQ(tensor->byte_offset, 0)
30
      << "StreamWithBuffer only supports zero byte offset tensor";
31
32
33
34
35
36
37
38
39
40
41
  int type_bytes = tensor->dtype.bits / 8;
  int64_t num_elems = 1;
  for (int i = 0; i < ndim; ++i) {
    num_elems *= tensor->shape[i];
  }
  int64_t data_byte_size = type_bytes * num_elems;

  auto mem = tensor.GetSharedMem();
  if (send_to_remote_ || !mem) {
    // If the stream is for remote communication or the data is not stored in
    // shared memory, serialize the data content as a buffer.
42
    this->Write<bool>(false);
43
44
    // If this is a null ndarray, we will not push it into the underlying
    // buffer_list
45
46
47
    if (data_byte_size != 0) {
      buffer_list_.emplace_back(tensor, tensor->data, data_byte_size);
    }
48
49
  } else {
    CHECK(mem) << "Tried to send non-shared-memroy tensor to local "
50
                  "StreamWithBuffer";
51
    // Serialize only the shared memory name.
52
53
    this->Write<bool>(true);
    this->Write(mem->GetName());
54
55
  }
#else
56
  LOG(FATAL) << "StreamWithBuffer is not supported on windows";
57
58
59
60
#endif  // _WIN32
  return;
}

61
NDArray StreamWithBuffer::PopNDArray() {
62
63
#ifndef _WIN32
  int ndim;
64
  DGLDataType dtype;
65

66
67
  CHECK(this->Read(&ndim)) << "Invalid DGLArray file format";
  CHECK(this->Read(&dtype)) << "Invalid DGLArray file format";
68
69
70

  std::vector<int64_t> shape(ndim);
  if (ndim != 0) {
71
    CHECK(this->ReadArray(&shape[0], ndim)) << "Invalid DGLArray file format";
72
73
  }

74
75
  DGLContext cpu_ctx;
  cpu_ctx.device_type = kDGLCPU;
76
77
78
  cpu_ctx.device_id = 0;

  bool is_shared_mem;
79
  CHECK(this->Read(&is_shared_mem)) << "Invalid stream read";
80
81
82
83
  std::string sharedmem_name;
  if (is_shared_mem) {
    CHECK(!send_to_remote_) << "Invalid attempt to deserialize from shared "
                               "memory with send_to_remote=true";
84
    CHECK(this->Read(&sharedmem_name)) << "Invalid stream read";
85
86
87
88
    return NDArray::EmptyShared(sharedmem_name, shape, dtype, cpu_ctx, false);
  } else {
    CHECK(send_to_remote_) << "Invalid attempt to deserialize from raw data "
                              "pointer with send_to_remote=false";
89
90
91
92
93
    NDArray ret;
    if (ndim == 0 || shape[0] == 0) {
      // Mean this is a null ndarray
      ret = CreateNDArrayFromRawData(shape, dtype, cpu_ctx, nullptr);
    } else {
94
95
      ret = CreateNDArrayFromRawData(
          shape, dtype, cpu_ctx, buffer_list_.front().data);
96
97
      buffer_list_.pop_front();
    }
98
99
100
    return ret;
  }
#else
101
  LOG(FATAL) << "StreamWithBuffer is not supported on windows";
102
103
104
105
106
  return NDArray();
#endif  // _WIN32
}

}  // namespace dgl