zerocopy_serializer.cc 3.27 KB
Newer Older
1
/*!
2
 *  Copyright (c) 2020-2022 by Contributors
3
4
5
6
7
8
9
 * \file graph/serailize/zerocopy_serializer.cc
 * \brief serializer implementation.
 */

#include <dgl/zerocopy_serializer.h>

#include "dgl/runtime/ndarray.h"
10
#include "dmlc/memory_io.h"
11
12
13
14
15

namespace dgl {

using dgl::runtime::NDArray;

16
17
18
NDArray CreateNDArrayFromRawData(std::vector<int64_t> shape, DGLDataType dtype,
                                 DGLContext ctx, void* raw) {
  return NDArray::CreateFromRaw(shape, dtype, ctx, raw, true);
19
20
}

21
void StreamWithBuffer::PushNDArray(const NDArray& tensor) {
22
#ifndef _WIN32
23
24
  this->Write(tensor->ndim);
  this->Write(tensor->dtype);
25
  int ndim = tensor->ndim;
26
  this->WriteArray(tensor->shape, ndim);
27
  CHECK(tensor.IsContiguous())
28
    << "StreamWithBuffer only supports contiguous tensor";
29
  CHECK_EQ(tensor->byte_offset, 0)
30
    << "StreamWithBuffer only supports zero byte offset tensor";
31
32
33
34
35
36
37
38
39
40
41
  int type_bytes = tensor->dtype.bits / 8;
  int64_t num_elems = 1;
  for (int i = 0; i < ndim; ++i) {
    num_elems *= tensor->shape[i];
  }
  int64_t data_byte_size = type_bytes * num_elems;

  auto mem = tensor.GetSharedMem();
  if (send_to_remote_ || !mem) {
    // If the stream is for remote communication or the data is not stored in
    // shared memory, serialize the data content as a buffer.
42
    this->Write<bool>(false);
43
44
45
46
    // If this is a null ndarray, we will not push it into the underlying buffer_list
    if (data_byte_size != 0) {
      buffer_list_.emplace_back(tensor, tensor->data, data_byte_size);
    }
47
48
  } else {
    CHECK(mem) << "Tried to send non-shared-memroy tensor to local "
49
                  "StreamWithBuffer";
50
    // Serialize only the shared memory name.
51
52
    this->Write<bool>(true);
    this->Write(mem->GetName());
53
54
  }
#else
55
  LOG(FATAL) << "StreamWithBuffer is not supported on windows";
56
57
58
59
#endif  // _WIN32
  return;
}

60
NDArray StreamWithBuffer::PopNDArray() {
61
62
#ifndef _WIN32
  int ndim;
63
  DGLDataType dtype;
64

65
66
  CHECK(this->Read(&ndim)) << "Invalid DGLArray file format";
  CHECK(this->Read(&dtype)) << "Invalid DGLArray file format";
67
68
69

  std::vector<int64_t> shape(ndim);
  if (ndim != 0) {
70
    CHECK(this->ReadArray(&shape[0], ndim)) << "Invalid DGLArray file format";
71
72
  }

73
74
  DGLContext cpu_ctx;
  cpu_ctx.device_type = kDGLCPU;
75
76
77
  cpu_ctx.device_id = 0;

  bool is_shared_mem;
78
  CHECK(this->Read(&is_shared_mem)) << "Invalid stream read";
79
80
81
82
  std::string sharedmem_name;
  if (is_shared_mem) {
    CHECK(!send_to_remote_) << "Invalid attempt to deserialize from shared "
                               "memory with send_to_remote=true";
83
    CHECK(this->Read(&sharedmem_name)) << "Invalid stream read";
84
85
86
87
    return NDArray::EmptyShared(sharedmem_name, shape, dtype, cpu_ctx, false);
  } else {
    CHECK(send_to_remote_) << "Invalid attempt to deserialize from raw data "
                              "pointer with send_to_remote=false";
88
89
90
91
92
93
94
95
96
    NDArray ret;
    if (ndim == 0 || shape[0] == 0) {
      // Mean this is a null ndarray
      ret = CreateNDArrayFromRawData(shape, dtype, cpu_ctx, nullptr);
    } else {
      ret = CreateNDArrayFromRawData(shape, dtype, cpu_ctx,
                                     buffer_list_.front().data);
      buffer_list_.pop_front();
    }
97
98
99
    return ret;
  }
#else
100
  LOG(FATAL) << "StreamWithBuffer is not supported on windows";
101
102
103
104
105
  return NDArray();
#endif  // _WIN32
}

}  // namespace dgl