Unverified commit 5db90b0e authored by Rhett Ying, committed by GitHub

[GraphBolt] avoid pre-defined metadata size (#6700)

parent 2c325b2d
@@ -374,14 +374,6 @@ class FusedCSCSamplingGraph : public torch::CustomClassHolder {
    */
   torch::optional<EdgeAttrMap> edge_attributes_;
-  /**
-   * @brief Maximum number of bytes used to serialize the metadata of the
-   * member tensors, including tensor shape and dtype. The constant is estimated
-   * by multiplying the number of tensors in this class and the maximum number
-   * of bytes used to serialize the metadata of a tensor (10 * 8192 for now).
-   */
-  static constexpr int64_t SERIALIZED_METAINFO_SIZE_MAX = 10 * 81920;
   /**
    * @brief Shared memory used to hold the tensor metadata and data of this
    * class. By storing its shared memory objects, the graph controls the
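The removed constant pinned the metadata segment to a guessed upper bound, and its own comment had already drifted (it says `10 * 8192` while the initializer is `10 * 81920`). The patch measures the exact footprint at flush time instead. A minimal sketch of that idea using public LibTorch serialization calls; `SerializedMetadataBytes` is an invented name, not a GraphBolt API:

```cpp
#include <torch/torch.h>

#include <sstream>

// Serialize only what the graph's metadata records (shape and dtype) and
// report the resulting byte count. The exact size is known only after
// serializing, which is why the patch defers segment creation to Flush().
size_t SerializedMetadataBytes(const torch::Tensor& tensor) {
  torch::serialize::OutputArchive archive;
  archive.write("shape", torch::IValue(tensor.sizes().vec()));
  archive.write("dtype", static_cast<int64_t>(tensor.scalar_type()));
  std::stringstream serialized;
  archive.save_to(serialized);
  return serialized.str().size();
}
```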
@@ -46,6 +46,9 @@ class SharedMemory {
   /** @brief Get the pointer to the shared memory. */
   void* GetMemory() const { return ptr_; }
+  /** @brief Get the size of the shared memory. */
+  size_t GetSize() const { return size_; }
   /**
    * @brief Creates the shared memory object and map the shared memory.
    *
@@ -57,10 +60,8 @@ class SharedMemory {
   /**
    * @brief Open the created shared memory object and map the shared memory.
    *
-   * @param size The size of the shared memory.
    * @return The pointer to the shared memory.
    */
-  void* Open(size_t size);
+  void* Open();
   /**
    * @brief Check if the shared memory exists.
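A hypothetical reader-side call site under the new interface; the segment name is invented, and the name-taking constructor is inferred from the `std::make_unique<SharedMemory>(...)` calls later in this diff:

```cpp
SharedMemory shm("demo_segment");
void* ptr = shm.Open();       // Maps the whole existing segment; no size argument.
size_t size = shm.GetSize();  // Size reported by the OS, not guessed by the caller.
```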
@@ -609,7 +609,7 @@ BuildGraphFromSharedMemoryHelper(SharedMemoryHelper&& helper) {
 c10::intrusive_ptr<FusedCSCSamplingGraph>
 FusedCSCSamplingGraph::CopyToSharedMemory(
     const std::string& shared_memory_name) {
-  SharedMemoryHelper helper(shared_memory_name, SERIALIZED_METAINFO_SIZE_MAX);
+  SharedMemoryHelper helper(shared_memory_name);
   helper.WriteTorchTensor(indptr_);
   helper.WriteTorchTensor(indices_);
   helper.WriteTorchTensor(node_type_offset_);
@@ -624,7 +624,7 @@ FusedCSCSamplingGraph::CopyToSharedMemory(
 c10::intrusive_ptr<FusedCSCSamplingGraph>
 FusedCSCSamplingGraph::LoadFromSharedMemory(
     const std::string& shared_memory_name) {
-  SharedMemoryHelper helper(shared_memory_name, SERIALIZED_METAINFO_SIZE_MAX);
+  SharedMemoryHelper helper(shared_memory_name);
   return BuildGraphFromSharedMemoryHelper(std::move(helper));
 }
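At the graph level the only change is dropping the size hint from the helper's constructor; producer and consumer pair up as before. A sketch of the assumed cross-process usage (names invented, `LoadFromSharedMemory` assumed callable without an instance):

```cpp
// Writer process: serialize the graph's tensors into named shared memory.
auto shared_graph = graph->CopyToSharedMemory("demo_graph");
// Reader process: attach by name; both segments are now self-describing.
auto loaded_graph = FusedCSCSamplingGraph::LoadFromSharedMemory("demo_graph");
```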
@@ -6,6 +6,7 @@
 #ifndef _WIN32
 #include <fcntl.h>
 #include <sys/mman.h>
+#include <sys/stat.h>
 #include <unistd.h>
 #endif  // !_WIN32
@@ -63,18 +64,24 @@ void* SharedMemory::Create(size_t size) {
   return ptr_;
 }
-void* SharedMemory::Open(size_t size) {
-  size_ = size;
+void* SharedMemory::Open() {
   std::string decorated_name = DecorateName(name_);
   handle_ = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, decorated_name.c_str());
   TORCH_CHECK(
       handle_ != nullptr, "Failed to open ", decorated_name,
       ", Win32 Error: ", GetLastError());
-  ptr_ = MapViewOfFile(handle_, FILE_MAP_ALL_ACCESS, 0, 0, size);
+  ptr_ = MapViewOfFile(handle_, FILE_MAP_ALL_ACCESS, 0, 0, 0);
   TORCH_CHECK(
       ptr_ != nullptr, "Memory mapping failed, Win32 error: ", GetLastError());
+  // Obtain the size of the memory-mapped file.
+  MEMORY_BASIC_INFORMATION memInfo;
+  TORCH_CHECK(
+      VirtualQuery(ptr_, &memInfo, sizeof(memInfo)) != 0,
+      "Failed to get the size of shared memory: ", GetLastError());
+  size_ = static_cast<size_t>(memInfo.RegionSize);
   return ptr_;
 }
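On Windows, passing 0 as `MapViewOfFile`'s `dwNumberOfBytesToMap` maps the entire file mapping, and `VirtualQuery` then reports how large the mapped view is. One caveat: `RegionSize` is page-granular, so the value can be rounded up from the byte count originally given to `CreateFileMapping`. The same technique in isolation (mapping name invented):

```cpp
#include <windows.h>

#include <cstdio>

int main() {
  // Attach to a mapping some other process created under this name.
  HANDLE handle = OpenFileMappingA(FILE_MAP_ALL_ACCESS, FALSE, "demo_mapping");
  if (handle == nullptr) return 1;
  // dwNumberOfBytesToMap == 0 maps the whole file mapping object.
  void* ptr = MapViewOfFile(handle, FILE_MAP_ALL_ACCESS, 0, 0, 0);
  MEMORY_BASIC_INFORMATION info;
  if (ptr != nullptr && VirtualQuery(ptr, &info, sizeof(info)) != 0) {
    // RegionSize is page-aligned, so it may exceed the original byte count.
    std::printf("mapped %zu bytes\n", static_cast<size_t>(info.RegionSize));
  }
  if (ptr != nullptr) UnmapViewOfFile(ptr);
  CloseHandle(handle);
  return 0;
}
```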
@@ -121,9 +128,7 @@ void *SharedMemory::Create(size_t size) {
   return ptr_;
 }
-void *SharedMemory::Open(size_t size) {
-  size_ = size;
+void *SharedMemory::Open() {
   std::string decorated_name = DecorateName(name_);
   file_descriptor_ =
       shm_open(decorated_name.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
@@ -131,8 +136,14 @@ void *SharedMemory::Open(size_t size) {
       file_descriptor_ != -1, "Failed to open ", decorated_name, ": ",
       strerror(errno));
-  ptr_ =
-      mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, file_descriptor_, 0);
+  struct stat shm_stat;
+  TORCH_CHECK(
+      fstat(file_descriptor_, &shm_stat) == 0,
+      "Failed to get the size of shared memory: ", strerror(errno));
+  size_ = shm_stat.st_size;
+  ptr_ = mmap(
+      NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, file_descriptor_, 0);
   TORCH_CHECK(
       ptr_ != MAP_FAILED,
       "Failed to map shared memory, mmap failed with error: ", strerror(errno));
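The POSIX side gets the size from `fstat` on the `shm_open` descriptor, which works because the creator fixed the segment's length up front (presumably via `ftruncate` in `Create`, which this diff does not show). The pattern in isolation, with an invented segment name that is assumed to exist already:

```cpp
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdio>

int main() {
  // Open an existing POSIX shared memory object; O_CREAT is deliberately absent.
  int fd = shm_open("/demo_shm", O_RDWR, S_IRUSR | S_IWUSR);
  if (fd == -1) return 1;
  // The kernel already knows the segment's length; no caller-supplied size.
  struct stat shm_stat;
  if (fstat(fd, &shm_stat) != 0) return 1;
  size_t size = static_cast<size_t>(shm_stat.st_size);
  void* ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (ptr != MAP_FAILED) {
    std::printf("mapped %zu bytes\n", size);
    munmap(ptr, size);
  }
  close(fd);
  return 0;
}
```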
@@ -33,10 +33,10 @@ inline static int64_t GetRoundedSize(int64_t size) {
   return (size + ALIGNED_SIZE - 1) / ALIGNED_SIZE * ALIGNED_SIZE;
 }
-SharedMemoryHelper::SharedMemoryHelper(
-    const std::string& name, int64_t max_metadata_size)
+SharedMemoryHelper::SharedMemoryHelper(const std::string& name)
     : name_(name),
-      max_metadata_size_(max_metadata_size),
+      metadata_size_(0),
+      data_size_(0),
       metadata_shared_memory_(nullptr),
       data_shared_memory_(nullptr),
       metadata_offset_(0),
@@ -49,16 +49,12 @@ void SharedMemoryHelper::InitializeRead() {
     // Reader process opens the shared memory.
     metadata_shared_memory_ =
         std::make_unique<SharedMemory>(GetSharedMemoryMetadataName(name_));
-    metadata_shared_memory_->Open(max_metadata_size_);
-    auto archive = this->ReadTorchArchive();
-    int64_t data_size = read_from_archive(archive, "data_size").toInt();
+    metadata_shared_memory_->Open();
+    metadata_size_ = metadata_shared_memory_->GetSize();
     data_shared_memory_ =
         std::make_unique<SharedMemory>(GetSharedMemoryDataName(name_));
-    data_shared_memory_->Open(data_size);
-  } else {
-    // Writer process already has the shared memory.
-    // Skip the first archive recording data size before read.
-    this->ReadTorchArchive();
+    data_shared_memory_->Open();
+    data_size_ = data_shared_memory_->GetSize();
   }
 }
@@ -146,18 +142,27 @@ SharedMemoryHelper::ReadTorchTensorDict() {
   return tensor_dict;
 }
-void SharedMemoryHelper::WriteTorchArchiveInternal(
-    torch::serialize::OutputArchive& archive) {
-  std::stringstream serialized;
-  archive.save_to(serialized);
-  auto serialized_str = serialized.str();
-  auto metadata_ptr = this->GetCurrentMetadataPtr();
-  static_cast<int64_t*>(metadata_ptr)[0] = serialized_str.size();
-  memcpy(
-      static_cast<char*>(metadata_ptr) + sizeof(int64_t), serialized_str.data(),
-      serialized_str.size());
-  int64_t rounded_size = GetRoundedSize(serialized_str.size());
-  this->MoveMetadataPtr(sizeof(int64_t) + rounded_size);
-}
+void SharedMemoryHelper::SerializeMetadata() {
+  for (auto& archive : metadata_to_write_) {
+    std::stringstream serialized;
+    archive.save_to(serialized);
+    metadata_strings_to_write_.push_back(std::move(serialized.str()));
+  }
+  metadata_to_write_.clear();
+}
+
+void SharedMemoryHelper::WriteMetadataToSharedMemory() {
+  metadata_offset_ = 0;
+  for (const auto& str : metadata_strings_to_write_) {
+    auto metadata_ptr = this->GetCurrentMetadataPtr();
+    static_cast<int64_t*>(metadata_ptr)[0] = str.size();
+    memcpy(
+        static_cast<char*>(metadata_ptr) + sizeof(int64_t), str.data(),
+        str.size());
+    int64_t rounded_size = GetRoundedSize(str.size());
+    this->MoveMetadataPtr(sizeof(int64_t) + rounded_size);
+  }
+  metadata_strings_to_write_.clear();
+}
 void SharedMemoryHelper::WriteTorchTensorInternal(
@@ -172,8 +177,6 @@ void SharedMemoryHelper::WriteTorchTensorInternal(
 }
 void SharedMemoryHelper::Flush() {
-  // The first archive records the size of the tensor data.
-  torch::serialize::OutputArchive archive;
   size_t data_size = 0;
   for (auto tensor : tensors_to_write_) {
     if (tensor.has_value()) {
@@ -181,23 +184,32 @@ void SharedMemoryHelper::Flush() {
       data_size += GetRoundedSize(tensor_size);
     }
   }
-  archive.write("data_size", static_cast<int64_t>(data_size));
+  // Serialize the metadata archives.
+  SerializeMetadata();
+  // Create the shared memory objects.
+  const size_t metadata_size = std::accumulate(
+      metadata_strings_to_write_.begin(), metadata_strings_to_write_.end(), 0,
+      [](size_t sum, const std::string& str) {
+        return sum + sizeof(int64_t) + GetRoundedSize(str.size());
+      });
   metadata_shared_memory_ =
       std::make_unique<SharedMemory>(GetSharedMemoryMetadataName(name_));
-  metadata_shared_memory_->Create(max_metadata_size_);
-  metadata_offset_ = 0;
-  this->WriteTorchArchiveInternal(archive);
-  for (auto& archive : metadata_to_write_) {
-    this->WriteTorchArchiveInternal(archive);
-  }
+  metadata_shared_memory_->Create(metadata_size);
+  metadata_size_ = metadata_size;
+  // Write the metadata and tensor data to the shared memory.
+  WriteMetadataToSharedMemory();
   data_shared_memory_ =
       std::make_unique<SharedMemory>(GetSharedMemoryDataName(name_));
   data_shared_memory_->Create(data_size);
+  data_size_ = data_size;
   data_offset_ = 0;
   for (auto tensor : tensors_to_write_) {
     this->WriteTorchTensorInternal(tensor);
   }
-  metadata_to_write_.clear();
   tensors_to_write_.clear();
 }
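`Flush` now runs in two phases: serialize every pending archive to learn the exact metadata footprint, then create a segment of precisely that size and copy in the length-prefixed blobs. A distilled sketch with invented helper names that keeps the diff's 8-byte alignment convention. (The sketch seeds `std::accumulate` with `size_t{0}`; the diff's plain `0` makes the accumulator an `int`, which is harmless at these sizes.)

```cpp
#include <cstdint>
#include <cstring>
#include <numeric>
#include <string>
#include <vector>

constexpr int64_t ALIGNED_SIZE = 8;

int64_t RoundUp(int64_t size) {
  return (size + ALIGNED_SIZE - 1) / ALIGNED_SIZE * ALIGNED_SIZE;
}

// Phase 1: the exact footprint is only known after serialization.
size_t TotalFootprint(const std::vector<std::string>& blobs) {
  return std::accumulate(
      blobs.begin(), blobs.end(), size_t{0},
      [](size_t sum, const std::string& blob) {
        return sum + sizeof(int64_t) + RoundUp(blob.size());
      });
}

// Phase 2: copy each blob into the exactly-sized region, preceded by its
// length so a reader can walk the segment without any out-of-band sizes.
void WriteLengthPrefixed(const std::vector<std::string>& blobs, char* base) {
  size_t offset = 0;
  for (const std::string& blob : blobs) {
    int64_t length = static_cast<int64_t>(blob.size());
    std::memcpy(base + offset, &length, sizeof(length));
    std::memcpy(base + offset + sizeof(length), blob.data(), blob.size());
    offset += sizeof(length) + RoundUp(length);  // Keep 8-byte alignment.
  }
}
```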
@@ -29,8 +29,8 @@ namespace sampling {
  * solve this problem, we use two shared memory objects: one for storing the
  * metadata and the other for storing the binary buffer. The metadata includes
  * the metadata of data structures such as size and shape. The size of the
- * metadata is decided by the user via `max_metadata_size`. The size of the
- * binary buffer is decided by the size of the data structures.
+ * metadata is decided by the serialized metadata itself. The size of the
+ * binary buffer is decided by the size of the data structures.
  *
  * To avoid repeated shared memory allocation, this helper class uses lazy data
  * structure writing. The data structures are written to the shared memory only
@@ -69,9 +69,8 @@ class SharedMemoryHelper {
   /**
    * @brief Constructor of the shared memory helper.
    * @param name The name of the shared memory.
-   * @param max_metadata_size The maximum size of metadata.
    */
-  SharedMemoryHelper(const std::string& name, int64_t max_metadata_size);
+  SharedMemoryHelper(const std::string& name);
   /** @brief Initialize this helper class before reading. */
   void InitializeRead();
@@ -94,11 +93,15 @@ class SharedMemoryHelper {
   std::pair<SharedMemoryPtr, SharedMemoryPtr> ReleaseSharedMemory();
  private:
+  /**
+   * @brief Serialize metadata to string.
+   */
+  void SerializeMetadata();
   /**
    * @brief Write the metadata to the shared memory. This function is
    * called by `Flush`.
    */
-  void WriteTorchArchiveInternal(torch::serialize::OutputArchive& archive);
+  void WriteMetadataToSharedMemory();
   /**
    * @brief Write the tensor data to the shared memory. This function is
    * called by `Flush`.
@@ -114,26 +117,33 @@ class SharedMemoryHelper {
   }
   inline void MoveMetadataPtr(int64_t offset) {
     TORCH_CHECK(
-        metadata_offset_ + offset <= max_metadata_size_,
+        metadata_offset_ + offset <= metadata_size_,
         "The size of metadata exceeds the maximum size of shared memory.");
     metadata_offset_ += offset;
   }
-  inline void MoveDataPtr(int64_t offset) { data_offset_ += offset; }
+  inline void MoveDataPtr(int64_t offset) {
+    TORCH_CHECK(
+        data_offset_ + offset <= data_size_,
+        "The size of data exceeds the maximum size of shared memory.");
+    data_offset_ += offset;
+  }
   std::string name_;
   bool is_creator_;
-  int64_t max_metadata_size_;
+  size_t metadata_size_;
+  size_t data_size_;
   // The shared memory objects for storing metadata and tensor data.
   SharedMemoryPtr metadata_shared_memory_, data_shared_memory_;
   // The read/write offsets of the metadata and tensor data.
-  int64_t metadata_offset_, data_offset_;
+  size_t metadata_offset_, data_offset_;
   // The data structures to write to the shared memory. They are written to the
   // shared memory only when `Flush` is called.
   std::vector<torch::serialize::OutputArchive> metadata_to_write_;
+  std::vector<std::string> metadata_strings_to_write_;
   std::vector<torch::optional<torch::Tensor>> tensors_to_write_;
 };
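Putting the pieces together, a hypothetical round trip with the revised helper (tensor values invented; a per-tensor `ReadTorchTensor` counterpart to `WriteTorchTensor` is assumed to exist alongside the `ReadTorchTensorDict` seen above):

```cpp
torch::Tensor indptr = torch::tensor({0, 2, 3}, torch::kInt64);
torch::Tensor indices = torch::tensor({1, 2, 0}, torch::kInt64);

// Writer process: tensors are queued, then Flush() sizes and fills both
// segments exactly; no max_metadata_size is needed anywhere.
SharedMemoryHelper writer("demo_graph");
writer.WriteTorchTensor(indptr);
writer.WriteTorchTensor(indices);
writer.Flush();

// Reader process: InitializeRead() calls Open() and GetSize() on both
// segments, so the sizes travel with the shared memory itself.
SharedMemoryHelper reader("demo_graph");
reader.InitializeRead();
auto shared_indptr = reader.ReadTorchTensor();
auto shared_indices = reader.ReadTorchTensor();
```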
@@ -1144,9 +1144,11 @@ def test_homo_graph_on_shared_memory(
 )
 @pytest.mark.parametrize(
     "total_num_nodes, total_num_edges",
-    [(1, 1), (100, 1), (10, 50), (1000, 50000)],
+    [(1, 1), (100, 1), (10, 50), (1000, 50 * 1000), (10 * 1000, 100 * 1000)],
 )
-@pytest.mark.parametrize("num_ntypes, num_etypes", [(1, 1), (3, 5), (100, 1)])
+@pytest.mark.parametrize(
+    "num_ntypes, num_etypes", [(1, 1), (3, 5), (100, 1), (1000, 1000)]
+)
 @pytest.mark.parametrize("test_edge_attrs", [True, False])
 def test_hetero_graph_on_shared_memory(
     total_num_nodes, total_num_edges, num_ntypes, num_etypes, test_edge_attrs