/*!
 *  Copyright (c) 2021 by Contributors
 * \file ndarray_partition.h
 * \brief Operations on partition implemented in CUDA.
 */

#include "../partition_op.h"

#include <dgl/runtime/device_api.h>

#include "../../array/cuda/dgl_cub.cuh"
#include "../../runtime/cuda/cuda_common.h"
#include "../../runtime/workspace.h"

using namespace dgl::runtime;

namespace dgl {
namespace partition {
namespace impl {

// Map each index to its owning partition via the remainder of the index
// divided by the number of partitions.
template <typename IdType>
__global__ void _MapProcByRemainder(
    const IdType * const index,
    const int64_t num_index,
    const int64_t num_proc,
    IdType * const proc_id) {
  const int64_t idx = blockDim.x*static_cast<int64_t>(blockIdx.x)+threadIdx.x;

  if (idx < num_index) {
    proc_id[idx] = index[idx] % num_proc;
  }
}

// Same mapping as _MapProcByRemainder, but for a power-of-two number of
// partitions, where the remainder can be computed with a bit mask.
template <typename IdType>
__global__ void _MapProcByMaskRemainder(
    const IdType * const index,
    const int64_t num_index,
    const IdType mask,
    IdType * const proc_id) {
  const int64_t idx = blockDim.x*static_cast<int64_t>(blockIdx.x)+threadIdx.x;

  if (idx < num_index) {
    proc_id[idx] = index[idx] & mask;
  }
}

// Convert global indices to partition-local indices by dividing out the
// number of partitions.
template <typename IdType>
__global__ void _MapLocalIndexByRemainder(
    const IdType * const in,
    IdType * const out,
    const int64_t num_items,
    const int comm_size) {
  const int64_t idx = threadIdx.x+blockDim.x*blockIdx.x;

  if (idx < num_items) {
    out[idx] = in[idx] / comm_size;
  }
}

template <DLDeviceType XPU, typename IdType>
std::pair<IdArray, NDArray> GeneratePermutationFromRemainder(
    int64_t array_size,
    int num_parts,
    IdArray in_idx) {
  std::pair<IdArray, NDArray> result;

  const auto& ctx = in_idx->ctx;
  auto device = DeviceAPI::Get(ctx);
  cudaStream_t stream = CUDAThreadEntry::ThreadLocal()->stream;

  const int64_t num_in = in_idx->shape[0];

  CHECK_GE(num_parts, 1) << "The number of partitions (" << num_parts <<
      ") must be at least 1.";
  if (num_parts == 1) {
    // no permutation
    result.first = aten::Range(0, num_in, sizeof(IdType)*8, ctx);
    result.second = aten::Full(num_in, num_parts, sizeof(int64_t)*8, ctx);

    return result;
  }

  result.second = aten::Full(0, num_parts, sizeof(int64_t)*8, ctx);
  int64_t * out_counts = static_cast<int64_t*>(result.second->data);
  if (num_in == 0) {
    // now that we've zero'd out_counts, nothing left to do for an empty
    // mapping
    return result;
  }

  const int64_t part_bits =
      static_cast<int64_t>(std::ceil(std::log2(num_parts)));

  // First, generate a mapping of indexes to processors
  Workspace<IdType> proc_id_in(device, ctx, num_in);
  {
    const dim3 block(256);
    const dim3 grid((num_in+block.x-1)/block.x);

    if (num_parts < (1 << part_bits)) {
      // num_parts is not a power of 2
      CUDA_KERNEL_CALL(_MapProcByRemainder, grid, block, 0, stream,
          static_cast<const IdType*>(in_idx->data),
          num_in,
          num_parts,
          proc_id_in.get());
    } else {
      // num_parts is a power of 2
      CUDA_KERNEL_CALL(_MapProcByMaskRemainder, grid, block, 0, stream,
          static_cast<const IdType*>(in_idx->data),
          num_in,
          static_cast<IdType>(num_parts-1),  // bit mask
          proc_id_in.get());
    }
  }

  // then create a permutation array that groups processors together by
  // performing a radix sort
  Workspace<IdType> proc_id_out(device, ctx, num_in);
  result.first = aten::NewIdArray(num_in, ctx, sizeof(IdType)*8);
  IdType * perm_out = static_cast<IdType*>(result.first->data);
  {
    IdArray perm_in = aten::Range(0, num_in, sizeof(IdType)*8, ctx);

    size_t sort_workspace_size;
    CUDA_CALL(cub::DeviceRadixSort::SortPairs(nullptr, sort_workspace_size,
        proc_id_in.get(), proc_id_out.get(),
        static_cast<IdType*>(perm_in->data), perm_out,
        num_in, 0, part_bits, stream));

    Workspace<void> sort_workspace(device, ctx, sort_workspace_size);
    CUDA_CALL(cub::DeviceRadixSort::SortPairs(sort_workspace.get(),
        sort_workspace_size,
        proc_id_in.get(), proc_id_out.get(),
        static_cast<IdType*>(perm_in->data), perm_out,
        num_in, 0, part_bits, stream));
  }
  // explicitly free so workspace can be re-used
  proc_id_in.free();

  // perform a histogram and then a prefix-sum on the sorted proc_id vector
  // Count the number of values to be sent to each processor
  {
    using AtomicCount = unsigned long long;   // NOLINT
    static_assert(sizeof(AtomicCount) == sizeof(*out_counts),
        "AtomicCount must be the same width as int64_t for atomicAdd "
        "in cub::DeviceHistogram::HistogramEven() to work");

    // TODO(dlasalle): Once https://github.com/NVIDIA/cub/pull/287 is merged,
    // add a compile time check against the cub version to allow
    // num_in > (2 << 31).
    CHECK(num_in < static_cast<int64_t>(std::numeric_limits<int>::max())) <<
        "number of values to insert into histogram must be less than max "
        "value of int.";

    size_t hist_workspace_size;
    CUDA_CALL(cub::DeviceHistogram::HistogramEven(
        nullptr,
        hist_workspace_size,
        proc_id_out.get(),
        reinterpret_cast<AtomicCount*>(out_counts),
        num_parts+1,
        static_cast<IdType>(0),
        static_cast<IdType>(num_parts+1),
        static_cast<int>(num_in),
        stream));

    Workspace<void> hist_workspace(device, ctx, hist_workspace_size);
    CUDA_CALL(cub::DeviceHistogram::HistogramEven(
        hist_workspace.get(),
        hist_workspace_size,
        proc_id_out.get(),
        reinterpret_cast<AtomicCount*>(out_counts),
        num_parts+1,
        static_cast<IdType>(0),
        static_cast<IdType>(num_parts+1),
        static_cast<int>(num_in),
        stream));
  }

  return result;
}

template std::pair<IdArray, NDArray>
GeneratePermutationFromRemainder<kDLGPU, int32_t>(
    int64_t array_size,
    int num_parts,
    IdArray in_idx);
template std::pair<IdArray, NDArray>
GeneratePermutationFromRemainder<kDLGPU, int64_t>(
    int64_t array_size,
    int num_parts,
    IdArray in_idx);

template <DLDeviceType XPU, typename IdType>
IdArray MapToLocalFromRemainder(
    const int num_parts,
    IdArray global_idx) {
  const auto& ctx = global_idx->ctx;
  cudaStream_t stream = CUDAThreadEntry::ThreadLocal()->stream;

  IdArray local_idx = aten::NewIdArray(global_idx->shape[0], ctx,
      sizeof(IdType)*8);

  const dim3 block(128);
  const dim3 grid((global_idx->shape[0]+block.x-1)/block.x);

  CUDA_KERNEL_CALL(
      _MapLocalIndexByRemainder,
      grid, block, 0, stream,
      static_cast<const IdType*>(global_idx->data),
      static_cast<IdType*>(local_idx->data),
      global_idx->shape[0],
      num_parts);

  return local_idx;
}

template IdArray MapToLocalFromRemainder<kDLGPU, int32_t>(
    int num_parts,
    IdArray in_idx);
template IdArray MapToLocalFromRemainder<kDLGPU, int64_t>(
    int num_parts,
    IdArray in_idx);

}  // namespace impl
}  // namespace partition
}  // namespace dgl
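
// Illustrative usage sketch (comment only, not compiled): one way a caller
// might combine the two entry points above when shuffling IDs between
// partitions. The variables `num_parts`, `global_ids`, and `received_ids`,
// as well as the all-to-all exchange step, are assumptions for illustration
// and are not defined in this file.
//
//   // Group the global IDs by their owning partition and count how many
//   // values go to each partition.
//   auto perm_and_counts =
//       dgl::partition::impl::GeneratePermutationFromRemainder<kDLGPU, int64_t>(
//           array_size, num_parts, global_ids);
//   IdArray perm = perm_and_counts.first;          // permutation grouping IDs by owner
//   NDArray send_counts = perm_and_counts.second;  // int64 count per partition
//
//   // ... permute global_ids by `perm`, exchange the permuted IDs between
//   // partitions (e.g. with an all-to-all), then convert the received global
//   // IDs into partition-local IDs:
//   IdArray local_ids =
//       dgl::partition::impl::MapToLocalFromRemainder<kDLGPU, int64_t>(
//           num_parts, received_ids);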