Unverified Commit 1f15e3bc authored by Muhammed Fatih BALIN, committed by GitHub


[Graphbolt][CUDA] Implementing the sparse UVA and nonUVA index select functionality for graphs (#6645)
parent d9a3868f
/**
* Copyright (c) 2023 by Contributors
* Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* @file graphbolt/cuda_ops.h
* @brief Available CUDA operations in Graphbolt.
*/
#include <torch/script.h>
namespace graphbolt {
namespace ops {
// Sorts a 1-D integer tensor via radix sort on its lowest num_bits bits and
// returns the sorted tensor along with the original positions of its elements.
std::pair<torch::Tensor, torch::Tensor> Sort(torch::Tensor input, int num_bits);
// Slices the CSC graph (indptr, indices) by the given nodes on the GPU.
std::tuple<torch::Tensor, torch::Tensor> IndexSelectCSCImpl(
    torch::Tensor indptr, torch::Tensor indices, torch::Tensor nodes);
// Same as IndexSelectCSCImpl, for an indices tensor residing in pinned memory.
std::tuple<torch::Tensor, torch::Tensor> UVAIndexSelectCSCImpl(
    torch::Tensor indptr, torch::Tensor indices, torch::Tensor nodes);
// Selects rows of a pinned input tensor according to index, on the GPU.
torch::Tensor UVAIndexSelectImpl(torch::Tensor input, torch::Tensor index);
} // namespace ops
} // namespace graphbolt
/**
* Copyright (c) 2017-2023 by Contributors
* Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* @file cuda/common.h
* @brief Common utilities for CUDA
*/
......@@ -21,18 +22,18 @@ namespace cuda {
* @brief This class is designed to allocate workspace storage
* and to get a nonblocking thrust execution policy
* that uses torch's CUDA memory pool and the current cuda stream:
*
* cuda::CUDAWorkspaceAllocator allocator;
* const auto stream = torch::cuda::getDefaultCUDAStream();
* const auto exec_policy = thrust::cuda::par_nosync(allocator).on(stream);
*
* Now, one can pass exec_policy to thrust functions
*
* To get an integer array of size 1000 whose lifetime is managed by unique_ptr,
* use:
*
* auto int_array = allocator.AllocateStorage<int>(1000);
*
* int_array.get() gives the raw pointer.
*/
struct CUDAWorkspaceAllocator {
......@@ -64,6 +65,8 @@ struct CUDAWorkspaceAllocator {
}
};
inline auto GetAllocator() { return CUDAWorkspaceAllocator{}; }
template <typename T>
inline bool is_zero(T size) {
return size == 0;
......@@ -85,6 +88,36 @@ inline bool is_zero<dim3>(dim3 size) {
} \
}
#define GRAPHBOLT_DISPATCH_ELEMENT_SIZES(element_size, name, ...) \
[&] { \
switch (element_size) { \
case 1: { \
using element_size_t = uint8_t; \
return __VA_ARGS__(); \
} \
case 2: { \
using element_size_t = uint16_t; \
return __VA_ARGS__(); \
} \
case 4: { \
using element_size_t = uint32_t; \
return __VA_ARGS__(); \
} \
case 8: { \
using element_size_t = uint64_t; \
return __VA_ARGS__(); \
} \
case 16: { \
using element_size_t = float4; \
return __VA_ARGS__(); \
} \
default: \
TORCH_CHECK(false, name, " with the element_size is not supported!"); \
using element_size_t = uint8_t; \
return __VA_ARGS__(); \
} \
}()
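// Illustrative usage sketch (not part of this patch): dispatch a type-erased,
// element-wise copy on the element size in bytes. The macro defines
// element_size_t in each case before invoking the lambda; the helper name
// CopyTypeErased is hypothetical.
inline void CopyTypeErased(
    const void* src, void* dst, int64_t num_elements, int element_size) {
  GRAPHBOLT_DISPATCH_ELEMENT_SIZES(element_size, "CopyTypeErased", ([&] {
    const auto* typed_src = reinterpret_cast<const element_size_t*>(src);
    auto* typed_dst = reinterpret_cast<element_size_t*>(dst);
    for (int64_t i = 0; i < num_elements; i++) typed_dst[i] = typed_src[i];
  }));
}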
} // namespace cuda
} // namespace graphbolt
#endif // GRAPHBOLT_CUDA_COMMON_H_
/**
* Copyright (c) 2023 by Contributors
* Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* @file cuda/index_select_csc_impl.cu
* @brief Index select csc operator implementation on CUDA.
*/
#include <c10/core/ScalarType.h>
#include <c10/cuda/CUDAStream.h>
#include <graphbolt/cuda_ops.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <cub/cub.cuh>
#include <numeric>
#include "./common.h"
#include "./utils.h"
namespace graphbolt {
namespace ops {
constexpr int BLOCK_SIZE = 128;
// Given the in_degree array and a permutation, returns the in_degree of the
// output and the permuted in_degree of the input, where the latter is padded
// so that there is enough slack to align the copies as needed.
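// For example (illustrative numbers, not from the original): with the 128-byte
// GPU cache line and 4-byte indices, num_elements = 32, so a row of degree d is
// budgeted d + 31 slots in the aligned output, enough to shift its copy window
// onto a 128-byte boundary.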
template <typename indptr_t, typename indices_t>
struct AlignmentFunc {
static_assert(GPU_CACHE_LINE_SIZE % sizeof(indices_t) == 0);
const indptr_t* in_degree;
const int64_t* perm;
int64_t num_nodes;
__host__ __device__ auto operator()(int64_t row) {
constexpr int num_elements = GPU_CACHE_LINE_SIZE / sizeof(indices_t);
return thrust::make_tuple(
in_degree[row],
        // A single cache line holds num_elements items; we add num_elements - 1
        // so that there is enough slack to move forward or backward by up to
        // num_elements - 1 items when the access is not aligned.
(indptr_t)(in_degree[perm ? perm[row % num_nodes] : row] + num_elements - 1));
}
};
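// Copies the sliced rows of indices into output_indices. Threads walk the
// aligned output layout with a grid stride; for each slot, they locate the
// owning row via UpperBound on output_indptr_aligned, compute the offset that
// makes the reads from indices cache-line aligned, and, if the slot falls
// inside the row, write the element to its compacted position given by
// output_indptr.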
template <typename indptr_t, typename indices_t>
__global__ void _CopyIndicesAlignedKernel(
const indptr_t edge_count, const int64_t num_nodes,
const indptr_t* const indptr, const indptr_t* const output_indptr,
const indptr_t* const output_indptr_aligned, const indices_t* const indices,
indices_t* const output_indices, const int64_t* const perm) {
indptr_t idx = static_cast<indptr_t>(blockIdx.x) * blockDim.x + threadIdx.x;
const int stride_x = gridDim.x * blockDim.x;
while (idx < edge_count) {
const auto permuted_row_pos =
cuda::UpperBound(output_indptr_aligned, num_nodes, idx) - 1;
const auto row_pos = perm ? perm[permuted_row_pos] : permuted_row_pos;
const auto out_row = output_indptr[row_pos];
const auto d = output_indptr[row_pos + 1] - out_row;
const int offset =
((size_t)(indices + indptr[row_pos] - output_indptr_aligned[permuted_row_pos]) %
GPU_CACHE_LINE_SIZE) /
sizeof(indices_t);
const auto rofs = idx - output_indptr_aligned[permuted_row_pos] - offset;
if (rofs >= 0 && rofs < d) {
const auto in_idx = indptr[row_pos] + rofs;
assert((size_t)(indices + in_idx - idx) % GPU_CACHE_LINE_SIZE == 0);
const auto u = indices[in_idx];
output_indices[out_row + rofs] = u;
}
idx += stride_x;
}
}
// Given rows and indptr, computes:
// inrow_indptr[i] = indptr[rows[i]];
// in_degree[i] = indptr[rows[i] + 1] - indptr[rows[i]];
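// Example (illustrative values, not from the original): for
// indptr = [0, 3, 5, 7] and rows = [2, 0], this computes
// inrow_indptr = [5, 0] and in_degree = [2, 3].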
template <typename indptr_t, typename nodes_t>
struct SliceFunc {
const nodes_t* rows;
const indptr_t* indptr;
indptr_t* in_degree;
indptr_t* inrow_indptr;
__host__ __device__ auto operator()(int64_t tIdx) {
const auto out_row = rows[tIdx];
const auto indptr_val = indptr[out_row];
const auto degree = indptr[out_row + 1] - indptr_val;
in_degree[tIdx] = degree;
inrow_indptr[tIdx] = indptr_val;
}
};
struct PairSum {
template <typename indptr_t>
__host__ __device__ auto operator()(
const thrust::tuple<indptr_t, indptr_t> a,
const thrust::tuple<indptr_t, indptr_t> b) {
return thrust::make_tuple(
thrust::get<0>(a) + thrust::get<0>(b),
thrust::get<1>(a) + thrust::get<1>(b));
};
};
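// PairSum serves as the binary operator of the cub::DeviceScan::ExclusiveScan
// below, which scans tuples of (actual, aligned) in-degrees so that both
// prefix sums are computed in a single pass.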
// Returns (in_degree, sliced_indptr), where in_degree[i] = indptr[nodes[i] + 1]
// - indptr[nodes[i]] and sliced_indptr[i] = indptr[nodes[i]].
template <typename indptr_t>
auto SliceCSCIndptr(
const indptr_t* const indptr, torch::Tensor nodes, cudaStream_t stream) {
auto allocator = cuda::GetAllocator();
const auto exec_policy = thrust::cuda::par_nosync(allocator).on(stream);
const int64_t num_nodes = nodes.size(0);
// Read indptr only once in case it is pinned and access is slow.
auto sliced_indptr = allocator.AllocateStorage<indptr_t>(num_nodes);
// compute in-degrees
auto in_degree = allocator.AllocateStorage<indptr_t>(num_nodes + 1);
thrust::counting_iterator<int64_t> iota(0);
AT_DISPATCH_INDEX_TYPES(nodes.scalar_type(), "IndexSelectCSCNodes", ([&] {
using nodes_t = index_t;
thrust::for_each(
exec_policy, iota, iota + num_nodes,
SliceFunc<indptr_t, nodes_t>{
nodes.data_ptr<nodes_t>(), indptr,
in_degree.get(), sliced_indptr.get()});
}));
return std::make_pair(std::move(in_degree), std::move(sliced_indptr));
}
template <typename indptr_t, typename indices_t>
std::tuple<torch::Tensor, torch::Tensor> UVAIndexSelectCSCCopyIndices(
torch::Tensor indices, const int64_t num_nodes,
const indptr_t* const in_degree, const indptr_t* const sliced_indptr,
const int64_t* const perm, torch::TensorOptions nodes_options,
torch::ScalarType indptr_scalar_type, cudaStream_t stream) {
auto allocator = cuda::GetAllocator();
thrust::counting_iterator<int64_t> iota(0);
// Output indptr for the slice indexed by nodes.
auto output_indptr =
torch::empty(num_nodes + 1, nodes_options.dtype(indptr_scalar_type));
// Actual and modified number of edges.
indptr_t edge_count, edge_count_aligned;
auto output_indptr_aligned =
allocator.AllocateStorage<indptr_t>(num_nodes + 1);
{
    // modified_in_degree yields the actual and the modified in-degree as a
    // pair; the latter overestimates the actual in-degree to leave room for
    // alignment.
auto modified_in_degree = thrust::make_transform_iterator(
iota, AlignmentFunc<indptr_t, indices_t>{in_degree, perm, num_nodes});
auto output_indptr_pair = thrust::make_zip_iterator(
output_indptr.data_ptr<indptr_t>(), output_indptr_aligned.get());
thrust::tuple<indptr_t, indptr_t> zero_value{};
// Compute the prefix sum over actual and modified indegrees.
size_t tmp_storage_size = 0;
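    // The first call with a null workspace pointer only queries the required
    // temporary storage size; the second call performs the scan itself.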
CUDA_CALL(cub::DeviceScan::ExclusiveScan(
nullptr, tmp_storage_size, modified_in_degree, output_indptr_pair,
PairSum{}, zero_value, num_nodes + 1, stream));
auto tmp_storage = allocator.AllocateStorage<char>(tmp_storage_size);
CUDA_CALL(cub::DeviceScan::ExclusiveScan(
tmp_storage.get(), tmp_storage_size, modified_in_degree,
output_indptr_pair, PairSum{}, zero_value, num_nodes + 1, stream));
}
// Copy the modified number of edges.
CUDA_CALL(cudaMemcpyAsync(
&edge_count_aligned, output_indptr_aligned.get() + num_nodes,
sizeof(edge_count_aligned), cudaMemcpyDeviceToHost, stream));
// Copy the actual total number of edges.
CUDA_CALL(cudaMemcpyAsync(
&edge_count, output_indptr.data_ptr<indptr_t>() + num_nodes,
sizeof(edge_count), cudaMemcpyDeviceToHost, stream));
  // Synchronize here so that edge_count and edge_count_aligned can be read on
  // the host.
CUDA_CALL(cudaStreamSynchronize(stream));
// Allocate output array with actual number of edges.
torch::Tensor output_indices =
torch::empty(edge_count, nodes_options.dtype(indices.scalar_type()));
const dim3 block(BLOCK_SIZE);
const dim3 grid((edge_count_aligned + BLOCK_SIZE - 1) / BLOCK_SIZE);
  // Perform the actual copy of the indices array into output_indices in an
  // aligned manner.
CUDA_KERNEL_CALL(
_CopyIndicesAlignedKernel, grid, block, 0, stream, edge_count_aligned,
num_nodes, sliced_indptr, output_indptr.data_ptr<indptr_t>(),
output_indptr_aligned.get(),
reinterpret_cast<indices_t*>(indices.data_ptr()),
reinterpret_cast<indices_t*>(output_indices.data_ptr()), perm);
return {output_indptr, output_indices};
}
std::tuple<torch::Tensor, torch::Tensor> UVAIndexSelectCSCImpl(
torch::Tensor indptr, torch::Tensor indices, torch::Tensor nodes) {
// Sorting nodes so that accesses over PCI-e are more regular.
const auto sorted_idx =
Sort(nodes, cuda::NumberOfBits(indptr.size(0) - 1)).second;
auto stream = c10::cuda::getDefaultCUDAStream();
const int64_t num_nodes = nodes.size(0);
return AT_DISPATCH_INTEGRAL_TYPES(
indptr.scalar_type(), "UVAIndexSelectCSCIndptr", ([&] {
using indptr_t = scalar_t;
auto [in_degree_ptr, sliced_indptr_ptr] =
SliceCSCIndptr(indptr.data_ptr<indptr_t>(), nodes, stream);
auto in_degree = in_degree_ptr.get();
auto sliced_indptr = sliced_indptr_ptr.get();
return GRAPHBOLT_DISPATCH_ELEMENT_SIZES(
indices.element_size(), "UVAIndexSelectCSCCopyIndices", ([&] {
return UVAIndexSelectCSCCopyIndices<indptr_t, element_size_t>(
indices, num_nodes, in_degree, sliced_indptr,
sorted_idx.data_ptr<int64_t>(), nodes.options(),
indptr.scalar_type(), stream);
}));
}));
}
template <typename indptr_t, typename indices_t>
struct IteratorFunc {
indptr_t* indptr;
indices_t* indices;
__host__ __device__ auto operator()(int64_t i) { return indices + indptr[i]; }
};
template <typename indptr_t, typename indices_t>
struct ConvertToBytes {
const indptr_t* in_degree;
__host__ __device__ indptr_t operator()(int64_t i) {
return in_degree[i] * sizeof(indices_t);
}
};
template <typename indptr_t, typename indices_t>
void IndexSelectCSCCopyIndices(
const int64_t num_nodes, indices_t* const indices,
indptr_t* const sliced_indptr, const indptr_t* const in_degree,
indptr_t* const output_indptr, indices_t* const output_indices,
cudaStream_t stream) {
auto allocator = cuda::GetAllocator();
thrust::counting_iterator<int64_t> iota(0);
auto input_buffer_it = thrust::make_transform_iterator(
iota, IteratorFunc<indptr_t, indices_t>{sliced_indptr, indices});
auto output_buffer_it = thrust::make_transform_iterator(
iota, IteratorFunc<indptr_t, indices_t>{output_indptr, output_indices});
auto buffer_sizes = thrust::make_transform_iterator(
iota, ConvertToBytes<indptr_t, indices_t>{in_degree});
constexpr int64_t max_copy_at_once = std::numeric_limits<int32_t>::max();
  // Performs the copy from indices into output_indices, batching the buffers
  // so that each cub::DeviceMemcpy::Batched call handles at most INT32_MAX of
  // them.
for (int64_t i = 0; i < num_nodes; i += max_copy_at_once) {
size_t tmp_storage_size = 0;
CUDA_CALL(cub::DeviceMemcpy::Batched(
nullptr, tmp_storage_size, input_buffer_it + i, output_buffer_it + i,
buffer_sizes + i, std::min(num_nodes - i, max_copy_at_once), stream));
auto tmp_storage = allocator.AllocateStorage<char>(tmp_storage_size);
CUDA_CALL(cub::DeviceMemcpy::Batched(
tmp_storage.get(), tmp_storage_size, input_buffer_it + i,
output_buffer_it + i, buffer_sizes + i,
std::min(num_nodes - i, max_copy_at_once), stream));
}
}
std::tuple<torch::Tensor, torch::Tensor> IndexSelectCSCImpl(
torch::Tensor indptr, torch::Tensor indices, torch::Tensor nodes) {
auto stream = c10::cuda::getDefaultCUDAStream();
const int64_t num_nodes = nodes.size(0);
return AT_DISPATCH_INTEGRAL_TYPES(
indptr.scalar_type(), "IndexSelectCSCIndptr", ([&] {
using indptr_t = scalar_t;
auto [in_degree_ptr, sliced_indptr_ptr] =
SliceCSCIndptr(indptr.data_ptr<indptr_t>(), nodes, stream);
auto in_degree = in_degree_ptr.get();
auto sliced_indptr = sliced_indptr_ptr.get();
// Output indptr for the slice indexed by nodes.
torch::Tensor output_indptr = torch::empty(
num_nodes + 1, nodes.options().dtype(indptr.scalar_type()));
{ // Compute the output indptr, output_indptr.
size_t tmp_storage_size = 0;
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
nullptr, tmp_storage_size, in_degree,
output_indptr.data_ptr<indptr_t>(), num_nodes + 1, stream));
auto allocator = cuda::GetAllocator();
auto tmp_storage = allocator.AllocateStorage<char>(tmp_storage_size);
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
tmp_storage.get(), tmp_storage_size, in_degree,
output_indptr.data_ptr<indptr_t>(), num_nodes + 1, stream));
}
// Number of edges being copied.
indptr_t edge_count;
CUDA_CALL(cudaMemcpyAsync(
&edge_count, output_indptr.data_ptr<indptr_t>() + num_nodes,
sizeof(edge_count), cudaMemcpyDeviceToHost, stream));
// blocking read of edge_count
CUDA_CALL(cudaStreamSynchronize(stream));
        // Allocate the output array sized to the number of copied edges.
torch::Tensor output_indices = torch::empty(
edge_count, nodes.options().dtype(indices.scalar_type()));
GRAPHBOLT_DISPATCH_ELEMENT_SIZES(
indices.element_size(), "IndexSelectCSCCopyIndices", ([&] {
using indices_t = element_size_t;
IndexSelectCSCCopyIndices<indptr_t, indices_t>(
num_nodes, reinterpret_cast<indices_t*>(indices.data_ptr()),
sliced_indptr, in_degree, output_indptr.data_ptr<indptr_t>(),
reinterpret_cast<indices_t*>(output_indices.data_ptr()),
stream);
}));
return std::make_tuple(output_indptr, output_indices);
}));
}
} // namespace ops
} // namespace graphbolt
/**
* Copyright (c) 2023 by Contributors
* Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* @file cuda/index_select_impl.cu
* @brief Index select operator implementation on CUDA.
*/
#include <c10/core/ScalarType.h>
#include <c10/cuda/CUDAStream.h>
#include <graphbolt/cuda_ops.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <cub/cub.cuh>
#include <numeric>
#include "../index_select.h"
#include "./common.h"
#include "./utils.h"
namespace graphbolt {
namespace ops {
std::pair<torch::Tensor, torch::Tensor> Sort(
torch::Tensor input, int num_bits) {
int64_t num_items = input.size(0);
// We utilize int64_t for the values array. (torch::kLong == int64_t)
auto original_idx =
torch::arange(num_items, input.options().dtype(torch::kLong));
auto sorted_array = torch::empty_like(input);
auto sorted_idx = torch::empty_like(original_idx);
cuda::CUDAWorkspaceAllocator allocator;
AT_DISPATCH_INDEX_TYPES(
input.scalar_type(), "SortImpl", ([&] {
using IdType = index_t;
const auto input_keys = input.data_ptr<index_t>();
const int64_t* input_values = original_idx.data_ptr<int64_t>();
IdType* sorted_keys = sorted_array.data_ptr<index_t>();
int64_t* sorted_values = sorted_idx.data_ptr<int64_t>();
cudaStream_t stream = torch::cuda::getDefaultCUDAStream();
if (num_bits == 0) {
num_bits = sizeof(index_t) * 8;
}
size_t workspace_size = 0;
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
nullptr, workspace_size, input_keys, sorted_keys, input_values,
sorted_values, num_items, 0, num_bits, stream));
auto temp = allocator.AllocateStorage<char>(workspace_size);
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
temp.get(), workspace_size, input_keys, sorted_keys, input_values,
sorted_values, num_items, 0, num_bits, stream));
}));
return std::make_pair(sorted_array, sorted_idx);
}
/** @brief Index select operator implementation for feature size 1. */
template <typename DType, typename IdType>
__global__ void IndexSelectSingleKernel(
......@@ -152,7 +124,7 @@ torch::Tensor UVAIndexSelectImpl_(torch::Tensor input, torch::Tensor index) {
const IdType* index_sorted_ptr = sorted_index.data_ptr<IdType>();
const int64_t* permutation_ptr = permutation.data_ptr<int64_t>();
cudaStream_t stream = torch::cuda::getDefaultCUDAStream();
cudaStream_t stream = c10::cuda::getDefaultCUDAStream();
if (aligned_feature_size == 1) {
// Use a single thread to process each output row to avoid wasting threads.
......@@ -212,21 +184,10 @@ torch::Tensor UVAIndexSelectImpl(torch::Tensor input, torch::Tensor index) {
// for the copies.
const int aligned_access_size =
std::gcd(16, std::gcd(ptr, input.element_size() * feature_size));
switch (aligned_access_size) {
case 1:
return UVAIndexSelectImpl_<uint8_t, index_t>(input, index);
case 2:
return UVAIndexSelectImpl_<uint16_t, index_t>(input, index);
case 4:
return UVAIndexSelectImpl_<uint32_t, index_t>(input, index);
case 8:
return UVAIndexSelectImpl_<uint64_t, index_t>(input, index);
case 16:
return UVAIndexSelectImpl_<float4, index_t>(input, index);
default:
TORCH_CHECK(false, "UVAIndexSelectImpl: Unreachable code path!");
return torch::Tensor{};
}
return GRAPHBOLT_DISPATCH_ELEMENT_SIZES(
aligned_access_size, "UVAIndexSelectImplElementSize", ([&] {
return UVAIndexSelectImpl_<element_size_t, index_t>(input, index);
}));
}));
}
......
/**
* Copyright (c) 2023 by Contributors
* Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* @file cuda/sort_impl.cu
* @brief Sort implementation on CUDA.
*/
#include <c10/core/ScalarType.h>
#include <c10/cuda/CUDAStream.h>
#include <cub/cub.cuh>
#include "./common.h"
#include "./utils.h"
namespace graphbolt {
namespace ops {
std::pair<torch::Tensor, torch::Tensor> Sort(
torch::Tensor input, int num_bits) {
int64_t num_items = input.size(0);
// We utilize int64_t for the values array. (torch::kLong == int64_t)
auto original_idx =
torch::arange(num_items, input.options().dtype(torch::kLong));
auto sorted_array = torch::empty_like(input);
auto sorted_idx = torch::empty_like(original_idx);
auto allocator = cuda::GetAllocator();
auto stream = c10::cuda::getDefaultCUDAStream();
AT_DISPATCH_INDEX_TYPES(
input.scalar_type(), "SortImpl", ([&] {
const auto input_keys = input.data_ptr<index_t>();
const int64_t* input_values = original_idx.data_ptr<int64_t>();
index_t* sorted_keys = sorted_array.data_ptr<index_t>();
int64_t* sorted_values = sorted_idx.data_ptr<int64_t>();
if (num_bits == 0) {
num_bits = sizeof(index_t) * 8;
}
size_t tmp_storage_size = 0;
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
nullptr, tmp_storage_size, input_keys, sorted_keys, input_values,
sorted_values, num_items, 0, num_bits, stream));
auto tmp_storage = allocator.AllocateStorage<char>(tmp_storage_size);
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
tmp_storage.get(), tmp_storage_size, input_keys, sorted_keys,
input_values, sorted_values, num_items, 0, num_bits, stream));
}));
return std::make_pair(sorted_array, sorted_idx);
}
} // namespace ops
} // namespace graphbolt
......@@ -8,13 +8,13 @@
#ifndef GRAPHBOLT_CUDA_UTILS_H_
#define GRAPHBOLT_CUDA_UTILS_H_
namespace graphbolt {
namespace cuda {
// The cache line size of GPU.
#define GPU_CACHE_LINE_SIZE 128
constexpr int GPU_CACHE_LINE_SIZE = 128;
// The max number of threads per block.
#define CUDA_MAX_NUM_THREADS 1024
constexpr int CUDA_MAX_NUM_THREADS = 1024;
namespace graphbolt {
namespace cuda {
/**
* @brief Calculate the number of threads needed given the size of the dimension
......@@ -34,7 +34,6 @@ inline int FindNumThreads(int size) {
/**
* @brief Calculate the smallest number of bits needed to represent a given
* range of integers [0, range).
*
*/
template <typename T>
int NumberOfBits(const T& range) {
......@@ -52,6 +51,31 @@ int NumberOfBits(const T& range) {
return bits;
}
/**
 * @brief Given a sorted array and a value, returns the index of the first
 * element that compares greater than the value.
 *
 * This function assumes 0-based indexing.
 * @param A: ascending sorted array
 * @param n: size of A
 * @param x: value to search for in A
 * @return The index i of the first element such that A[i] > x. Returns n if
 * x >= A[n - 1], and 0 if x < A[0].
 */
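// Example (illustrative values, not from the original): with A = {0, 3, 5, 9}
// and n = 4, UpperBound(A, 4, 4) returns 2 and UpperBound(A, 4, 9) returns 4.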
template <typename indptr_t, typename indices_t>
__device__ indices_t UpperBound(const indptr_t* A, indices_t n, indptr_t x) {
indices_t l = 0, r = n;
while (l < r) {
const auto m = l + (r - l) / 2;
if (x >= A[m]) {
l = m + 1;
} else {
r = m;
}
}
return l;
}
} // namespace cuda
} // namespace graphbolt
......
......@@ -3,9 +3,11 @@
* @file index_select.cc
* @brief Index select operators.
*/
#include "./index_select.h"
#include <graphbolt/cuda_ops.h>
#include <graphbolt/fused_csc_sampling_graph.h>
#include "./macro.h"
#include "./utils.h"
namespace graphbolt {
namespace ops {
......@@ -20,5 +22,36 @@ torch::Tensor IndexSelect(torch::Tensor input, torch::Tensor index) {
return input.index({index.to(torch::kLong)});
}
std::tuple<torch::Tensor, torch::Tensor> IndexSelectCSC(
torch::Tensor indptr, torch::Tensor indices, torch::Tensor nodes) {
TORCH_CHECK(
indices.sizes().size() == 1, "IndexSelectCSC only supports 1d tensors");
if (indices.is_pinned() && utils::is_accessible_from_gpu(indptr) &&
utils::is_accessible_from_gpu(nodes)) {
GRAPHBOLT_DISPATCH_CUDA_ONLY_DEVICE(
c10::DeviceType::CUDA, "UVAIndexSelectCSC",
{ return UVAIndexSelectCSCImpl(indptr, indices, nodes); });
} else if (
indices.device().type() == c10::DeviceType::CUDA &&
utils::is_accessible_from_gpu(indptr) &&
utils::is_accessible_from_gpu(nodes)) {
GRAPHBOLT_DISPATCH_CUDA_ONLY_DEVICE(
c10::DeviceType::CUDA, "nodesSelectCSC",
{ return IndexSelectCSCImpl(indptr, indices, nodes); });
}
  // @todo: The CPU path supports only integer dtypes for the indices tensor.
TORCH_CHECK(
c10::isIntegralType(indices.scalar_type(), false),
"IndexSelectCSC is not implemented to slice noninteger types yet.");
torch::optional<torch::Tensor> temp;
torch::optional<sampling::FusedCSCSamplingGraph::NodeTypeToIDMap> temp2;
torch::optional<sampling::FusedCSCSamplingGraph::EdgeTypeToIDMap> temp3;
torch::optional<sampling::FusedCSCSamplingGraph::EdgeAttrMap> temp4;
sampling::FusedCSCSamplingGraph g(
indptr, indices, temp, temp, temp2, temp3, temp4);
const auto res = g.InSubgraph(nodes);
return std::make_tuple(res->indptr, res->indices);
}
} // namespace ops
} // namespace graphbolt
......@@ -11,8 +11,25 @@
namespace graphbolt {
namespace ops {
/** @brief Implemented in the cuda directory. */
torch::Tensor UVAIndexSelectImpl(torch::Tensor input, torch::Tensor index);
/**
 * @brief Select columns of a sparse matrix in CSC format according to the
 * nodes tensor.
*
* NOTE:
* 1. The shape of all tensors must be 1-D.
* 2. If indices is on pinned memory and nodes is on pinned memory or GPU
* memory, then UVAIndexSelectCSCImpl will be called. If indices is on GPU
* memory, then IndexSelectCSCImpl will be called. Otherwise,
* FusedCSCSamplingGraph::InSubgraph will be called.
*
* @param indptr Indptr tensor containing offsets with shape (N,).
* @param indices Indices tensor with edge information of shape (indptr[N],).
* @param nodes Nodes tensor with shape (M,).
* @return (torch::Tensor, torch::Tensor) Output indptr and indices tensors of
* shapes (M + 1,) and ((indptr[nodes + 1] - indptr[nodes]).sum(),).
*/
std::tuple<torch::Tensor, torch::Tensor> IndexSelectCSC(
torch::Tensor indptr, torch::Tensor indices, torch::Tensor nodes);
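// Example (hypothetical tensors, for illustration only): slice the columns
// given by nodes from a CSC graph whose indices live on the GPU.
//   auto [sub_indptr, sub_indices] =
//       IndexSelectCSC(indptr, indices, nodes.to(torch::kCUDA));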
/**
* @brief Select rows from input tensor according to index tensor.
......
......@@ -71,6 +71,7 @@ TORCH_LIBRARY(graphbolt, m) {
m.def("unique_and_compact", &UniqueAndCompact);
m.def("isin", &IsIn);
m.def("index_select", &ops::IndexSelect);
m.def("index_select_csc", &ops::IndexSelectCSC);
m.def("set_seed", &RandomEngine::SetManualSeed);
}
......
/**
* Copyright (c) 2023 by Contributors
* @file utils.h
* @brief Graphbolt utils.
*/
#ifndef GRAPHBOLT_UTILS_H_
#define GRAPHBOLT_UTILS_H_
#include <torch/script.h>
namespace graphbolt {
namespace utils {
/**
* @brief Checks whether the tensor is stored on the GPU or the pinned memory.
*/
inline bool is_accessible_from_gpu(torch::Tensor tensor) {
return tensor.is_pinned() || tensor.device().type() == c10::DeviceType::CUDA;
}
} // namespace utils
} // namespace graphbolt
#endif // GRAPHBOLT_UTILS_H_
import unittest
import backend as F
import dgl.graphbolt as gb
import pytest
import torch
from .. import gb_test_utils
@unittest.skipIf(
F._default_context_str == "cpu",
reason="Tests for pinned memory are only meaningful on GPU.",
)
@pytest.mark.parametrize(
"indptr_dtype",
[torch.int32, torch.int64],
)
@pytest.mark.parametrize(
"indices_dtype",
[torch.int8, torch.int16, torch.int32, torch.int64],
)
@pytest.mark.parametrize("idtype", [torch.int32, torch.int64])
@pytest.mark.parametrize("is_pinned", [False, True])
def test_index_select_csc(indptr_dtype, indices_dtype, idtype, is_pinned):
"""Original graph in COO:
1 0 1 0 1 0
1 0 0 1 0 1
0 1 0 1 0 0
0 1 0 0 1 0
1 0 0 0 0 1
0 0 1 0 1 0
"""
indptr = torch.tensor([0, 3, 5, 7, 9, 12, 14], dtype=indptr_dtype)
indices = torch.tensor(
[0, 1, 4, 2, 3, 0, 5, 1, 2, 0, 3, 5, 1, 4], dtype=indices_dtype
)
index = torch.tensor([0, 5, 3], dtype=idtype)
cpu_indptr, cpu_indices = torch.ops.graphbolt.index_select_csc(
indptr, indices, index
)
if is_pinned:
indptr = indptr.pin_memory()
indices = indices.pin_memory()
else:
indptr = indptr.cuda()
indices = indices.cuda()
index = index.cuda()
gpu_indptr, gpu_indices = torch.ops.graphbolt.index_select_csc(
indptr, indices, index
)
assert not cpu_indptr.is_cuda
assert not cpu_indices.is_cuda
assert gpu_indptr.is_cuda
assert gpu_indices.is_cuda
assert torch.equal(cpu_indptr, gpu_indptr.cpu())
assert torch.equal(cpu_indices, gpu_indices.cpu())
def test_InSubgraphSampler_homo():
"""Original graph in COO:
1 0 1 0 1 0
......