Unverified Commit b725ee53 authored by xiangyuzhi's avatar xiangyuzhi Committed by GitHub
Browse files

[Graphbolt] Disk feature fetcher (#7128)


Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-40-250.ap-northeast-1.compute.internal>
parent fde1896f
......@@ -355,7 +355,7 @@ if(CMAKE_SYSTEM_NAME MATCHES "Linux")
ExternalProject_Add(
liburing
SOURCE_DIR ${CMAKE_SOURCE_DIR}/third_party/liburing
CONFIGURE_COMMAND <SOURCE_DIR>/configure --prefix=${LIBURING_INSTALL_DIR}
CONFIGURE_COMMAND <SOURCE_DIR>/configure --prefix=/
BUILD_COMMAND bash -c "make -j 4"
BUILD_IN_SOURCE ON
INSTALL_COMMAND make install DESTDIR=${LIBURING_INSTALL_DIR}
......@@ -588,4 +588,7 @@ if(BUILD_GRAPHBOLT)
if(USE_CUDA)
# graphbolt links against the gpu_cache target, so gpu_cache must be built
# before graphbolt.
add_dependencies(graphbolt gpu_cache)
endif(USE_CUDA)
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
  # liburing is built via ExternalProject; make sure it exists before
  # graphbolt links against it.
  add_dependencies(graphbolt liburing)
# Fixed: this previously read `endif(USE_CUDA)`, which does not match the
# opening `if(CMAKE_SYSTEM_NAME ...)` condition — CMake rejects mismatched
# legacy endif() arguments. Use the argument-free modern form.
endif()
endif(BUILD_GRAPHBOLT)
......@@ -66,6 +66,11 @@ target_include_directories(${LIB_GRAPHBOLT_NAME} PRIVATE ${BOLT_DIR}
"../third_party/dmlc-core/include"
"../third_party/pcg/include")
target_link_libraries(${LIB_GRAPHBOLT_NAME} "${TORCH_LIBRARIES}")
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
# Headers come from the vendored liburing submodule.
target_include_directories(${LIB_GRAPHBOLT_NAME} PRIVATE "../third_party/liburing/src/include")
# NOTE(review): the static-library path below hard-codes a sibling "build"
# directory under the repository parent — confirm this matches the actual
# binary-directory layout; deriving it from the liburing install prefix
# would be less fragile.
get_filename_component(PARENT_DIR "${CMAKE_SOURCE_DIR}" DIRECTORY)
target_link_libraries(${LIB_GRAPHBOLT_NAME} ${PARENT_DIR}/build/third_party/liburing/lib/liburing.a)
endif()
if(USE_CUDA)
set_target_properties(${LIB_GRAPHBOLT_NAME} PROPERTIES CUDA_STANDARD 17)
......@@ -91,6 +96,3 @@ if(DEFINED MKL_LIBRARIES)
target_link_directories(${LIB_GRAPHBOLT_NAME} PRIVATE
${MKL_ROOT}/lib/${MKL_ARCH})
endif()
target_include_directories(${LIB_GRAPHBOLT_NAME} PRIVATE ${LIBURING_INCLUDE})
target_link_libraries(${LIB_GRAPHBOLT_NAME} ${LIBURING})
/**
* Copyright (c) 2023 by Contributors
* @file cnumpy.cc
 * @brief Numpy File Fetcher class.
*/
#include "cnumpy.h"
#include <torch/torch.h>
#include <cstring>
#include <regex>
#include <stdexcept>
namespace graphbolt {
namespace storage {
static constexpr int kDiskAlignmentSize = 4096;
// Opens the numpy file for O_DIRECT reads and prepares one io_uring queue
// per PyTorch worker thread. Throws std::runtime_error if the file cannot
// be opened. On non-Linux systems the body is compiled out entirely.
OnDiskNpyArray::OnDiskNpyArray(
    std::string filename, torch::ScalarType dtype, torch::Tensor shape)
    : filename_(filename), dtype_(dtype) {
#ifdef __linux__
  // Record the feature shape and locate the data offset inside the file.
  ParseNumpyHeader(shape);
  // O_DIRECT bypasses the page cache; every subsequent read must therefore
  // be kDiskAlignmentSize-aligned (see IndexSelectIOUring).
  file_description_ = open(filename.c_str(), O_RDONLY | O_DIRECT);
  if (file_description_ == -1) {
    throw std::runtime_error("npy_load: Unable to open file " + filename);
  }
  // Get system max thread number.
  num_thread_ = torch::get_num_threads();
  io_uring_queue_ = new io_uring[num_thread_];
  // Init io_uring queue: one ring per thread, each sized for group_size_
  // in-flight requests, so threads never contend on a shared ring.
  // NOTE(review): io_uring_queue_init's return value is not checked here —
  // a failure would only surface later as submission errors; confirm.
  for (int64_t t = 0; t < num_thread_; t++) {
    io_uring_queue_init(group_size_, &io_uring_queue_[t], 0);
  }
#endif  // __linux__
}
// Factory wrapper: builds an OnDiskNpyArray and hands ownership to c10's
// intrusive reference-counting smart pointer.
c10::intrusive_ptr<OnDiskNpyArray> OnDiskNpyArray::Create(
    std::string path, torch::ScalarType dtype, torch::Tensor shape) {
  auto fetcher = c10::make_intrusive<OnDiskNpyArray>(path, dtype, shape);
  return fetcher;
}
// Tears down all per-thread io_uring queues, releases the queue array and
// closes the underlying file descriptor.
OnDiskNpyArray::~OnDiskNpyArray() {
#ifdef __linux__
  // IO queue exit.
  for (int64_t t = 0; t < num_thread_; t++) {
    io_uring_queue_exit(&io_uring_queue_[t]);
  }
  // Fixed leak: the constructor allocates this array with new[], but it was
  // never released here.
  delete[] io_uring_queue_;
  close(file_description_);
#endif  // __linux__
}
// Records the feature shape and computes per-feature byte size and the byte
// offset of the data section. Note: despite the name, the shape comes from
// the caller-supplied `shape` tensor; only the header *length* is read from
// the file. Throws std::runtime_error if the file cannot be opened.
void OnDiskNpyArray::ParseNumpyHeader(torch::Tensor shape) {
#ifdef __linux__
  // Parse numpy file header to get basic info of feature.
  size_t word_size = c10::elementSize(dtype_);
  int64_t num_dim = shape.numel();
  auto shape_ptr = shape.data_ptr<int64_t>();
  for (int64_t d = 0; d < num_dim; d++) {
    feature_dim_.emplace_back(shape_ptr[d]);
  }
  // Compute single feature size: product of all dims except the first
  // (the first dim counts the number of features).
  // Fixed: use int64_t instead of `signed long`, which is only 32 bits on
  // some targets and would overflow for large features; this also matches
  // the type of feature_size_.
  int64_t feature_length = 1;
  for (size_t i = 1; i < feature_dim_.size(); i++) {
    feature_length *= feature_dim_[i];
  }
  feature_size_ = feature_length * word_size;
  // Get file prefix length. The numpy format terminates its header with a
  // newline, so the length of the first line (+1 for '\n') is the offset of
  // the raw data section.
  std::ifstream file(filename_);
  if (!file.is_open()) {
    throw std::runtime_error(
        "ParseNumpyHeader: Unable to open file " + filename_);
  }
  std::string header;
  std::getline(file, header);
  // Get prefix length for computing feature offset,
  // add one for new-line character.
  prefix_len_ = header.size() + 1;
#endif  // __linux__
}
// Public entry point: selects the features at `index` from the on-disk
// array. Dispatches to the io_uring implementation, which is Linux-only;
// on other systems this always fails with a TORCH_CHECK error.
torch::Tensor OnDiskNpyArray::IndexSelect(torch::Tensor index) {
#ifdef __linux__
  return IndexSelectIOUring(index);
#else
  TORCH_CHECK(false, "OnDiskNpyArray is not supported on non-Linux systems.");
  return torch::empty({0});
#endif  // __linux__
}
#ifdef __linux__
// Reads the features listed in `index` from disk with O_DIRECT + io_uring
// and returns them as a new CPU tensor of shape {index.numel(), ...}.
// Throws std::runtime_error if any index is out of range or if the staging
// buffers cannot be allocated.
torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {
  index = index.to(torch::kLong);
  // The minimum number of whole disk blocks that can contain one feature;
  // O_DIRECT requires block-aligned offsets and lengths.
  int64_t aligned_length =
      (feature_size_ + kDiskAlignmentSize) & (long)~(kDiskAlignmentSize - 1);
  int64_t num_index = index.numel();
  // Staging buffer: one (aligned_length + one block) slot per in-flight
  // request per thread — a misaligned feature can straddle an extra block.
  char *read_buffer = (char *)aligned_alloc(
      kDiskAlignmentSize,
      (aligned_length + kDiskAlignmentSize) * group_size_ * num_thread_);
  // Fixed: aligned_alloc requires the size to be a multiple of the
  // alignment; round the result buffer size up accordingly.
  size_t result_size = (feature_size_ * num_index + kDiskAlignmentSize - 1) &
                       (size_t)~(kDiskAlignmentSize - 1);
  char *result_buffer = (char *)aligned_alloc(kDiskAlignmentSize, result_size);
  // Fixed: allocation failures were previously unchecked.
  if (read_buffer == nullptr || result_buffer == nullptr) {
    free(read_buffer);
    free(result_buffer);
    throw std::runtime_error("IndexSelectIOUring: Unable to allocate buffers.");
  }
  auto index_data = index.data_ptr<int64_t>();
  // Record the inside offsets of fetched features. (std::vector replaces
  // the previous variable-length array, which is not standard C++.)
  std::vector<int64_t> residual(group_size_ * num_thread_);
  // Indicator for index error; set by any thread, checked before each group.
  std::atomic<bool> error_flag{};
  TORCH_CHECK(
      num_thread_ >= torch::get_num_threads(),
      "The number of threads can not be changed to larger than the number of "
      "threads when a disk feature fetcher is constructed.");
  torch::parallel_for(
      0, num_index, group_size_, [&](int64_t begin, int64_t end) {
        auto thread_id = torch::get_thread_num();
        if (!error_flag.load()) {
          for (int64_t i = begin; i < end; i++) {
            int64_t group_id = i - begin;
            int64_t feature_id = index_data[i];  // Feature id.
            // Fixed: also reject negative ids, which would otherwise
            // produce a garbage read offset.
            if (feature_id < 0 || feature_id >= feature_dim_[0]) {
              error_flag.store(true);
              break;
            }
            int64_t offset = feature_id * feature_size_ + prefix_len_;
            int64_t aligned_offset = offset & (long)~(kDiskAlignmentSize - 1);
            residual[thread_id * group_size_ + group_id] =
                offset - aligned_offset;
            // Read one extra block when the feature straddles a boundary.
            int64_t read_size;
            if (residual[thread_id * group_size_ + group_id] + feature_size_ >
                kDiskAlignmentSize) {
              read_size = aligned_length + kDiskAlignmentSize;
            } else {
              read_size = aligned_length;
            }
            // Put requests into this thread's io_uring submission queue.
            struct io_uring_sqe *submit_queue =
                io_uring_get_sqe(&io_uring_queue_[thread_id]);
            io_uring_prep_read(
                submit_queue, file_description_,
                read_buffer +
                    ((aligned_length + kDiskAlignmentSize) * group_size_ *
                     thread_id) +
                    ((aligned_length + kDiskAlignmentSize) * group_id),
                read_size, aligned_offset);
          }
        }
        // NOTE(review): if error_flag was set after some SQEs were prepared,
        // those entries stay queued unsubmitted — confirm a later group on
        // the same thread cannot accidentally submit them.
        if (!error_flag.load()) {
          // Submit I/O requests.
          io_uring_submit(&io_uring_queue_[thread_id]);
          // Wait for completion of I/O requests.
          int64_t num_finish = 0;
          // Scratch array for batched completion reaping (hoisted out of
          // the loop; was a non-standard VLA before).
          std::vector<struct io_uring_cqe *> complete_queues(group_size_);
          // Wait until all the disk blocks are loaded in current group.
          while (num_finish < end - begin) {
            struct io_uring_cqe *complete_queue;
            if (io_uring_wait_cqe(
                    &io_uring_queue_[thread_id], &complete_queue) < 0) {
              perror("io_uring_wait_cqe");
              abort();
            }
            int cqe_count = io_uring_peek_batch_cqe(
                &io_uring_queue_[thread_id], complete_queues.data(),
                group_size_);
            if (cqe_count == -1) {
              perror("io_uring_peek_batch error\n");
              abort();
            }
            // Move the head pointer of completion queue.
            io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
            num_finish += cqe_count;
          }
          // Copy the features in the disk blocks to the result buffer,
          // skipping each feature's in-block residual offset.
          for (int64_t group_id = 0; group_id < end - begin; group_id++) {
            memcpy(
                result_buffer + feature_size_ * (begin + group_id),
                read_buffer +
                    ((aligned_length + kDiskAlignmentSize) * group_size_ *
                     thread_id) +
                    ((aligned_length + kDiskAlignmentSize) * group_id +
                     residual[thread_id * group_size_ + group_id]),
                feature_size_);
          }
        }
      });
  if (error_flag.load()) {
    // Fixed leak: the previous version threw without freeing either
    // staging buffer on this path.
    free(read_buffer);
    free(result_buffer);
    throw std::runtime_error("IndexError: Index out of range.");
  }
  auto options = torch::TensorOptions()
                     .dtype(dtype_)
                     .layout(torch::kStrided)
                     .device(torch::kCPU)
                     .requires_grad(false);
  std::vector<int64_t> shape;
  shape.push_back(num_index);
  shape.insert(shape.end(), feature_dim_.begin() + 1, feature_dim_.end());
  // Clone so the returned tensor owns its storage; the blob is freed below.
  auto result =
      torch::from_blob(result_buffer, torch::IntArrayRef(shape), options)
          .clone();
  free(read_buffer);
  free(result_buffer);
  return result;
}
#endif // __linux__
} // namespace storage
} // namespace graphbolt
/**
* Copyright (c) 2023 by Contributors
* @file cnumpy.h
 * @brief Numpy File Fetcher class.
*/
#include <fcntl.h>
#include <stdint.h>
#include <stdlib.h>
#include <torch/script.h>
#ifdef __linux__
#include <liburing.h>
#include <unistd.h>
#endif
#include <cassert>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
namespace graphbolt {
namespace storage {
/**
 * @brief Disk Numpy Fetcher class.
*/
class OnDiskNpyArray : public torch::CustomClassHolder {
public:
/** @brief Default constructor. */
OnDiskNpyArray() = default;
/**
* @brief Constructor with given file path and data type.
* @param path Path to the on disk numpy file.
* @param dtype Data type of numpy array.
*
* @return OnDiskNpyArray
*/
OnDiskNpyArray(
std::string filename, torch::ScalarType dtype, torch::Tensor shape);
/** @brief Create a disk feature fetcher from numpy file. */
static c10::intrusive_ptr<OnDiskNpyArray> Create(
std::string path, torch::ScalarType dtype, torch::Tensor shape);
/** @brief Deconstructor. */
~OnDiskNpyArray();
/**
* @brief Parses the header of a numpy file to extract feature information.
**/
void ParseNumpyHeader(torch::Tensor shape);
/**
* @brief Read disk numpy file based on given index and transform to
* tensor.
*/
torch::Tensor IndexSelect(torch::Tensor index);
#ifdef __linux__
/**
* @brief Index-select operation on an on-disk numpy array using IO Uring for
* asynchronous I/O.
*
* This function performs index-select operation on an on-disk numpy array. It
* uses IO Uring for asynchronous I/O to efficiently read data from disk. The
* input tensor 'index' specifies the indices of features to select. The
* function reads features corresponding to the indices from the disk and
* returns a new tensor containing the selected features.
*
* @param index A 1D tensor containing the indices of features to select.
* @return A tensor containing the selected features.
* @throws std::runtime_error If index is out of range.
*/
torch::Tensor IndexSelectIOUring(torch::Tensor index);
#endif // __linux__
private:
std::string filename_; // Path to numpy file.
int file_description_; // File description.
size_t prefix_len_; // Length of head data in numpy file.
std::vector<int64_t> feature_dim_; // Shape of features, e.g. {N,M,K,L}.
torch::ScalarType dtype_; // Feature data type.
int64_t feature_size_; // Number of bytes of feature size.
int num_thread_; // Default thread number.
int64_t group_size_ = 512; // Default group size.
#ifdef __linux__
io_uring* io_uring_queue_; // io_uring queue.
#endif
};
} // namespace storage
} // namespace graphbolt
......@@ -3,6 +3,8 @@
* @file index_select.cc
* @brief Index select operators.
*/
#include "./index_select.h"
#include <graphbolt/cuda_ops.h>
#include <graphbolt/fused_csc_sampling_graph.h>
......
......@@ -12,6 +12,7 @@
#ifdef GRAPHBOLT_USE_CUDA
#include "./cuda/max_uva_threads.h"
#endif
#include "./cnumpy.h"
#include "./expand_indptr.h"
#include "./index_select.h"
#include "./random.h"
......@@ -36,6 +37,8 @@ TORCH_LIBRARY(graphbolt, m) {
.def_readwrite(
"original_edge_ids", &FusedSampledSubgraph::original_edge_ids)
.def_readwrite("type_per_edge", &FusedSampledSubgraph::type_per_edge);
m.class_<storage::OnDiskNpyArray>("OnDiskNpyArray")
.def("index_select", &storage::OnDiskNpyArray::IndexSelect);
m.class_<FusedCSCSamplingGraph>("FusedCSCSamplingGraph")
.def("num_nodes", &FusedCSCSamplingGraph::NumNodes)
.def("num_edges", &FusedCSCSamplingGraph::NumEdges)
......@@ -88,6 +91,7 @@ TORCH_LIBRARY(graphbolt, m) {
m.def("isin", &IsIn);
m.def("index_select", &ops::IndexSelect);
m.def("index_select_csc", &ops::IndexSelectCSC);
m.def("ondisk_npy_array", &storage::OnDiskNpyArray::Create);
m.def("set_seed", &RandomEngine::SetManualSeed);
#ifdef GRAPHBOLT_USE_CUDA
m.def("set_max_uva_threads", &cuda::set_max_uva_threads);
......
"""Torch-based feature store for GraphBolt."""
import copy
import platform
import textwrap
from typing import Dict, List
......@@ -12,7 +13,7 @@ from ..feature_store import Feature
from .basic_feature_store import BasicFeatureStore
from .ondisk_metadata import OnDiskFeatureData
__all__ = ["TorchBasedFeature", "TorchBasedFeatureStore"]
__all__ = ["TorchBasedFeature", "DiskBasedFeature", "TorchBasedFeatureStore"]
class TorchBasedFeature(Feature):
......@@ -234,6 +235,117 @@ class TorchBasedFeature(Feature):
)
class DiskBasedFeature(Feature):
    r"""A wrapper of disk based feature.

    Initialize a disk based feature fetcher by a numpy file. Reads are
    served directly from disk (via io_uring on Linux) instead of keeping
    the whole array resident in memory.

    Parameters
    ----------
    path : string
        The path to the numpy feature file.
        Note that the dimension of the numpy array should be greater than 1.
    metadata : Dict
        The metadata of the feature.

    Examples
    --------
    >>> import torch
    >>> import numpy as np
    >>> from dgl import graphbolt as gb
    >>> torch_feat = torch.arange(10).reshape(2, -1)
    >>> np.save("feat.npy", torch_feat.numpy())
    >>> feature = gb.DiskBasedFeature("feat.npy")
    >>> feature.read(torch.tensor([0]))
    tensor([[0, 1, 2, 3, 4]])
    >>> feature.size()
    torch.Size([5])
    """

    def __init__(self, path: str, metadata: Dict = None):
        super().__init__()
        # Keep a memory-mapped view for full reads, size queries and repr.
        # "r+" maps the file read-write so torch.from_numpy receives a
        # writable array (torch warns on non-writable inputs).
        mmap_mode = "r+"
        self._tensor = torch.from_numpy(
            np.load(path, mmap_mode=mmap_mode)
        ).contiguous()
        self._metadata = metadata
        # C++ fetcher that reads selected rows straight from disk.
        self._ondisk_npy_array = torch.ops.graphbolt.ondisk_npy_array(
            path, self._tensor.dtype, torch.tensor(self._tensor.shape)
        )

    def read(self, ids: torch.Tensor = None):
        """Read the feature by index.

        The returned tensor will be on CPU, unless ``ids`` lives on a GPU,
        in which case the result is moved to ``ids.device``.

        Parameters
        ----------
        ids : torch.Tensor
            The index of the feature. Only the specified indices of the
            feature are read. If None, the entire feature is returned.

        Returns
        -------
        torch.Tensor
            The read feature.
        """
        if ids is None:
            if self._tensor.is_pinned():
                return self._tensor.cuda()
            return self._tensor
        elif platform.system() == "Linux":
            try:
                return self._ondisk_npy_array.index_select(ids.cpu()).to(
                    ids.device
                )
            except RuntimeError as error:
                # The C++ fetcher reports an out-of-range index as a
                # RuntimeError; surface it to callers as an IndexError,
                # keeping the original error chained for debugging.
                raise IndexError from error
        else:
            # io_uring is Linux-only; fall back to the in-memory mmap view.
            return index_select(self._tensor, ids)

    def size(self):
        """Get the size of a single feature (all dims except the first).

        Returns
        -------
        torch.Size
            The size of the feature.
        """
        return self._tensor.size()[1:]

    def update(self, value: torch.Tensor, ids: torch.Tensor = None):
        """Disk based feature does not support update for now."""
        raise NotImplementedError

    def metadata(self):
        """Get the metadata of the feature.

        Returns
        -------
        Dict
            The metadata of the feature.
        """
        return (
            self._metadata if self._metadata is not None else super().metadata()
        )

    def read_into_memory(self) -> TorchBasedFeature:
        """Change disk-based feature to torch-based feature."""
        return TorchBasedFeature(self._tensor, self._metadata)

    def __repr__(self) -> str:
        ret = (
            "{Classname}(\n"
            "    feature={feature},\n"
            "    metadata={metadata},\n"
            ")"
        )
        feature_str = textwrap.indent(
            str(self._tensor), " " * len("    feature=")
        ).strip()
        metadata_str = textwrap.indent(
            str(self.metadata()), " " * len("    metadata=")
        ).strip()
        return ret.format(
            Classname=self.__class__.__name__,
            feature=feature_str,
            metadata=metadata_str,
        )
class TorchBasedFeatureStore(BasicFeatureStore):
r"""A store to manage multiple pytorch based feature for access.
......
import os
import sys
import tempfile
import unittest
import numpy as np
import pytest
import torch
from dgl import graphbolt as gb
def to_on_disk_numpy(test_dir, name, t):
    """Save tensor ``t`` as ``<test_dir>/<name>.npy`` and return the path."""
    file_path = os.path.join(test_dir, "{}.npy".format(name))
    np.save(file_path, t.numpy())
    return file_path
@unittest.skipIf(
    sys.platform.startswith("win"),
    reason="Tests for disk dataset can only deployed on Linux,"
    "because the io_uring is only supportted by Linux kernel.",
)
def test_disk_based_feature():
    """End-to-end checks for DiskBasedFeature: full and indexed reads,
    size, metadata, out-of-range errors and contiguity of loaded tensors."""
    with tempfile.TemporaryDirectory() as test_dir:
        tensor_2d = torch.tensor([[1, 2, 3], [4, 5, 6]])
        tensor_3d = torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]])
        metadata = {"max_value": 3}
        path_a = to_on_disk_numpy(test_dir, "a", tensor_2d)
        path_b = to_on_disk_numpy(test_dir, "b", tensor_3d)
        feature_a = gb.DiskBasedFeature(path=path_a, metadata=metadata)
        feature_b = gb.DiskBasedFeature(path=path_b)
        # Reading without ids returns the entire feature.
        assert torch.equal(
            feature_a.read(), torch.tensor([[1, 2, 3], [4, 5, 6]])
        )
        assert torch.equal(
            feature_b.read(),
            torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]]),
        )
        # Reading with ids selects individual rows.
        assert torch.equal(
            feature_a.read(torch.tensor([0])),
            torch.tensor([[1, 2, 3]]),
        )
        assert torch.equal(
            feature_b.read(torch.tensor([1])),
            torch.tensor([[[4, 5], [6, 7]]]),
        )
        # size() excludes the leading (row-count) dimension.
        assert feature_a.size() == torch.Size([3])
        assert feature_b.size() == torch.Size([2, 2])
        # Metadata round-trips; default is an empty dict.
        assert feature_a.metadata() == metadata
        assert feature_b.metadata() == {}
        # Out-of-range ids must raise IndexError.
        with pytest.raises(IndexError):
            feature_a.read(torch.tensor([0, 1, 2, 3]))
        # Drop references so the mapped files can be removed before the
        # temporary directory is cleaned up.
        tensor_2d = tensor_3d = None
        feature_a = feature_b = None
        # Tensors loaded from either C- or Fortran-ordered ndarrays must
        # come out contiguous.
        contiguous_numpy = np.array([[1, 2, 3], [4, 5, 6]], order="C")
        non_contiguous_numpy = np.array([[1, 2, 3], [4, 5, 6]], order="F")
        assert contiguous_numpy.flags["C_CONTIGUOUS"]
        assert non_contiguous_numpy.flags["F_CONTIGUOUS"]
        path_contiguous = os.path.join(test_dir, "contiguous_numpy.npy")
        path_non_contiguous = os.path.join(test_dir, "non_contiguous_numpy.npy")
        np.save(path_contiguous, contiguous_numpy)
        np.save(path_non_contiguous, non_contiguous_numpy)
        feature_c = gb.DiskBasedFeature(path=path_contiguous, metadata=metadata)
        feature_n = gb.DiskBasedFeature(path=path_non_contiguous)
        assert feature_c._tensor.is_contiguous()
        assert feature_n._tensor.is_contiguous()
        contiguous_numpy = non_contiguous_numpy = None
        feature_c = feature_n = None
@unittest.skipIf(
    sys.platform.startswith("win"),
    reason="Tests for disk dataset can only deployed on Linux,"
    "because the io_uring is only supportted by Linux kernel.",
)
@pytest.mark.parametrize(
    "dtype",
    [
        torch.float32,
        torch.float64,
        torch.int32,
        torch.int64,
        torch.int8,
        torch.float16,
        torch.complex128,
    ],
)
@pytest.mark.parametrize("idtype", [torch.int32, torch.int64])
@pytest.mark.parametrize(
    "shape", [(10, 20), (20, 10), (20, 25, 10), (137, 50, 30)]
)
@pytest.mark.parametrize("index", [[0], [1, 2, 3], [0, 6, 2, 8]])
def test_more_disk_based_feature(dtype, idtype, shape, index):
    """Exercise DiskBasedFeature over many dtypes, index dtypes and shapes."""
    # randint cannot produce complex tensors directly; build them from two
    # random real tensors.
    if dtype == torch.complex128:
        tensor = torch.complex(
            torch.randint(0, 13, shape, dtype=torch.float64),
            torch.randint(0, 13, shape, dtype=torch.float64),
        )
    else:
        tensor = torch.randint(0, 13, shape, dtype=dtype)
    test_tensor = tensor.clone()
    idx = torch.tensor(index)
    with tempfile.TemporaryDirectory() as test_dir:
        path = to_on_disk_numpy(test_dir, "tensor", tensor)
        feature = gb.DiskBasedFeature(path=path)
        # Test read feature. Fixed: cast with Tensor.to — wrapping an
        # existing tensor in torch.tensor() copies it and emits a
        # UserWarning.
        assert torch.equal(feature.read(idx.to(idtype)), test_tensor[idx])
@unittest.skipIf(
    sys.platform.startswith("win"),
    reason="Tests for large disk dataset can only deployed on Linux,"
    "because the io_uring is only supportted by Linux kernel.",
)
def test_disk_based_feature_repr():
    """Check that repr() embeds the tensor's string form and the metadata."""
    with tempfile.TemporaryDirectory() as test_dir:
        mat_2d = torch.tensor([[1, 2, 3], [4, 5, 6]])
        mat_3d = torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]])
        metadata = {"max_value": 3}
        feature_a = gb.DiskBasedFeature(
            path=to_on_disk_numpy(test_dir, "a", mat_2d), metadata=metadata
        )
        feature_b = gb.DiskBasedFeature(
            path=to_on_disk_numpy(test_dir, "b", mat_3d)
        )
        expected_str_feature_a = str(
            "DiskBasedFeature(\n"
            "    feature=tensor([[1, 2, 3],\n"
            "                    [4, 5, 6]]),\n"
            "    metadata={'max_value': 3},\n"
            ")"
        )
        expected_str_feature_b = str(
            "DiskBasedFeature(\n"
            "    feature=tensor([[[1, 2],\n"
            "                     [3, 4]],\n"
            "\n"
            "                    [[4, 5],\n"
            "                     [6, 7]]]),\n"
            "    metadata={},\n"
            ")"
        )
        assert str(feature_a) == expected_str_feature_a
        assert str(feature_b) == expected_str_feature_b
        # Drop references so the mapped files can be removed before the
        # temporary directory is cleaned up.
        mat_2d = mat_3d = metadata = None
        feature_a = feature_b = None
        expected_str_feature_a = expected_str_feature_b = None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment