Unverified Commit 6f675eb7 authored by czkkkkkk's avatar czkkkkkk Committed by GitHub
Browse files

[Graphbolt] Read/write CSCSamplingGraph on shared memory. (#5738)


Co-authored-by: default avatarHongzhi (Steve), Chen <chenhongzhi.nkcs@gmail.com>
parent 53714ca8
...@@ -11,7 +11,8 @@ ...@@ -11,7 +11,8 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "sampled_subgraph.h" #include "./sampled_subgraph.h"
#include "./shared_memory.h"
namespace graphbolt { namespace graphbolt {
namespace sampling { namespace sampling {
...@@ -115,7 +116,39 @@ class CSCSamplingGraph : public torch::CustomClassHolder { ...@@ -115,7 +116,39 @@ class CSCSamplingGraph : public torch::CustomClassHolder {
c10::intrusive_ptr<SampledSubgraph> InSubgraph( c10::intrusive_ptr<SampledSubgraph> InSubgraph(
const torch::Tensor& nodes) const; const torch::Tensor& nodes) const;
/**
* @brief Copy the graph to shared memory.
* @param shared_memory_name The name of the shared memory.
*
* @return A new CSCSamplingGraph object on shared memory.
*/
c10::intrusive_ptr<CSCSamplingGraph> CopyToSharedMemory(
const std::string& shared_memory_name);
/**
* @brief Load the graph from shared memory.
* @param shared_memory_name The name of the shared memory.
*
* @return A new CSCSamplingGraph object on shared memory.
*/
static c10::intrusive_ptr<CSCSamplingGraph> LoadFromSharedMemory(
const std::string& shared_memory_name);
private: private:
/**
* @brief Build a CSCSamplingGraph from shared memory tensors.
*
   * @param shared_memory_tensors A tuple of two shared memory objects holding
   * tensor meta information and data respectively, and a vector of optional
   * tensors on shared memory.
*
* @return A new CSCSamplingGraph on shared memory.
*/
static c10::intrusive_ptr<CSCSamplingGraph> BuildGraphFromSharedMemoryTensors(
std::tuple<
SharedMemoryPtr, SharedMemoryPtr,
std::vector<torch::optional<torch::Tensor>>>&& shared_memory_tensors);
/** @brief CSC format index pointer array. */ /** @brief CSC format index pointer array. */
torch::Tensor indptr_; torch::Tensor indptr_;
...@@ -137,6 +170,22 @@ class CSCSamplingGraph : public torch::CustomClassHolder { ...@@ -137,6 +170,22 @@ class CSCSamplingGraph : public torch::CustomClassHolder {
* edge types. The length of it is equal to the number of edges. * edge types. The length of it is equal to the number of edges.
*/ */
torch::optional<torch::Tensor> type_per_edge_; torch::optional<torch::Tensor> type_per_edge_;
/**
* @brief Maximum number of bytes used to serialize the metadata of the
* member tensors, including tensor shape and dtype. The constant is estimated
* by multiplying the number of tensors in this class and the maximum number
* of bytes used to serialize the metadata of a tensor (4 * 8192 for now).
*/
static constexpr int64_t SERIALIZED_METAINFO_SIZE_MAX = 32768;
/**
* @brief Shared memory used to hold the tensor meta information and data of
* this class. By storing its shared memory objects, the graph controls the
* resources of shared memory, which will be released automatically when the
* graph is destroyed.
*/
SharedMemoryPtr tensor_meta_shm_, tensor_data_shm_;
}; };
} // namespace sampling } // namespace sampling
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <windows.h> #include <windows.h>
#endif // _WIN32 #endif // _WIN32
#include <memory>
#include <string> #include <string>
namespace graphbolt { namespace graphbolt {
...@@ -100,6 +101,8 @@ class SharedMemory { ...@@ -100,6 +101,8 @@ class SharedMemory {
#endif // _WIN32 #endif // _WIN32
}; };
using SharedMemoryPtr = std::unique_ptr<SharedMemory>;
} // namespace sampling } // namespace sampling
} // namespace graphbolt } // namespace graphbolt
......
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
#include <graphbolt/csc_sampling_graph.h> #include <graphbolt/csc_sampling_graph.h>
#include <graphbolt/serialize.h> #include <graphbolt/serialize.h>
#include "./shared_memory_utils.h"
namespace graphbolt { namespace graphbolt {
namespace sampling { namespace sampling {
...@@ -115,5 +117,35 @@ c10::intrusive_ptr<SampledSubgraph> CSCSamplingGraph::InSubgraph( ...@@ -115,5 +117,35 @@ c10::intrusive_ptr<SampledSubgraph> CSCSamplingGraph::InSubgraph(
: torch::nullopt); : torch::nullopt);
} }
c10::intrusive_ptr<CSCSamplingGraph>
CSCSamplingGraph::BuildGraphFromSharedMemoryTensors(
    std::tuple<
        SharedMemoryPtr, SharedMemoryPtr,
        std::vector<torch::optional<torch::Tensor>>>&& shared_memory_tensors) {
  auto& optional_tensors = std::get<2>(shared_memory_tensors);
  // The graph is reconstructed from exactly four member tensors in this
  // order: indptr, indices, node_type_offset, type_per_edge. Validate before
  // indexing to avoid UB on a corrupted/mismatched shared memory layout.
  TORCH_CHECK(
      optional_tensors.size() == 4,
      "Expected 4 tensors on shared memory (indptr, indices, "
      "node_type_offset, type_per_edge), but got ",
      optional_tensors.size(), ".");
  TORCH_CHECK(
      optional_tensors[0].has_value() && optional_tensors[1].has_value(),
      "indptr and indices must be present on shared memory.");
  auto graph = c10::make_intrusive<CSCSamplingGraph>(
      optional_tensors[0].value(), optional_tensors[1].value(),
      optional_tensors[2], optional_tensors[3]);
  // Transfer ownership of both shared memory regions to the graph so they
  // stay alive as long as the tensors that view them, and are released when
  // the graph is destroyed.
  graph->tensor_meta_shm_ = std::move(std::get<0>(shared_memory_tensors));
  graph->tensor_data_shm_ = std::move(std::get<1>(shared_memory_tensors));
  return graph;
}
c10::intrusive_ptr<CSCSamplingGraph> CSCSamplingGraph::CopyToSharedMemory(
    const std::string& shared_memory_name) {
  // Wrap every member tensor (including the mandatory ones) as an optional
  // so a single utility call handles the whole set uniformly.
  std::vector<torch::optional<torch::Tensor>> member_tensors{
      indptr_, indices_, node_type_offset_, type_per_edge_};
  // Copy the tensors to shared memory and rebuild a graph whose tensors view
  // that shared memory.
  return BuildGraphFromSharedMemoryTensors(CopyTensorsToSharedMemory(
      shared_memory_name, member_tensors, SERIALIZED_METAINFO_SIZE_MAX));
}
c10::intrusive_ptr<CSCSamplingGraph> CSCSamplingGraph::LoadFromSharedMemory(
    const std::string& shared_memory_name) {
  // Read back the tensors written by CopyToSharedMemory (same name, same
  // metadata budget) and reconstruct the graph around them.
  return BuildGraphFromSharedMemoryTensors(LoadTensorsFromSharedMemory(
      shared_memory_name, SERIALIZED_METAINFO_SIZE_MAX));
}
} // namespace sampling } // namespace sampling
} // namespace graphbolt } // namespace graphbolt
...@@ -28,10 +28,12 @@ TORCH_LIBRARY(graphbolt, m) { ...@@ -28,10 +28,12 @@ TORCH_LIBRARY(graphbolt, m) {
.def("indices", &CSCSamplingGraph::Indices) .def("indices", &CSCSamplingGraph::Indices)
.def("node_type_offset", &CSCSamplingGraph::NodeTypeOffset) .def("node_type_offset", &CSCSamplingGraph::NodeTypeOffset)
.def("type_per_edge", &CSCSamplingGraph::TypePerEdge) .def("type_per_edge", &CSCSamplingGraph::TypePerEdge)
.def("in_subgraph", &CSCSamplingGraph::InSubgraph); .def("in_subgraph", &CSCSamplingGraph::InSubgraph)
.def("copy_to_shared_memory", &CSCSamplingGraph::CopyToSharedMemory);
m.def("from_csc", &CSCSamplingGraph::FromCSC); m.def("from_csc", &CSCSamplingGraph::FromCSC);
m.def("load_csc_sampling_graph", &LoadCSCSamplingGraph); m.def("load_csc_sampling_graph", &LoadCSCSamplingGraph);
m.def("save_csc_sampling_graph", &SaveCSCSamplingGraph); m.def("save_csc_sampling_graph", &SaveCSCSamplingGraph);
m.def("load_from_shared_memory", &CSCSamplingGraph::LoadFromSharedMemory);
} }
} // namespace sampling } // namespace sampling
......
/**
* Copyright (c) 2023 by Contributors
*
* @file shared_memory_utils.cc
 * @brief Shared memory utility function implementations.
*/
#include "./shared_memory_utils.h"
#include <graphbolt/serialize.h>
#include <graphbolt/shared_memory.h>
namespace graphbolt {
namespace sampling {
/**
 * @brief Serialize a torch archive and copy it into a newly created shared
 * memory region.
 *
 * Layout: the first 8 bytes hold the payload length (int64_t), followed by
 * the serialized archive bytes.
 *
 * @param name The name of the shared memory region to create.
 * @param size The fixed size (in bytes) of the region to create.
 * @param archive The archive to serialize.
 *
 * @return The shared memory object owning the region.
 */
static SharedMemoryPtr CopyTorchArchiveToSharedMemory(
    const std::string& name, int64_t size,
    torch::serialize::OutputArchive& archive) {
  std::stringstream serialized;
  archive.save_to(serialized);
  auto serialized_str = serialized.str();
  // Guard against overflowing the fixed-size region: header + payload must
  // fit, otherwise memcpy below would write past the mapping.
  TORCH_CHECK(
      static_cast<int64_t>(serialized_str.size() + sizeof(int64_t)) <= size,
      "Serialized archive (", serialized_str.size(),
      " bytes + 8-byte header) does not fit in the shared memory region of ",
      size, " bytes.");
  auto shm = std::make_unique<SharedMemory>(name);
  auto mem_buf = shm->Create(size);
  // Use the first 8 bytes to store the size of the serialized string.
  static_cast<int64_t*>(mem_buf)[0] = serialized_str.size();
  memcpy(
      static_cast<char*>(mem_buf) + sizeof(int64_t), serialized_str.data(),
      serialized_str.size());
  return shm;
}
/**
 * @brief Open an existing shared memory region and deserialize a torch
 * archive from it.
 *
 * Expects the layout written by CopyTorchArchiveToSharedMemory: an int64_t
 * payload length followed by the serialized bytes.
 *
 * @param name The name of the shared memory region to open.
 * @param max_meta_size The size (in bytes) of the region to map.
 * @param archive The archive to load into.
 *
 * @return The shared memory object owning the mapping.
 */
static SharedMemoryPtr LoadTorchArchiveFromSharedMemory(
    const std::string& name, int64_t max_meta_size,
    torch::serialize::InputArchive& archive) {
  auto shm = std::make_unique<SharedMemory>(name);
  auto mem_buf = shm->Open(max_meta_size);
  int64_t meta_size = static_cast<int64_t*>(mem_buf)[0];
  // The recorded size comes from another process; validate it before using
  // it as a read length to avoid reading past the mapped region.
  TORCH_CHECK(
      meta_size >= 0 &&
          meta_size + static_cast<int64_t>(sizeof(int64_t)) <= max_meta_size,
      "Corrupted meta shared memory '", name, "': recorded payload size ",
      meta_size, " does not fit in the region of ", max_meta_size, " bytes.");
  archive.load_from(
      static_cast<const char*>(mem_buf) + sizeof(int64_t), meta_size);
  return shm;
}
/**
 * @brief Copy the raw data of the present tensors into a newly created
 * shared memory region, packed back-to-back in input order.
 *
 * @param name The name of the shared memory region to create.
 * @param tensors Optional tensors; absent entries contribute no bytes.
 *
 * @return The shared memory object owning the region.
 */
static SharedMemoryPtr CopyTensorsDataToSharedMemory(
    const std::string& name,
    const std::vector<torch::optional<torch::Tensor>>& tensors) {
  // First pass: total number of bytes needed for all present tensors.
  int64_t memory_size = 0;
  for (const auto& optional_tensor : tensors) {
    if (optional_tensor.has_value()) {
      const auto& tensor = optional_tensor.value();
      memory_size += tensor.numel() * tensor.element_size();
    }
  }
  auto shm = std::make_unique<SharedMemory>(name);
  auto mem_buf = shm->Create(memory_size);
  // Second pass: copy each tensor's data. Iterate by const reference — the
  // original copied the optional per iteration. contiguous() guarantees the
  // memcpy source is a dense numel * element_size buffer.
  for (const auto& optional_tensor : tensors) {
    if (optional_tensor.has_value()) {
      auto tensor = optional_tensor.value().contiguous();
      int64_t size = tensor.numel() * tensor.element_size();
      memcpy(mem_buf, tensor.data_ptr(), size);
      mem_buf = static_cast<char*>(mem_buf) + size;
    }
  }
  return shm;
}
/**
* @brief Load tensors data from shared memory.
* @param name The name of shared memory.
* @param tensor_metas The meta info of tensors, including a flag indicating
* whether the optional tensor has value, tensor shape and dtype.
*
* @return A pair of shared memory holding the tensors.
*/
static std::pair<SharedMemoryPtr, std::vector<torch::optional<torch::Tensor>>>
LoadTensorsDataFromSharedMemory(
const std::string& name,
const std::vector<
std::tuple<bool, std::vector<int64_t>, torch::ScalarType>>&
tensor_metas) {
auto shm = std::make_unique<SharedMemory>(name);
int64_t memory_size = 0;
for (const auto& meta : tensor_metas) {
if (std::get<0>(meta)) {
int64_t size = std::accumulate(
std::get<1>(meta).begin(), std::get<1>(meta).end(), 1,
std::multiplies<int64_t>());
memory_size += size * torch::elementSize(std::get<2>(meta));
}
}
auto mem_buf = shm->Open(memory_size);
std::vector<torch::optional<torch::Tensor>> optional_tensors;
for (const auto& meta : tensor_metas) {
if (std::get<0>(meta)) {
auto tensor =
torch::from_blob(mem_buf, std::get<1>(meta), std::get<2>(meta));
optional_tensors.push_back(tensor);
int64_t size = std::accumulate(
std::get<1>(meta).begin(), std::get<1>(meta).end(), 1,
std::multiplies<int64_t>());
mem_buf = static_cast<char*>(mem_buf) +
size * torch::elementSize(std::get<2>(meta));
} else {
optional_tensors.push_back(torch::nullopt);
}
}
return std::make_pair(std::move(shm), std::move(optional_tensors));
}
SharedMemoryTensors CopyTensorsToSharedMemory(
    const std::string& name,
    const std::vector<torch::optional<torch::Tensor>>& tensors,
    int64_t max_meta_memory_size) {
  // Serialize per-tensor meta info (presence flag, shape, dtype) so that
  // loading processes can reconstruct the tensors from the raw data region.
  torch::serialize::OutputArchive archive;
  archive.write("num_tensors", static_cast<int64_t>(tensors.size()));
  for (size_t i = 0; i < tensors.size(); ++i) {
    archive.write(
        "tensor_" + std::to_string(i) + "_has_value", tensors[i].has_value());
    if (tensors[i].has_value()) {
      archive.write(
          "tensor_" + std::to_string(i) + "_shape", tensors[i].value().sizes());
      archive.write(
          "tensor_" + std::to_string(i) + "_dtype",
          tensors[i].value().scalar_type());
    }
  }
  auto meta_shm = CopyTorchArchiveToSharedMemory(
      name + "_meta", max_meta_memory_size, archive);
  auto data_shm = CopyTensorsDataToSharedMemory(name + "_data", tensors);
  // Re-wrap the copied data as tensors backed by the shared memory region,
  // preserving input order (absent entries stay nullopt). Iterate by const
  // reference — the original copied each optional per iteration.
  std::vector<torch::optional<torch::Tensor>> ret_tensors;
  ret_tensors.reserve(tensors.size());
  auto mem_buf = data_shm->GetMemory();
  for (const auto& optional_tensor : tensors) {
    if (optional_tensor.has_value()) {
      const auto& tensor = optional_tensor.value();
      ret_tensors.push_back(
          torch::from_blob(mem_buf, tensor.sizes(), tensor.dtype()));
      int64_t size = tensor.numel() * tensor.element_size();
      mem_buf = static_cast<char*>(mem_buf) + size;
    } else {
      ret_tensors.push_back(torch::nullopt);
    }
  }
  return std::make_tuple(
      std::move(meta_shm), std::move(data_shm), std::move(ret_tensors));
}
SharedMemoryTensors LoadTensorsFromSharedMemory(
    const std::string& name, int64_t meta_memory_size) {
  // Read the serialized meta archive first; it records how many tensors
  // there are and, for each present one, its shape and dtype.
  torch::serialize::InputArchive archive;
  auto meta_shm = LoadTorchArchiveFromSharedMemory(
      name + "_meta", meta_memory_size, archive);
  // Builds the archive key for tensor i's field, e.g. "tensor_0_shape".
  auto key = [](int64_t i, const char* field) {
    return "tensor_" + std::to_string(i) + "_" + field;
  };
  int64_t num_tensors = read_from_archive(archive, "num_tensors").toInt();
  std::vector<std::tuple<bool, std::vector<int64_t>, torch::ScalarType>> metas;
  for (int64_t i = 0; i < num_tensors; ++i) {
    if (read_from_archive(archive, key(i, "has_value")).toBool()) {
      metas.push_back(
          {true, read_from_archive(archive, key(i, "shape")).toIntVector(),
           read_from_archive(archive, key(i, "dtype")).toScalarType()});
    } else {
      // Absent tensor: shape/dtype are placeholders and never dereferenced.
      metas.push_back({false, {}, torch::ScalarType::Undefined});
    }
  }
  // Map the data region into tensors according to the meta info.
  auto data = LoadTensorsDataFromSharedMemory(name + "_data", metas);
  return std::make_tuple(
      std::move(meta_shm), std::move(data.first), std::move(data.second));
}
} // namespace sampling
} // namespace graphbolt
/**
* Copyright (c) 2023 by Contributors
*
* @file shared_memory_utils.h
 * @brief Shared memory utilities.
*/
#ifndef GRAPHBOLT_SHM_UTILS_H_
#define GRAPHBOLT_SHM_UTILS_H_
#include <graphbolt/shared_memory.h>
#include <torch/torch.h>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
namespace graphbolt {
namespace sampling {
/**
 * @brief SharedMemoryTensors includes: (1) two shared memory objects holding
* tensor meta information and data respectively; (2) a vector of optional
* tensors on shared memory.
*/
using SharedMemoryTensors = std::tuple<
SharedMemoryPtr, SharedMemoryPtr,
std::vector<torch::optional<torch::Tensor>>>;
/**
* @brief Copy torch tensors to shared memory.
*
 * To simplify this interface, a regular tensor is also wrapped as an optional
 * one.
*
* The function has two steps:
* 1. Copy meta info to shared memory `shared_memory_name + "_meta"`. This is to
* make sure that other loading processes can get the meta info of tensors.
* 2. Copy tensors to shared memory `shared_memory_name + "_data"`, which can be
* loaded by other processes with meta info.
*
* The order of tensors loaded from `LoadTensorsFromSharedMemory` will be
* exactly the same as the tensors copied from `CopyTensorsToSharedMemory`.
*
* @param name The name of shared memory.
* @param tensors The tensors to copy.
* @param max_meta_memory_size The maximum size of meta memory.
*
* @return A tuple of tensor meta shared memory, tensor data shared memory, and
* shared optional tensors.
*/
SharedMemoryTensors CopyTensorsToSharedMemory(
const std::string& name,
const std::vector<torch::optional<torch::Tensor>>& tensors,
int64_t max_meta_memory_size);
/**
* @brief Load torch tensors from shared memory.
*
* @param name The name of shared memory.
* @param max_meta_memory_size The maximum size of meta memory.
*
* @return A tuple of tensor meta shared memory, tensor data shared memory,
* and shared tensors.
*/
SharedMemoryTensors LoadTensorsFromSharedMemory(
const std::string& name, int64_t max_meta_memory_size);
} // namespace sampling
} // namespace graphbolt
#endif // GRAPHBOLT_SHM_UTILS_H_
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment