/*! * Copyright (c) 2021 by Contributors * \file nccl_api.cc * \brief Implementation of wrapper around NCCL routines. */ #include "nccl_api.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "cuda_common.h" #include "../../runtime/workspace.h" #include "../../partition/ndarray_partition.h" #include "../../kernel/cuda/atomic.cuh" #include "../../array/cuda/dgl_cub.cuh" #include "../../array/cuda/array_index_select.cuh" #define NCCL_CALL(func) \ { \ ncclResult_t result = func; \ if (result != ncclSuccess) { \ LOG(FATAL) \ << "NCCLError: " #func " failed with error: " << result; \ } \ } namespace dgl { using namespace kernel::cuda; using namespace partition; namespace runtime { namespace cuda { namespace { enum class AllToAllMode : int { REMAINDER = 0 }; template ncclDataType_t NCCLType(); template<> ncclDataType_t NCCLType() { return ncclInt32; } template<> ncclDataType_t NCCLType() { return ncclInt64; } template<> ncclDataType_t NCCLType<__half>() { return ncclHalf; } template<> ncclDataType_t NCCLType() { return ncclFloat32; } template<> ncclDataType_t NCCLType() { return ncclFloat64; } template __global__ void _DualPermKernel( const IdType * const in_idx, const DType * const in_value, const IdType * const perm, const int64_t num_in, const int64_t num_feat, IdType * const out_idx, DType * const out_value) { // set index permutation const int64_t tidx = blockDim.x*static_cast(blockIdx.x)+threadIdx.x; if (tidx < num_in) { const IdType perm_idx = perm[tidx]; assert(perm_idx < num_in); out_idx[tidx] = in_idx[perm_idx]; } if (num_feat > 1) { for (int d = 0; d < blockDim.x; ++d) { const int64_t bidx = blockDim.x*static_cast(blockIdx.x) + d; if (bidx < num_in) { const IdType perm_idx = perm[bidx]; for (int64_t f = threadIdx.x; f < num_feat; f+=blockDim.x) { out_value[bidx*num_feat+f] = in_value[perm_idx*num_feat+f]; } } } } else { if (tidx < num_in) { const IdType perm_idx = perm[tidx]; out_value[tidx] = in_value[perm_idx]; } } } template __global__ void _InversePermKernel( const DType* const array, const int64_t num_feat, int64_t length, const IdType* const perm, DType* const out) { int64_t in_row = blockIdx.x*blockDim.y+threadIdx.y; const int64_t stride = blockDim.y*gridDim.x; while (in_row < length) { int64_t col = threadIdx.x; const int64_t out_row = perm[in_row]; while (col < num_feat) { out[out_row*num_feat+col] = array[in_row*num_feat+col]; col += blockDim.x; } in_row += stride; } } template std::pair SparsePush( NCCLCommunicatorRef comm, IdArray in_idx, NDArray in_value, NDArrayPartitionRef part) { CHECK_EQ(in_idx->shape[0], in_value->shape[0]) << "Leading dimension of indices (" << in_idx->shape[0] << ") must match " "leading dimension of values (" << in_value->shape[0] << ")."; const auto& ctx = in_idx->ctx; CHECK_EQ(ctx, in_value->ctx) << "Indices and values must be on the same " "device"; auto device = DeviceAPI::Get(ctx); // TODO(dlasalle): Get the stream from the device context. cudaStream_t stream = 0; CHECK_EQ(in_idx->ndim, 1) << "Indices must be 1-dimensional"; const int64_t num_in = in_idx->shape[0]; int64_t num_feat = 1; for (int d = 1; d < in_value->ndim; ++d) { num_feat *= in_value->shape[d]; } const int64_t comm_size = comm->size(); if (comm_size == 1) { // nothing to do, just return original arrays return std::pair(in_idx, in_value); } std::pair part_perm = part->GeneratePermutation(in_idx); const IdType * const perm = static_cast(part_perm.first->data); const int64_t * const send_sum = static_cast(part_perm.second->data); Workspace send_idx(device, ctx, num_in); Workspace send_value(device, ctx, num_in*num_feat); // permute the indices and values { const dim3 block(256); const dim3 grid((num_in+block.x-1)/block.x); _DualPermKernel<<>>( static_cast(in_idx->data), static_cast(in_value->data), perm, num_in, num_feat, send_idx.get(), send_value.get()); CUDA_CALL(cudaGetLastError()); } // compute the prefix sum of the send values Workspace send_prefix(device, ctx, comm_size+1); { size_t prefix_workspace_size; CUDA_CALL(cub::DeviceScan::ExclusiveSum(nullptr, prefix_workspace_size, send_sum, send_prefix.get(), comm_size+1, stream)); Workspace prefix_workspace(device, ctx, prefix_workspace_size); CUDA_CALL(cub::DeviceScan::ExclusiveSum(prefix_workspace.get(), prefix_workspace_size, send_sum, send_prefix.get(), comm_size+1, stream)); } std::vector send_prefix_host(comm_size+1); device->CopyDataFromTo( send_prefix.get(), 0, send_prefix_host.data(), 0, send_prefix_host.size()*sizeof(*send_prefix.get()), ctx, DGLContext{kDLCPU, 0}, DGLType{kDLInt, sizeof(*send_prefix.get())*8, 1}, stream); send_prefix.free(); CHECK_EQ(send_prefix_host.back(), num_in) << "Internal Error: " "send_prefix_host.back() = " << send_prefix_host.back() << ", and num_in = " << num_in; // communicate the amount to send Workspace recv_sum(device, ctx, comm_size+1); comm->AllToAll(send_sum, recv_sum.get(), 1, stream); cudaEvent_t d2h; cudaEventCreate(&d2h); // compute the prefix sum of the recv values Workspace recv_prefix(device, ctx, comm_size+1); { size_t prefix_workspace_size; CUDA_CALL(cub::DeviceScan::ExclusiveSum(nullptr, prefix_workspace_size, recv_sum.get(), recv_prefix.get(), comm_size+1)); Workspace prefix_workspace(device, ctx, prefix_workspace_size); CUDA_CALL(cub::DeviceScan::ExclusiveSum(prefix_workspace.get(), prefix_workspace_size, recv_sum.get(), recv_prefix.get(), comm_size+1)); } recv_sum.free(); // finally copy the prefixsum sum down to the host std::vector recv_prefix_host(comm_size+1); device->CopyDataFromTo( recv_prefix.get(), 0, recv_prefix_host.data(), 0, recv_prefix_host.size()*sizeof(*recv_prefix.get()), ctx, DGLContext{kDLCPU, 0}, DGLType{kDLInt, sizeof(*recv_prefix.get())*8, 1}, stream); recv_prefix.free(); // use an event to track when copying is done cudaEventRecord(d2h, stream); // allocate output space cudaEventSynchronize(d2h); cudaEventDestroy(d2h); IdArray recv_idx = aten::NewIdArray( recv_prefix_host.back(), ctx, sizeof(IdType)*8); std::vector value_shape(in_value->ndim, 0); value_shape[0] = recv_prefix_host.back(); for (int d = 1; d < in_value->ndim; ++d) { value_shape[d] = in_value->shape[d]; } NDArray recv_value = NDArray::Empty(value_shape, in_value->dtype, ctx); // send data comm->SparseAllToAll( send_idx.get(), send_value.get(), num_feat, send_prefix_host.data(), static_cast(recv_idx->data), static_cast(recv_value->data), recv_prefix_host.data(), stream); return std::pair(recv_idx, recv_value); } template NDArray SparsePull( NCCLCommunicatorRef comm, IdArray req_idx, NDArray local_tensor, NDArrayPartitionRef part) { const auto& ctx = req_idx->ctx; CHECK_EQ(ctx, local_tensor->ctx) << "The request indices and set of local " "values must be on the same device"; auto device = DeviceAPI::Get(ctx); cudaStream_t stream = CUDAThreadEntry::ThreadLocal()->stream; CHECK_EQ(req_idx->ndim, 1) << "The tensor of requested indices must be of " "dimension one."; const int64_t num_in = req_idx->shape[0]; int64_t num_feat = 1; for (int d = 1; d < local_tensor->ndim; ++d) { num_feat *= local_tensor->shape[d]; } const int64_t comm_size = comm->size(); if (comm_size == 1) { // Just return index selection from current local_tensor return aten::IndexSelect(local_tensor, req_idx); } // First we need to send our requests to other processors. This means // re-ordering our index array to be contiguous among processors, and // counting the number of indices we are sending each processor. For now, // we assume a poorly partitioned graph, and that there exists the // possibility that each processor could request data from this one. // the buffer for us to re-order our requests in Workspace send_idx(device, ctx, num_in); std::pair part_perm = part->GeneratePermutation(req_idx); const IdType * const perm = static_cast(part_perm.first->data); const int64_t * const send_sum = static_cast(part_perm.second->data); // permute requests { const dim3 block(256); const dim3 grid((num_in+block.x-1)/block.x); aten::impl::IndexSelectSingleKernel<<>>( static_cast(req_idx->data), perm, num_in, send_idx.get()); CUDA_CALL(cudaGetLastError()); } // compute the prefix sum of the indexes this process is requesting Workspace request_prefix(device, ctx, comm_size+1); { size_t prefix_workspace_size; CUDA_CALL(cub::DeviceScan::ExclusiveSum(nullptr, prefix_workspace_size, send_sum, request_prefix.get(), comm_size+1, stream)); Workspace prefix_workspace(device, ctx, prefix_workspace_size); CUDA_CALL(cub::DeviceScan::ExclusiveSum(prefix_workspace.get(), prefix_workspace_size, send_sum, request_prefix.get(), comm_size+1, stream)); } cudaEvent_t d2h; cudaEventCreate(&d2h); std::vector request_prefix_host(comm_size+1); device->CopyDataFromTo( request_prefix.get(), 0, request_prefix_host.data(), 0, request_prefix_host.size()*sizeof(*request_prefix.get()), ctx, DGLContext{kDLCPU, 0}, DGLType{kDLInt, sizeof(*request_prefix.get())*8, 1}, stream); request_prefix.free(); CHECK_EQ(request_prefix_host.back(), num_in) << "Internal Error: " "request_prefix_host.back() = " << request_prefix_host.back() << ", num_in = " << num_in; // communicate the amount requested Workspace recv_sum(device, ctx, comm_size+1); comm->AllToAll(send_sum, recv_sum.get(), 1, stream); // compute the prefix sum of the requested indexes Workspace response_prefix(device, ctx, comm_size+1); { size_t prefix_workspace_size; CUDA_CALL(cub::DeviceScan::ExclusiveSum(nullptr, prefix_workspace_size, recv_sum.get(), response_prefix.get(), comm_size+1, stream)); Workspace prefix_workspace(device, ctx, prefix_workspace_size); CUDA_CALL(cub::DeviceScan::ExclusiveSum(prefix_workspace.get(), prefix_workspace_size, recv_sum.get(), response_prefix.get(), comm_size+1, stream)); } recv_sum.free(); // finally copy the prefixsum sum down to the host std::vector response_prefix_host(comm_size+1); device->CopyDataFromTo( response_prefix.get(), 0, response_prefix_host.data(), 0, response_prefix_host.size()*sizeof(*response_prefix.get()), ctx, DGLContext{kDLCPU, 0}, DGLType{kDLInt, sizeof(*response_prefix.get())*8, 1}, stream); response_prefix.free(); // use an event to track when copying is done cudaEventRecord(d2h, stream); // allocate output space cudaEventSynchronize(d2h); cudaEventDestroy(d2h); // gather requested indexes IdArray recv_idx = aten::NewIdArray( response_prefix_host.back(), ctx, sizeof(IdType)*8); comm->AllToAllV( send_idx.get(), request_prefix_host.data(), static_cast(recv_idx->data), response_prefix_host.data(), stream); send_idx.free(); // convert requested indices to local indices depending on partition if (response_prefix_host.back() > 0) { recv_idx = part->MapToLocal(recv_idx); } // and then index select them into place Workspace filled_response_value(device, ctx, response_prefix_host.back()*num_feat); if (request_prefix_host.back() > 0) { dim3 block(256, 1); while (block.x >= 2*num_feat) { block.x /= 2; block.y *= 2; } const dim3 grid((request_prefix_host.back()+block.y-1)/block.y); aten::impl::IndexSelectMultiKernel<<>>( static_cast(local_tensor->data), num_feat, static_cast(recv_idx->data), response_prefix_host.back(), filled_response_value.get()); CUDA_CALL(cudaGetLastError()); } // we will collect recieved values in this array std::vector value_shape(local_tensor->ndim, 0); value_shape[0] = request_prefix_host.back(); for (int d = 1; d < local_tensor->ndim; ++d) { value_shape[d] = local_tensor->shape[d]; } Workspace filled_request_value(device, ctx, request_prefix_host.back()*num_feat); // multiply the prefixes by the number of features being sent for (auto& v : request_prefix_host) { v *= num_feat; } for (auto& v : response_prefix_host) { v *= num_feat; } // send the values comm->AllToAllV( filled_response_value.get(), response_prefix_host.data(), filled_request_value.get(), request_prefix_host.data(), stream); filled_response_value.free(); // finally, we need to permute the values back into the requested order NDArray result = NDArray::Empty(value_shape, local_tensor->dtype, ctx); if (num_in > 0) { dim3 block(256, 1); while (block.x >= 2*num_feat) { block.x /= 2; block.y *= 2; } const dim3 grid((num_in+block.y-1)/block.y); _InversePermKernel<<>>( filled_request_value.get(), num_feat, num_in, perm, static_cast(result->data)); CUDA_CALL(cudaGetLastError()); } return result; } } // namespace /* NCCLUniqueId **************************************************************/ NCCLUniqueId::NCCLUniqueId() : id_() { // this ID is unique to the process, not to each call of this function NCCL_CALL(ncclGetUniqueId(&id_)); } ncclUniqueId NCCLUniqueId::Get() const { return id_; } std::string NCCLUniqueId::ToString() const { std::ostringstream oss; oss << std::hex; for (size_t b = 0; b < NCCL_UNIQUE_ID_BYTES; ++b) { const int num = static_cast(id_.internal[b]); oss << std::setw(2) << std::setfill('0') << num; } std::string result = oss.str(); CHECK_EQ(result.length(), NCCL_UNIQUE_ID_BYTES*2) << "Invalid NCCL ID format: '" << result << "'"; return result; } void NCCLUniqueId::FromString( const std::string& str) { // must be exactly 256 hex characters CHECK_EQ(str.length(), NCCL_UNIQUE_ID_BYTES * 2) << "Invalid NCCL ID format: '" << str << "'"; for (size_t b = 0; b < NCCL_UNIQUE_ID_BYTES; ++b) { id_.internal[b] = std::strtol(str.substr(b*2, 2).c_str(), nullptr, 16); } } /* NCCLCommunicator **********************************************************/ NCCLCommunicator::NCCLCommunicator( const int size, const int rank, ncclUniqueId id) : comm_(), size_(size), rank_(rank) { CHECK_LT(rank, size) << "The rank (" << rank << ") must be smaller than " "the size of the communicator (" << size << ")."; CHECK_GE(rank, 0) << "The rank (" << rank << ") must be greater than or " "equal to 0."; NCCL_CALL(ncclCommInitRank(&comm_, size_, id, rank_)); } NCCLCommunicator::~NCCLCommunicator() { ncclCommDestroy(comm_); } ncclComm_t NCCLCommunicator::Get() { return comm_; } template void NCCLCommunicator::AllToAllV( const DType * const send, const int64_t * const send_prefix, DType * const recv, const int64_t * const recv_prefix, cudaStream_t stream) { const ncclDataType_t type = NCCLType(); NCCL_CALL(ncclGroupStart()); for (int r = 0; r < size_; ++r) { const int64_t send_size = send_prefix[r+1]-send_prefix[r]; if (send_size > 0) { NCCL_CALL(ncclSend(send+send_prefix[r], send_size, type, r, comm_, stream)); } const int64_t recv_size = recv_prefix[r+1]-recv_prefix[r]; if (recv_size > 0) { NCCL_CALL(ncclRecv(recv+recv_prefix[r], recv_size, type, r, comm_, stream)); } } NCCL_CALL(ncclGroupEnd()); } template void NCCLCommunicator::AllToAllV( const int32_t * const send, const int64_t * send_prefix, int32_t * const recv, const int64_t * recv_prefix, cudaStream_t stream); template void NCCLCommunicator::AllToAllV( const int64_t * const send, const int64_t * send_prefix, int64_t * const recv, const int64_t * recv_prefix, cudaStream_t stream); template void NCCLCommunicator::AllToAllV( const float * const send, const int64_t * send_prefix, float * const recv, const int64_t * recv_prefix, cudaStream_t stream); template void NCCLCommunicator::AllToAllV<__half>( const __half * const send, const int64_t * send_prefix, __half * const recv, const int64_t * recv_prefix, cudaStream_t stream); template void NCCLCommunicator::AllToAll( const IdType * const send, IdType * const recv, const int64_t count, cudaStream_t stream) { const ncclDataType_t type = NCCLType(); ncclGroupStart(); for (int r = 0; r < size_; ++r) { ncclSend(send+(r*count), count, type, r, comm_, stream); ncclRecv(recv+(r*count), count, type, r, comm_, stream); } ncclGroupEnd(); } template void NCCLCommunicator::AllToAll( const int32_t * const send, int32_t * const recv, const int64_t count, cudaStream_t stream); template void NCCLCommunicator::AllToAll( const int64_t * const send, int64_t * const recv, const int64_t count, cudaStream_t stream); template void NCCLCommunicator::SparseAllToAll( const IdType * const send_idx, const DType * const send_value, const int64_t num_feat, const int64_t * const send_prefix, IdType * const recv_idx, DType * const recv_value, const int64_t * const recv_prefix, cudaStream_t stream) { const ncclDataType_t idx_type = NCCLType(); const ncclDataType_t value_type = NCCLType(); ncclGroupStart(); for (int r = 0; r < size_; ++r) { const int64_t send_size = send_prefix[r+1]-send_prefix[r]; if (send_size > 0) { ncclSend(send_idx+send_prefix[r], send_size, idx_type, r, comm_, stream); ncclSend(send_value+send_prefix[r]*num_feat, send_size*num_feat, value_type, r, comm_, stream); } const int64_t recv_size = recv_prefix[r+1]-recv_prefix[r]; if (recv_size > 0) { ncclRecv(recv_idx+recv_prefix[r], recv_size, idx_type, r, comm_, stream); ncclRecv(recv_value+recv_prefix[r]*num_feat, recv_size*num_feat, value_type, r, comm_, stream); } } ncclGroupEnd(); } template void NCCLCommunicator::SparseAllToAll( const int32_t * const send_idx, const __half * const send_value, const int64_t num_feat, const int64_t * const send_prefix, int32_t * const recv_idx, __half * const recv_value, const int64_t * const recv_prefix, cudaStream_t stream); template void NCCLCommunicator::SparseAllToAll( const int64_t * const send_idx, const __half * const send_value, const int64_t num_feat, const int64_t * const send_prefix, int64_t * const recv_idx, __half * const recv_value, const int64_t * const recv_prefix, cudaStream_t stream); int NCCLCommunicator::size() const { return size_; } int NCCLCommunicator::rank() const { return rank_; } /* CAPI **********************************************************************/ DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLGetUniqueId") .set_body([] (DGLArgs args, DGLRetValue* rv) { *rv = NCCLUniqueIdRef(std::make_shared()); }); DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLUniqueIdToString") .set_body([] (DGLArgs args, DGLRetValue* rv) { NCCLUniqueIdRef idObj = args[0]; *rv = idObj->ToString(); }); DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLUniqueIdFromString") .set_body([] (DGLArgs args, DGLRetValue* rv) { const std::string str = args[0]; NCCLUniqueIdRef ref(std::make_shared()); ref->FromString(str); *rv = ref; }); DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLCreateComm") .set_body([] (DGLArgs args, DGLRetValue* rv) { const int size = args[0]; const int rank = args[1]; NCCLUniqueIdRef idObj = args[2]; *rv = NCCLCommunicatorRef(std::make_shared(size, rank, idObj->Get())); }); DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLSparseAllToAllPush") .set_body([] (DGLArgs args, DGLRetValue* rv) { NCCLCommunicatorRef comm = args[0]; IdArray in_idx = args[1]; NDArray in_values = args[2]; NDArrayPartitionRef part = args[3]; List ret; ATEN_ID_TYPE_SWITCH(in_idx->dtype, IdType, { ATEN_DTYPE_SWITCH(in_values->dtype, DType, "values", { auto result = SparsePush(comm, in_idx, in_values, part); ret.push_back(Value(MakeValue(result.first))); ret.push_back(Value(MakeValue(result.second))); }); }); *rv = ret; }); DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLSparseAllToAllPull") .set_body([] (DGLArgs args, DGLRetValue* rv) { NCCLCommunicatorRef comm = args[0]; // the indexes this process is requesting from others IdArray req_idx = args[1]; // the tensor this process has to fulfill other requests NDArray tensor = args[2]; NDArrayPartitionRef part = args[3]; ATEN_ID_TYPE_SWITCH(req_idx->dtype, IdType, { ATEN_DTYPE_SWITCH(tensor->dtype, DType, "values", { *rv = SparsePull(comm, req_idx, tensor, part); }); }); }); } // namespace cuda } // namespace runtime } // namespace dgl