Unverified Commit 8d5d8962 authored by Xin Yao, committed by GitHub

[Refactor] Replace third_party/nccl with PyTorch's NCCL backend (#4989)

* expose GeneratePermutation

* add sparse_all_to_all_push

* add sparse_all_to_all_pull

* add unit test

* handle world_size=1

* remove python nccl wrapper

* remove the nccl dependency

* use pinned memory to speedup D2H copy

* fix lint

* resolve comments

* fix lint

* fix ut

* resolve comments
parent b1ec112e
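In essence, the PR drops the bundled NCCL submodule and the dgl.cuda.nccl C wrapper, and reimplements sparse_all_to_all_push/pull on top of torch.distributed collectives. A minimal sketch of the communication pattern the new Python code follows (hedged: it assumes torch.distributed is already initialized with the "nccl" backend and all tensors live on this rank's GPU; all_to_all_v is a hypothetical helper, not part of DGL):

import torch
import torch.distributed as dist

def all_to_all_v(send, send_splits):
    # Exchange per-rank counts first (one entry per rank), then the payload.
    recv_splits = torch.empty_like(send_splits)
    dist.all_to_all_single(recv_splits, send_splits)
    recv_splits = recv_splits.tolist()
    send_splits = send_splits.tolist()
    recv = torch.empty(
        (sum(recv_splits), *send.shape[1:]), dtype=send.dtype, device=send.device
    )
    dist.all_to_all_single(recv, send, recv_splits, send_splits)
    return recv

The real implementation below additionally reorders the inputs by destination rank using NDArrayPartition.generate_permutation and overlaps the device-to-host copy of the split counts with that permutation.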
......@@ -22,9 +22,6 @@
[submodule "third_party/nanoflann"]
path = third_party/nanoflann
url = https://github.com/jlblancoc/nanoflann
[submodule "third_party/nccl"]
path = third_party/nccl
url = https://github.com/nvidia/nccl
[submodule "third_party/libxsmm"]
path = third_party/libxsmm
url = https://github.com/hfp/libxsmm.git
......
......@@ -23,8 +23,6 @@ endif()
# and add set(OPTION VALUE) to override these build options.
# Alternatively, use cmake -DOPTION=VALUE through command-line.
dgl_option(USE_CUDA "Build with CUDA" OFF)
dgl_option(USE_NCCL "Build with NCCL support" OFF)
dgl_option(USE_SYSTEM_NCCL "Build using system's NCCL library" OFF)
dgl_option(USE_OPENMP "Build with OpenMP" ON)
dgl_option(USE_AVX "Build with AVX optimization" OFF)
dgl_option(USE_LIBXSMM "Build with LIBXSMM library optimization" ON)
......@@ -171,25 +169,7 @@ list(APPEND DGL_SRC ${DGL_RPC_SRC})
if(USE_CUDA)
dgl_config_cuda(DGL_CUDA_SRC)
list(APPEND DGL_SRC ${DGL_CUDA_SRC})
if(USE_NCCL)
add_definitions(-DDGL_USE_NCCL)
if (USE_SYSTEM_NCCL)
include(cmake/util/FindNccl.cmake)
include_directories(${NCCL_INCLUDE_DIR})
else()
include(cmake/modules/NCCL.cmake)
cuda_include_directories(BEFORE ${NCCL_INCLUDE_DIR})
endif()
endif(USE_NCCL)
list(APPEND DGL_LINKER_LIBS ${NCCL_LIBRARY})
endif(USE_CUDA)
if(USE_CUDA)
cuda_add_library(dgl SHARED ${DGL_SRC})
if (USE_NCCL AND NOT USE_SYSTEM_NCCL)
add_dependencies(dgl nccl_external)
endif()
else(USE_CUDA)
add_library(dgl SHARED ${DGL_SRC})
endif(USE_CUDA)
......
include(ExternalProject)
# set path to submodule
set(NCCL_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/nccl")
# NCCL doesn't have CMAKE, so build externally
ExternalProject_Add(nccl_external
SOURCE_DIR ${PROJECT_SOURCE_DIR}/third_party/nccl
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ""
BUILD_COMMAND
env
make
"src.build"
"-j"
"BUILDDIR=${NCCL_BUILD_DIR}"
BUILD_BYPRODUCTS "${NCCL_BUILD_DIR}/lib/libnccl_static.a"
INSTALL_COMMAND ""
)
# set output variables
set(NCCL_FOUND TRUE)
set(NCCL_LIBRARY "${NCCL_BUILD_DIR}/lib/libnccl_static.a")
set(NCCL_INCLUDE_DIR "${NCCL_BUILD_DIR}/include")
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Tries to find NCCL headers and libraries.
#
# Usage of this module as follows:
#
# find_package(NCCL)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# NCCL_ROOT - When set, this path is inspected instead of standard library
# locations as the root of the NCCL installation.
# The environment variable NCCL_ROOT overrides this variable.
#
# This module defines
# Nccl_FOUND, whether nccl has been found
# NCCL_INCLUDE_DIR, directory containing header
# NCCL_LIBRARY, directory containing nccl library
# NCCL_LIB_NAME, nccl library name
# USE_NCCL_LIB_PATH, when set, NCCL_LIBRARY path is also inspected for the
# location of the nccl library. This would disable
# switching between static and shared.
#
# This module assumes that the user has already called find_package(CUDA)
#
# This file is from https://github.com/dmlc/xgboost, with modifications to
# check the version.
if (NCCL_LIBRARY)
if(NOT USE_NCCL_LIB_PATH)
# Don't cache NCCL_LIBRARY to enable switching between static and shared.
unset(NCCL_LIBRARY CACHE)
endif(NOT USE_NCCL_LIB_PATH)
endif()
if (BUILD_WITH_SHARED_NCCL)
# libnccl.so
set(NCCL_LIB_NAME nccl)
else ()
# libnccl_static.a
set(NCCL_LIB_NAME nccl_static)
endif (BUILD_WITH_SHARED_NCCL)
find_path(NCCL_INCLUDE_DIR
NAMES nccl.h
PATHS $ENV{NCCL_ROOT}/include ${NCCL_ROOT}/include)
# make sure it has point to point support
file(STRINGS "${NCCL_INCLUDE_DIR}/nccl.h" NCCL_VERSION_CODE REGEX "^#define[ \t]+NCCL_VERSION_CODE[ \t]+[0-9]+.*$" LIMIT_COUNT 1)
string(REGEX REPLACE "^.*NCCL_VERSION_CODE[ \t]+([0-9]+).*$" "\\1" NCCL_VERSION "${NCCL_VERSION_CODE}")
find_library(NCCL_LIBRARY
NAMES ${NCCL_LIB_NAME}
PATHS $ENV{NCCL_ROOT}/lib/ ${NCCL_ROOT}/lib)
if ("${NCCL_VERSION}" LESS "2700")
message(FATAL_ERROR "Require nccl >= 2700, but found ${NCCL_LIBRARY}==${NCCL_VERSION}")
else()
message(STATUS "Using nccl library: ${NCCL_LIBRARY} ${NCCL_VERSION}")
endif()
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Nccl DEFAULT_MSG
NCCL_INCLUDE_DIR NCCL_LIBRARY)
mark_as_advanced(
NCCL_INCLUDE_DIR
NCCL_LIBRARY
)
""" CUDA wrappers """
from . import nccl
from .. import backend as F
if F.get_preferred_backend() == "pytorch":
from . import nccl
"""API creating NCCL communicators."""
"""API wrapping NCCL primitives."""
from .. import backend as F
from .._ffi.function import _init_api
import torch
import torch.distributed as dist
_COMM_MODES_MAP = {"remainder": 0}
def sparse_all_to_all_push(idx, value, partition):
"""Perform an all-to-all-v operation, where by all processors send out
a set of indices and corresponding values. Indices and values,
corresponding to the current process, will copied into the output
arrays.
class UniqueId(object):
"""Class for allowing python code to create and communicate NCCL Unique
IDs, needed for creating communicators.
"""
Note: This method requires 'torch.distributed.get_backend() == "nccl"'.
def __init__(self, id_str=None):
"""Create an object reference the current NCCL unique id."""
if id_str:
if isinstance(id_str, bytes):
id_str = id_str.decode("utf-8")
self._handle = _CAPI_DGLNCCLUniqueIdFromString(id_str)
else:
self._handle = _CAPI_DGLNCCLGetUniqueId()
def get(self):
"""Get the C-handle for this object."""
return self._handle
def __str__(self):
return _CAPI_DGLNCCLUniqueIdToString(self._handle)
def __repr__(self):
return "UniqueId[{}]".format(str(self))
def __eq__(self, other):
return str(self) == str(other)
class Communicator(object):
"""High-level wrapper for NCCL communication."""
def __init__(self, size, rank, unique_id):
"""Create a new NCCL communicator.
Parameters
----------
size : int
The number of processes in the communicator.
rank : int
The rank of the current process in the communicator.
unique_id : NCCLUniqueId
The unique id of the root process (rank=0).
Examples
--------
>>> from dgl.cuda.nccl import Communicator, UniqueId
The root process will generate a unique NCCL id and communicate it
to the other processes.
>>> uid = UniqueId()
>>> store.set('nccl_root_id', str(uid))
And all other processes create unique ids from the root processes.
>>> uid = UniqueId(store.get('nccl_root_id'))
Then, all processes should create the communicator.
>>> comm = Communicator(world_size, rank, uid)
"""
assert rank < size, (
"The rank of a process must be less than the "
"size of the communicator."
)
self._handle = _CAPI_DGLNCCLCreateComm(size, rank, unique_id.get())
self._rank = rank
self._size = size
def sparse_all_to_all_push(self, idx, value, partition):
"""Perform an all-to-all-v operation, where by all processors send out
a set of indices and corresponding values. Indices and values,
corresponding to the current process, will copied into the output
arrays.
Parameters
----------
idx : tensor
The 1D set of indices to send to other processors.
value : tensor
The multi-dimension set of values to send to other processors.
The first dimension must match that of `idx`.
partition : NDArrayPartition
The object containing information for assigning indices to
processors.
Returns
-------
tensor
The 1D tensor of the received indices.
tensor
The set of received values.
Examples
--------
To perform a sparse_all_to_all_push(), a partition object must be
provided. A partition of a homogeneous graph, where the vertices are
striped across processes, can be generated via:
>>> from dgl.partition import NDArrayPartition
>>> part = NDArrayPartition(g.num_nodes(), comm.size(), mode='remainder')
With this partition, each processor can send values to be associated
with vertices in the graph. So if we have an array `global_idxs` of all of
the neighbors updated during mini-batch processing, and an array
`global_values` containing the new values associated with the neighbors,
we communicate them to the owning processes via:
>>> my_idxs, my_values = comm.sparse_all_to_all_push(global_idxs, global_values, part)
This communication pattern is common when communicating gradient
updates for node embeddings.
Indices the current process owns do not need to be treated specially,
as internally they will be copied to the output array. If process 0 has
the set of indices '[0, 3, 8, 9, 10]' and process 1 has
'[0, 2, 4, 5, 8, 8, 9]', using a remainder partition will result in
indices for process 0 of '[0, 8, 10, 0, 2, 4, 8, 8]' and for
process 1 of '[3, 9, 5, 9]'.
"""
out_idx, out_value = _CAPI_DGLNCCLSparseAllToAllPush(
self.get(),
F.zerocopy_to_dgl_ndarray(idx),
F.zerocopy_to_dgl_ndarray(value),
partition.get(),
)
return (
F.zerocopy_from_dgl_ndarray(out_idx),
F.zerocopy_from_dgl_ndarray(out_value),
)
def sparse_all_to_all_pull(self, req_idx, value, partition):
"""Perform an all-to-all-v operation, where by all processors request
the values corresponding to their set of indices.
Parameters
----------
req_idx : IdArray
The set of indices this processor is requesting.
value : NDArray
The multi-dimension set of values that can be requested from
this processor.
partition : NDArrayPartition
The object containing information for assigning indices to
processors.
Returns
-------
tensor
The set of received values, corresponding to `req_idx`.
Examples
--------
To perform a sparse_all_to_all_pull(), a partition object must be
provided. A partition of a homogeneous graph, where the vertices are
striped across processes, can be generated via:
>>> from dgl.partition import NDArrayPartition
>>> part = NDArrayPartition(g.num_nodes(), comm.size(), mode='remainder')
With this partition, each processor can request values/features
associated with vertices in the graph. So in the case where we have
a set of neighbors 'nbr_idxs' we need features for, and each process
has a tensor 'node_feat' storing the features of nodes it owns in
the partition, the features can be requested via:
>>> nbr_values = comm.sparse_all_to_all_pull(nbr_idxs, node_feat, part)
Then the two arrays 'nbr_idxs' and 'nbr_values' form the sparse
set of features, where 'nbr_idxs[i]' is the global node id, and
'nbr_values[i]' is the feature vector for that node. This
communication pattern is useful for node features or node
embeddings.
"""
out_value = _CAPI_DGLNCCLSparseAllToAllPull(
self.get(),
F.zerocopy_to_dgl_ndarray(req_idx),
F.zerocopy_to_dgl_ndarray(value),
partition.get(),
)
return F.zerocopy_from_dgl_ndarray(out_value)
def get(self):
"""Get the C-Handle for this object."""
return self._handle
def rank(self):
"""Get the rank of this process in this communicator.
Returns
-------
int
The rank of this process.
"""
return self._rank
def size(self):
"""Get the size of this communicator.
Returns
-------
int
The number of processes in this communicator.
"""
return self._size
def is_supported():
"""Check if DGL was built with NCCL support.
Parameters
----------
idx : torch.Tensor
The 1D set of indices to send to other processors.
value : torch.Tensor
The multi-dimension set of values to send to other processors.
The first dimension must match that of `idx`.
partition : NDArrayPartition
The object containing information for assigning indices to
processors.
Returns
-------
bool
True if NCCL support was built in.
torch.Tensor
The 1D tensor of the received indices.
torch.Tensor
The set of received values.
Examples
--------
To perform a sparse_all_to_all_push(), a partition object must be
provided. A partition of a homogeneous graph, where the vertices are
striped across processes, can be generated via:
>>> from dgl.partition import NDArrayPartition
>>> part = NDArrayPartition(g.num_nodes(), world_size, mode='remainder')
With this partition, each processor can send values to be associated
with vertices in the graph. So if we have an array `global_idxs` of all of
the neighbors updated during mini-batch processing, and an array
`global_values` containing the new values associated with the neighbors,
we communicate them to the owning processes via:
>>> my_idxs, my_values = nccl.sparse_all_to_all_push(global_idxs, global_values, part)
This communication pattern is common when communicating gradient
updates for node embeddings.
Indices the current process owns do not need to be treated specially,
as internally they will be copied to the output array. If process 0 has
the set of indices '[0, 3, 8, 9, 10]' and process 1 has
'[0, 2, 4, 5, 8, 8, 9]', using a remainder partition will result in
indices for process 0 of '[0, 8, 10, 0, 2, 4, 8, 8]' and for
process 1 of '[3, 9, 5, 9]'.
"""
return _CAPI_DGLNCCLHasSupport()
if not dist.is_initialized() or dist.get_world_size() == 1:
return idx, value
assert (
dist.get_backend() == "nccl"
), "requires NCCL backend to communicate CUDA tensors."
perm, send_splits = partition.generate_permutation(idx)
perm = perm.long()
# Get receive splits.
recv_splits = torch.empty_like(send_splits)
dist.all_to_all_single(recv_splits, send_splits)
# Use pinned memory to speedup D2H copy.
recv_splits = recv_splits.to("cpu", non_blocking=True)
send_splits = send_splits.to("cpu", non_blocking=True)
send_idx = idx[perm]
send_value = value[perm]
# Wait D2H copy finish.
torch.cuda.current_stream().synchronize()
recv_sum = recv_splits.sum()
recv_splits = recv_splits.tolist()
send_splits = send_splits.tolist()
# Send idx.
recv_idx = torch.empty((recv_sum,), dtype=idx.dtype, device=idx.device)
dist.all_to_all_single(recv_idx, send_idx, recv_splits, send_splits)
# Send value.
recv_value = torch.empty(
(recv_sum, *value.shape[1:]), dtype=value.dtype, device=value.device
)
dist.all_to_all_single(recv_value, send_value, recv_splits, send_splits)
return recv_idx, recv_value
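As a quick usage sketch for the new module-level push API (hedged: a single-process setup mirroring the unit tests later in this diff; the address, sizes, and tensor shapes are illustrative):

import torch
import torch.distributed as dist
from dgl.cuda import nccl
from dgl.partition import NDArrayPartition

torch.cuda.set_device("cuda:0")
dist.init_process_group(
    backend="nccl", init_method="tcp://127.0.0.1:12345", world_size=1, rank=0
)
idx = torch.randint(0, 10000, (1000,), device="cuda:0")
value = torch.randn(1000, 16, device="cuda:0")
part = NDArrayPartition(10000, dist.get_world_size(), mode="remainder")
# Each (index, value) pair is routed to the rank that owns it under `part`.
owned_idx, owned_value = nccl.sparse_all_to_all_push(idx, value, part)
dist.destroy_process_group()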
def sparse_all_to_all_pull(req_idx, value, partition):
"""Perform an all-to-all-v operation, where by all processors request
the values corresponding to their set of indices.
Note: This method requires 'torch.distributed.get_backend() == "nccl"'.
Parameters
----------
req_idx : torch.Tensor
The set of indices this processor is requesting.
value : torch.Tensor
The multi-dimension set of values that can be requested from
this processor.
partition : NDArrayPartition
The object containing information for assigning indices to
processors.
Returns
-------
torch.Tensor
The set of received values, corresponding to `req_idx`.
Examples
--------
To perform a sparse_all_to_all_pull(), a partition object must be
provided. A partition of a homogeneous graph, where the vertices are
striped across processes, can be generated via:
_init_api("dgl.cuda.nccl")
>>> from dgl.partition import NDArrayPartition
>>> part = NDArrayPartition(g.num_nodes(), world_size, mode='remainder')
With this partition, each processor can request values/features
associated with vertices in the graph. So in the case where we have
a set of neighbors 'nbr_idxs' we need features for, and each process
has a tensor 'node_feat' storing the features of nodes it owns in
the partition, the features can be requested via:
>>> nbr_values = nccl.sparse_all_to_all_pull(nbr_idxs, node_feat, part)
Then the two arrays 'nbr_idxs' and 'nbr_values' form the sparse
set of features, where 'nbr_idxs[i]' is the global node id, and
'nbr_values[i]' is the feature vector for that node. This
communication pattern is useful for node features or node
embeddings.
"""
if not dist.is_initialized() or dist.get_world_size() == 1:
return value[req_idx.long()]
assert (
dist.get_backend() == "nccl"
), "requires NCCL backend to communicate CUDA tensors."
perm, req_splits = partition.generate_permutation(req_idx)
perm = perm.long()
# Get response splits.
resp_splits = torch.empty_like(req_splits)
dist.all_to_all_single(resp_splits, req_splits)
# Use pinned memory to speedup D2H copy.
resp_splits = resp_splits.to("cpu", non_blocking=True)
req_splits = req_splits.to("cpu", non_blocking=True)
req_idx = req_idx[perm]
# Wait D2H copy finish.
torch.cuda.current_stream().synchronize()
resp_sum = resp_splits.sum()
resp_splits = resp_splits.tolist()
req_splits = req_splits.tolist()
# Gather requested indices.
resp_idx = torch.empty(
(resp_sum,), dtype=req_idx.dtype, device=req_idx.device
)
dist.all_to_all_single(resp_idx, req_idx, resp_splits, req_splits)
# Convert requested indices to local indices depending on partition.
if resp_sum > 0:
resp_idx = partition.map_to_local(resp_idx)
# Collect the request value.
req_value = torch.empty(
(req_idx.size(0), *value.shape[1:]),
dtype=value.dtype,
device=value.device,
)
dist.all_to_all_single(req_value, value[resp_idx], req_splits, resp_splits)
# Permute the value back into the requested order.
return_value = torch.empty_like(req_value)
return_value[perm] = req_value
return return_value
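And the corresponding pull (hedged: same single-process assumptions as the push sketch above; with multiple ranks, `node_feat` would hold only this rank's slice of the partition):

import torch
import torch.distributed as dist
from dgl.cuda import nccl
from dgl.partition import NDArrayPartition

# assumes an initialized "nccl" process group, as in the push sketch
part = NDArrayPartition(10000, dist.get_world_size(), mode="remainder")
node_feat = torch.randn(10000, 16, device="cuda:0")         # locally owned rows
req_idx = torch.randint(0, 10000, (500,), device="cuda:0")  # global ids to fetch
# Returns one feature row per requested id, gathered from the owning ranks.
feat = nccl.sparse_all_to_all_pull(req_idx, node_feat, part)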
......@@ -9,7 +9,6 @@ from ...partition import NDArrayPartition
from ...utils import create_shared_mem_array, get_shared_mem_array
_STORE = None
_COMM = None
class NodeEmbedding: # NodeEmbedding
......@@ -78,7 +77,6 @@ class NodeEmbedding: # NodeEmbedding
partition=None,
):
global _STORE
global _COMM
if device is None:
device = th.device("cpu")
......@@ -132,25 +130,7 @@ class NodeEmbedding: # NodeEmbedding
)
self._tensor = emb
else: # embeddings is stored in GPU memory.
# setup nccl communicator
if _COMM is None:
if rank < 0:
_COMM = nccl.Communicator(1, 0, nccl.UniqueId())
else:
# needs to be set for nccl to work
th.cuda.set_device(device)
if rank == 0:
# root process broadcasts nccl id
nccl_id = nccl.UniqueId()
self._store.set("nccl_root_id_sparse_emb", str(nccl_id))
else:
nccl_id = nccl.UniqueId(
self._store.get("nccl_root_id_sparse_emb")
)
_COMM = nccl.Communicator(
self._world_size, self._rank, nccl_id
)
self._comm = _COMM
self._comm = True
if not self._partition:
# for communication we need a partition
......@@ -161,7 +141,7 @@ class NodeEmbedding: # NodeEmbedding
)
# create local tensors for the weights
local_size = self._partition.local_size(self._comm.rank())
local_size = self._partition.local_size(max(self._rank, 0))
# TODO(dlasalle): support 16-bit/half embeddings
emb = th.empty(
......@@ -187,15 +167,15 @@ class NodeEmbedding: # NodeEmbedding
device : th.device
Target device to put the collected embeddings.
"""
if not self._comm or self._comm.size() == 1:
if not self._comm:
# embeddings are stored on the CPU
emb = self._tensor[node_ids].to(device)
else:
if self.world_size > 0:
emb = self._comm.sparse_all_to_all_pull(
node_ids, self._tensor, self._partition
)
else:
emb = self._tensor[node_ids]
# embeddings are stored on the GPU
# the following method also covers self._world_size = 0 or 1
emb = nccl.sparse_all_to_all_pull(
node_ids, self._tensor, self._partition
)
emb = emb.to(device)
if F.is_recording():
emb = F.attach_grad(emb)
......@@ -215,18 +195,6 @@ class NodeEmbedding: # NodeEmbedding
"""
return self._store
@property
def comm(self):
"""Return dgl.cuda.nccl.Communicator for data
sharing across processes.
Returns
-------
dgl.cuda.nccl.Communicator
Communicator used for data sharing.
"""
return self._comm
@property
def partition(self):
"""Return the partition identifying how the tensor is split across
......@@ -361,7 +329,8 @@ class NodeEmbedding: # NodeEmbedding
if self._partition:
idxs = F.copy_to(
self._partition.get_local_indices(
self._comm.rank(), ctx=F.context(self._tensor)
max(self._rank, 0),
ctx=F.context(self._tensor),
),
F.context(values),
)
......
......@@ -63,7 +63,6 @@ class SparseGradOptimizer(abc.ABC):
), "MultiGPU world_size for each embedding should be same."
assert not self._rank is None
assert not self._world_size is None
self._nccl_root_id = "SparseGradOptimizer.nccl_root_id"
def step(self):
"""The step function.
......@@ -74,7 +73,7 @@ class SparseGradOptimizer(abc.ABC):
if self._first_step:
for emb in self._params:
for _, data in emb._trace:
if data.grad.data.device.type == "cuda":
if data.grad.device.type == "cuda":
# create a communicator
if self._device:
assert (
......@@ -116,27 +115,7 @@ class SparseGradOptimizer(abc.ABC):
"""
def _comm_setup(self):
# find a store to communicate the unique id through
if len(self._params) > 0:
store = self._params[0].store
if self._rank < 0:
self._comm = nccl.Communicator(1, 0, nccl.UniqueId())
else:
th.cuda.set_device(self._device)
if self._rank == 0:
# root process broadcasts nccl id
nccl_id = nccl.UniqueId()
uid = str(nccl_id)
store.set(self._nccl_root_id, uid)
else:
uid = store.get(self._nccl_root_id)
nccl_id = nccl.UniqueId(uid)
# needs to be set for nccl to work
self._comm = nccl.Communicator(
self._world_size, self._rank, nccl_id
)
th.distributed.barrier()
self._comm = True
def _shared_setup(self):
for emb in self._params:
......@@ -162,7 +141,6 @@ class SparseGradOptimizer(abc.ABC):
self._opt_meta[emb_name] = opt_meta
def _comm_step(self):
comm = self._comm
with th.no_grad():
idx_in = {}
grad_in = {}
......@@ -203,7 +181,7 @@ class SparseGradOptimizer(abc.ABC):
(
idx_in[emb_name],
grad_in[emb_name],
) = comm.sparse_all_to_all_push(idx, grad, partition=partition)
) = nccl.sparse_all_to_all_push(idx, grad, partition=partition)
if emb.partition:
# if the embedding is partitioned, map back to indexes
# into the local tensor
......
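The optimizer change above follows the same pattern; a hedged sketch of the gradient exchange it performs (an illustrative helper, not the actual optimizer internals):

from dgl.cuda import nccl

def exchange_sparse_grads(idx, grad, partition):
    # Push (index, gradient) pairs to the ranks that own those rows, then
    # map the received global ids into this rank's local embedding tensor.
    own_idx, own_grad = nccl.sparse_all_to_all_push(idx, grad, partition=partition)
    local_idx = partition.map_to_local(own_idx)
    return local_idx, own_grad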
......@@ -592,5 +592,44 @@ class NDArrayPartition(object):
)
)
def generate_permutation(self, idxs):
"""Produce a scheme that maps the given indices to separate partitions
and the counts of how many indices are in each partition.
Parameters
----------
idxs: torch.Tensor.
A tensor with shape (`num_indices`,), representing global indices.
Return
------
torch.Tensor.
A tensor with shape (`num_indices`,), representing the permutation
to re-order the indices by partition.
torch.Tensor.
A tensor with shape (`num_partition`,), representing the number of
indices per partition.
Examples
--------
>>> import torch
>>> from dgl.partition import NDArrayPartition
>>> part = NDArrayPartition(10, 2, mode="remainder")
>>> idx = torch.tensor([0, 2, 4, 5, 8, 8, 9], device="cuda:0")
>>> perm, splits_sum = part.generate_permutation(idx)
>>> perm
tensor([0, 1, 2, 4, 5, 3, 6], device='cuda:0')
>>> splits_sum
tensor([5, 2], device='cuda:0')
"""
ret = _CAPI_DGLNDArrayPartitionGeneratePermutation(
self._partition, F.zerocopy_to_dgl_ndarray(idxs)
)
return F.zerocopy_from_dgl_ndarray(ret(0)), F.zerocopy_from_dgl_ndarray(
ret(1)
)
_init_api("dgl.partition")
/**
* Copyright (c) 2018 by Contributors
* @file c_runtime_api.cc
* @file c_api_common.cc
* @brief DGL C API common implementations
*/
#include "c_api_common.h"
......
......@@ -12,6 +12,7 @@
#include <memory>
#include <utility>
#include "../c_api_common.h"
#include "partition_op.h"
using namespace dgl::runtime;
......@@ -251,5 +252,15 @@ DGL_REGISTER_GLOBAL("partition._CAPI_DGLNDArrayPartitionMapToGlobal")
*rv = part->MapToGlobal(idxs, part_id);
});
DGL_REGISTER_GLOBAL("partition._CAPI_DGLNDArrayPartitionGeneratePermutation")
.set_body([](DGLArgs args, DGLRetValue* rv) {
NDArrayPartitionRef part = args[0];
IdArray idxs = args[1];
std::pair<IdArray, NDArray> part_perm = part->GeneratePermutation(idxs);
*rv =
ConvertNDArrayVectorToPackedFunc({part_perm.first, part_perm.second});
});
} // namespace partition
} // namespace dgl
/**
* Copyright (c) 2021-2022 by Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file nccl_api.cu
* @brief Implementation of wrapper around NCCL routines.
*/
#include <cuda_fp16.h>
#include <cuda_runtime.h>
#include <dgl/array.h>
#include <dgl/aten/array_ops.h>
#include <dgl/packed_func_ext.h>
#include <dgl/runtime/container.h>
#include <dgl/runtime/device_api.h>
#include <dgl/runtime/registry.h>
#include <algorithm>
#include <cmath>
#include <iomanip>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include "../../array/cuda/array_index_select.cuh"
#include "../../array/cuda/dgl_cub.cuh"
#include "../../partition/ndarray_partition.h"
#include "../../runtime/workspace.h"
#include "cuda_common.h"
#include "nccl_api.h"
#define NCCL_CALL(func) \
{ \
ncclResult_t result = func; \
if (result != ncclSuccess) { \
LOG(FATAL) << "NCCLError: " #func " failed with error: " << result; \
} \
}
namespace dgl {
using namespace partition;
namespace runtime {
namespace cuda {
namespace {
#ifdef DGL_USE_NCCL
template <typename T>
ncclDataType_t NCCLType();
template <>
ncclDataType_t NCCLType<int32_t>() {
return ncclInt32;
}
template <>
ncclDataType_t NCCLType<int64_t>() {
return ncclInt64;
}
template <>
ncclDataType_t NCCLType<__half>() {
return ncclHalf;
}
template <>
ncclDataType_t NCCLType<float>() {
return ncclFloat32;
}
template <>
ncclDataType_t NCCLType<double>() {
return ncclFloat64;
}
#endif // DGL_USE_NCCL
template <typename IdType, typename DType>
__global__ void _DualPermKernel(
const IdType* const in_idx, const DType* const in_value,
const IdType* const perm, const int64_t num_in, const int64_t num_feat,
IdType* const out_idx, DType* const out_value) {
// set index permutation
const int64_t tidx =
blockDim.x * static_cast<int64_t>(blockIdx.x) + threadIdx.x;
if (tidx < num_in) {
const IdType perm_idx = perm[tidx];
assert(perm_idx < num_in);
out_idx[tidx] = in_idx[perm_idx];
}
if (num_feat > 1) {
for (int d = 0; d < blockDim.x; ++d) {
const int64_t bidx = blockDim.x * static_cast<int64_t>(blockIdx.x) + d;
if (bidx < num_in) {
const IdType perm_idx = perm[bidx];
for (int64_t f = threadIdx.x; f < num_feat; f += blockDim.x) {
out_value[bidx * num_feat + f] = in_value[perm_idx * num_feat + f];
}
}
}
} else {
if (tidx < num_in) {
const IdType perm_idx = perm[tidx];
out_value[tidx] = in_value[perm_idx];
}
}
}
template <typename DType, typename IdType>
__global__ void _InversePermKernel(
const DType* const array, const int64_t num_feat, int64_t length,
const IdType* const perm, DType* const out) {
int64_t in_row = blockIdx.x * blockDim.y + threadIdx.y;
const int64_t stride = blockDim.y * gridDim.x;
while (in_row < length) {
int64_t col = threadIdx.x;
const int64_t out_row = perm[in_row];
while (col < num_feat) {
out[out_row * num_feat + col] = array[in_row * num_feat + col];
col += blockDim.x;
}
in_row += stride;
}
}
template <typename IdType, typename DType>
std::pair<IdArray, NDArray> SparsePush(
NCCLCommunicatorRef comm, IdArray in_idx, NDArray in_value,
NDArrayPartitionRef part) {
const auto& ctx = in_idx->ctx;
CHECK_EQ(ctx, in_value->ctx) << "Indices and values must be on the same "
"device";
auto device = DeviceAPI::Get(ctx);
cudaStream_t stream = runtime::getCurrentCUDAStream();
CHECK_LE(in_idx->ndim, 1) << "The tensor of sending indices must be of "
"dimension one (or empty).";
const int64_t num_in = in_idx->ndim > 0 ? in_idx->shape[0] : 0;
CHECK_EQ(num_in, in_value->ndim > 0 ? in_value->shape[0] : 0)
<< "Leading dimension of indices (" << num_in
<< ") must match "
"leading dimension of values ("
<< (in_value->ndim > 0 ? in_value->shape[0] : 0) << ").";
int64_t num_feat = 1;
for (int d = 1; d < in_value->ndim; ++d) {
num_feat *= in_value->shape[d];
}
const int64_t comm_size = comm->size();
if (comm_size == 1) {
// nothing to do, just return original arrays
return std::pair<IdArray, NDArray>(in_idx, in_value);
}
std::pair<IdArray, NDArray> part_perm = part->GeneratePermutation(in_idx);
const IdType* const perm = static_cast<const IdType*>(part_perm.first->data);
const int64_t* const send_sum =
static_cast<const int64_t*>(part_perm.second->data);
Workspace<IdType> send_idx(device, ctx, num_in);
Workspace<DType> send_value(device, ctx, num_in * num_feat);
// permute the indices and values
if (num_in > 0) {
const dim3 block(256);
const dim3 grid((num_in + block.x - 1) / block.x);
CUDA_KERNEL_CALL(
_DualPermKernel, grid, block, 0, stream,
static_cast<const IdType*>(in_idx->data),
static_cast<const DType*>(in_value->data), perm, num_in, num_feat,
send_idx.get(), send_value.get());
}
// compute the prefix sum of the send values
Workspace<int64_t> send_prefix(device, ctx, comm_size + 1);
{
size_t prefix_workspace_size;
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
nullptr, prefix_workspace_size, send_sum, send_prefix.get(),
comm_size + 1, stream));
Workspace<void> prefix_workspace(device, ctx, prefix_workspace_size);
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
prefix_workspace.get(), prefix_workspace_size, send_sum,
send_prefix.get(), comm_size + 1, stream));
}
std::vector<int64_t> send_prefix_host(comm_size + 1);
// copy using the same stream (local current stream), no need to sync
device->CopyDataFromTo(
send_prefix.get(), 0, send_prefix_host.data(), 0,
send_prefix_host.size() * sizeof(*send_prefix.get()), ctx,
DGLContext{kDGLCPU, 0},
DGLDataType{kDGLInt, sizeof(*send_prefix.get()) * 8, 1});
send_prefix.free();
CHECK_EQ(send_prefix_host.back(), num_in)
<< "Internal Error: "
"send_prefix_host.back() = "
<< send_prefix_host.back() << ", and num_in = " << num_in;
// communicate the amount to send
Workspace<int64_t> recv_sum(device, ctx, comm_size + 1);
comm->AllToAll(send_sum, recv_sum.get(), 1, stream);
cudaEvent_t d2h;
CUDA_CALL(cudaEventCreate(&d2h));
// compute the prefix sum of the recv values
Workspace<int64_t> recv_prefix(device, ctx, comm_size + 1);
{
size_t prefix_workspace_size;
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
nullptr, prefix_workspace_size, recv_sum.get(), recv_prefix.get(),
comm_size + 1, stream));
Workspace<void> prefix_workspace(device, ctx, prefix_workspace_size);
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
prefix_workspace.get(), prefix_workspace_size, recv_sum.get(),
recv_prefix.get(), comm_size + 1, stream));
}
recv_sum.free();
// finally copy the prefix sum down to the host
std::vector<int64_t> recv_prefix_host(comm_size + 1);
// copy using the same stream (local current stream), no need to sync
device->CopyDataFromTo(
recv_prefix.get(), 0, recv_prefix_host.data(), 0,
recv_prefix_host.size() * sizeof(*recv_prefix.get()), ctx,
DGLContext{kDGLCPU, 0},
DGLDataType{kDGLInt, sizeof(*recv_prefix.get()) * 8, 1});
recv_prefix.free();
// use an event to track when copying is done
CUDA_CALL(cudaEventRecord(d2h, stream));
// allocate output space
CUDA_CALL(cudaEventSynchronize(d2h));
CUDA_CALL(cudaEventDestroy(d2h));
IdArray recv_idx =
aten::NewIdArray(recv_prefix_host.back(), ctx, sizeof(IdType) * 8);
std::vector<int64_t> value_shape(in_value->ndim, 0);
value_shape[0] = recv_prefix_host.back();
for (int d = 1; d < in_value->ndim; ++d) {
value_shape[d] = in_value->shape[d];
}
NDArray recv_value = NDArray::Empty(value_shape, in_value->dtype, ctx);
// send data
comm->SparseAllToAll(
send_idx.get(), send_value.get(), num_feat, send_prefix_host.data(),
static_cast<IdType*>(recv_idx->data),
static_cast<DType*>(recv_value->data), recv_prefix_host.data(), stream);
return std::pair<IdArray, NDArray>(recv_idx, recv_value);
}
template <typename IdType, typename DType>
NDArray SparsePull(
NCCLCommunicatorRef comm, IdArray req_idx, NDArray local_tensor,
NDArrayPartitionRef part) {
const auto& ctx = req_idx->ctx;
CHECK_EQ(ctx, local_tensor->ctx) << "The request indices and set of local "
"values must be on the same device";
auto device = DeviceAPI::Get(ctx);
cudaStream_t stream = runtime::getCurrentCUDAStream();
CHECK_LE(req_idx->ndim, 1) << "The tensor of requested indices must be of "
"dimension one (or empty).";
const int64_t num_in = req_idx->ndim > 0 ? req_idx->shape[0] : 0;
int64_t num_feat = 1;
for (int d = 1; d < local_tensor->ndim; ++d) {
num_feat *= local_tensor->shape[d];
}
const int64_t comm_size = comm->size();
if (comm_size == 1) {
// Just return index selection from current local_tensor
return aten::IndexSelect(local_tensor, req_idx);
}
// First we need to send our requests to other processors. This means
// re-ordering our index array to be contiguous among processors, and
// counting the number of indices we are sending each processor. For now,
// we assume a poorly partitioned graph, and that there exists the
// possibility that each processor could request data from this one.
// the buffer for us to re-order our requests in
Workspace<IdType> send_idx(device, ctx, num_in);
std::pair<IdArray, NDArray> part_perm = part->GeneratePermutation(req_idx);
const IdType* const perm = static_cast<const IdType*>(part_perm.first->data);
const int64_t* const send_sum =
static_cast<const int64_t*>(part_perm.second->data);
// permute requests
if (num_in > 0) {
const dim3 block(256);
const dim3 grid((num_in + block.x - 1) / block.x);
CUDA_KERNEL_CALL(
aten::impl::IndexSelectSingleKernel, grid, block, 0, stream,
static_cast<const IdType*>(req_idx->data), perm, num_in,
req_idx->shape[0], send_idx.get());
}
// compute the prefix sum of the indexes this process is requesting
Workspace<int64_t> request_prefix(device, ctx, comm_size + 1);
{
size_t prefix_workspace_size;
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
nullptr, prefix_workspace_size, send_sum, request_prefix.get(),
comm_size + 1, stream));
Workspace<void> prefix_workspace(device, ctx, prefix_workspace_size);
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
prefix_workspace.get(), prefix_workspace_size, send_sum,
request_prefix.get(), comm_size + 1, stream));
}
cudaEvent_t d2h;
CUDA_CALL(cudaEventCreate(&d2h));
std::vector<int64_t> request_prefix_host(comm_size + 1);
// copy using the same stream (local current stream), no need to sync
device->CopyDataFromTo(
request_prefix.get(), 0, request_prefix_host.data(), 0,
request_prefix_host.size() * sizeof(*request_prefix.get()), ctx,
DGLContext{kDGLCPU, 0},
DGLDataType{kDGLInt, sizeof(*request_prefix.get()) * 8, 1});
request_prefix.free();
CHECK_EQ(request_prefix_host.back(), num_in)
<< "Internal Error: "
"request_prefix_host.back() = "
<< request_prefix_host.back() << ", num_in = " << num_in;
// communicate the amount requested
Workspace<int64_t> recv_sum(device, ctx, comm_size + 1);
comm->AllToAll(send_sum, recv_sum.get(), 1, stream);
// compute the prefix sum of the requested indexes
Workspace<int64_t> response_prefix(device, ctx, comm_size + 1);
{
size_t prefix_workspace_size;
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
nullptr, prefix_workspace_size, recv_sum.get(), response_prefix.get(),
comm_size + 1, stream));
Workspace<void> prefix_workspace(device, ctx, prefix_workspace_size);
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
prefix_workspace.get(), prefix_workspace_size, recv_sum.get(),
response_prefix.get(), comm_size + 1, stream));
}
recv_sum.free();
// finally copy the prefix sum down to the host
std::vector<int64_t> response_prefix_host(comm_size + 1);
// copy using the same stream (local current stream), no need to sync
device->CopyDataFromTo(
response_prefix.get(), 0, response_prefix_host.data(), 0,
response_prefix_host.size() * sizeof(*response_prefix.get()), ctx,
DGLContext{kDGLCPU, 0},
DGLDataType{kDGLInt, sizeof(*response_prefix.get()) * 8, 1});
response_prefix.free();
// use an event to track when copying is done
CUDA_CALL(cudaEventRecord(d2h, stream));
// allocate output space
CUDA_CALL(cudaEventSynchronize(d2h));
CUDA_CALL(cudaEventDestroy(d2h));
// gather requested indexes
IdArray recv_idx =
aten::NewIdArray(response_prefix_host.back(), ctx, sizeof(IdType) * 8);
comm->AllToAllV(
send_idx.get(), request_prefix_host.data(),
static_cast<IdType*>(recv_idx->data), response_prefix_host.data(),
stream);
send_idx.free();
// convert requested indices to local indices depending on partition
if (response_prefix_host.back() > 0) {
recv_idx = part->MapToLocal(recv_idx);
}
// and then index select them into place
Workspace<DType> filled_response_value(
device, ctx, response_prefix_host.back() * num_feat);
if (response_prefix_host.back() > 0) {
dim3 block(256, 1);
while (block.x >= 2 * num_feat) {
block.x /= 2;
block.y *= 2;
}
const dim3 grid((response_prefix_host.back() + block.y - 1) / block.y);
CUDA_KERNEL_CALL(
aten::impl::IndexSelectMultiKernel, grid, block, 0, stream,
static_cast<const DType*>(local_tensor->data), num_feat,
static_cast<IdType*>(recv_idx->data), response_prefix_host.back(),
local_tensor->shape[0], filled_response_value.get());
}
// we will collect received values in this array
std::vector<int64_t> value_shape(local_tensor->ndim, 0);
value_shape[0] = request_prefix_host.back();
for (int d = 1; d < local_tensor->ndim; ++d) {
value_shape[d] = local_tensor->shape[d];
}
Workspace<DType> filled_request_value(
device, ctx, request_prefix_host.back() * num_feat);
// multiply the prefixes by the number of features being sent
for (auto& v : request_prefix_host) {
v *= num_feat;
}
for (auto& v : response_prefix_host) {
v *= num_feat;
}
// send the values
comm->AllToAllV(
filled_response_value.get(), response_prefix_host.data(),
filled_request_value.get(), request_prefix_host.data(), stream);
filled_response_value.free();
// finally, we need to permute the values back into the requested order
NDArray result = NDArray::Empty(value_shape, local_tensor->dtype, ctx);
if (num_in > 0) {
dim3 block(256, 1);
while (block.x >= 2 * num_feat) {
block.x /= 2;
block.y *= 2;
}
const dim3 grid((num_in + block.y - 1) / block.y);
CUDA_KERNEL_CALL(
_InversePermKernel, grid, block, 0, stream, filled_request_value.get(),
num_feat, num_in, perm, static_cast<DType*>(result->data));
}
return result;
}
} // namespace
/* NCCLUniqueId **************************************************************/
NCCLUniqueId::NCCLUniqueId() : id_() {
#ifdef DGL_USE_NCCL
// this ID is unique to the process, not to each call of this function
NCCL_CALL(ncclGetUniqueId(&id_));
#else
// when NCCL isn't enabled, use all zeros
std::fill(
id_.internal, id_.internal + NCCL_UNIQUE_ID_BYTES, static_cast<char>(0));
#endif
}
ncclUniqueId NCCLUniqueId::Get() const { return id_; }
std::string NCCLUniqueId::ToString() const {
std::ostringstream oss;
oss << std::hex;
for (size_t b = 0; b < NCCL_UNIQUE_ID_BYTES; ++b) {
const int num = static_cast<uint8_t>(id_.internal[b]);
oss << std::setw(2) << std::setfill('0') << num;
}
std::string result = oss.str();
CHECK_EQ(result.length(), NCCL_UNIQUE_ID_BYTES * 2)
<< "Invalid NCCL ID format: '" << result << "'";
return result;
}
void NCCLUniqueId::FromString(const std::string& str) {
// must be exactly 256 hex characters
CHECK_EQ(str.length(), NCCL_UNIQUE_ID_BYTES * 2)
<< "Invalid NCCL ID format: '" << str << "'";
for (size_t b = 0; b < NCCL_UNIQUE_ID_BYTES; ++b) {
id_.internal[b] = std::strtol(str.substr(b * 2, 2).c_str(), nullptr, 16);
}
}
/* NCCLCommunicator **********************************************************/
NCCLCommunicator::NCCLCommunicator(
const int size, const int rank, ncclUniqueId id)
: comm_(), size_(size), rank_(rank) {
CHECK_LT(rank, size) << "The rank (" << rank
<< ") must be smaller than "
"the size of the communicator ("
<< size << ").";
CHECK_GE(rank, 0) << "The rank (" << rank
<< ") must be greater than or "
"equal to 0.";
#ifdef DGL_USE_NCCL
NCCL_CALL(ncclCommInitRank(&comm_, size_, id, rank_));
#else
CHECK_EQ(size, 1)
<< "Cannot create a communicator of size " << size
<< ". "
"To use a communicator size greater than 1, compile DGL with NCCL "
"support.";
#endif
}
NCCLCommunicator::~NCCLCommunicator() {
#ifdef DGL_USE_NCCL
ncclCommDestroy(comm_);
#endif
}
ncclComm_t NCCLCommunicator::Get() { return comm_; }
template <typename DType>
void NCCLCommunicator::AllToAllV(
const DType* const send, const int64_t* const send_prefix,
DType* const recv, const int64_t* const recv_prefix, cudaStream_t stream) {
#ifdef DGL_USE_NCCL
const ncclDataType_t type = NCCLType<DType>();
NCCL_CALL(ncclGroupStart());
for (int r = 0; r < size_; ++r) {
const int64_t send_size = send_prefix[r + 1] - send_prefix[r];
if (send_size > 0) {
NCCL_CALL(
ncclSend(send + send_prefix[r], send_size, type, r, comm_, stream));
}
const int64_t recv_size = recv_prefix[r + 1] - recv_prefix[r];
if (recv_size > 0) {
NCCL_CALL(
ncclRecv(recv + recv_prefix[r], recv_size, type, r, comm_, stream));
}
}
NCCL_CALL(ncclGroupEnd());
#else
CHECK_EQ(send_prefix[1] - send_prefix[0], recv_prefix[1] - recv_prefix[0])
<< "Send message size must equal receive message size.";
int dev_id;
CUDA_CALL(cudaGetDevice(&dev_id));
DGLContext ctx{kDGLCUDA, dev_id};
auto device = runtime::DeviceAPI::Get(ctx);
auto dtype = DGLDataTypeTraits<DType>::dtype;
// copy using the same stream (local current stream), no need to sync
device->CopyDataFromTo(
send, send_prefix[0], recv, recv_prefix[0],
sizeof(DType) * send_prefix[1] - send_prefix[0], ctx, ctx, dtype);
#endif
}
template void NCCLCommunicator::AllToAllV<int32_t>(
const int32_t* const send, const int64_t* send_prefix, int32_t* const recv,
const int64_t* recv_prefix, cudaStream_t stream);
template void NCCLCommunicator::AllToAllV<int64_t>(
const int64_t* const send, const int64_t* send_prefix, int64_t* const recv,
const int64_t* recv_prefix, cudaStream_t stream);
template void NCCLCommunicator::AllToAllV<float>(
const float* const send, const int64_t* send_prefix, float* const recv,
const int64_t* recv_prefix, cudaStream_t stream);
template void NCCLCommunicator::AllToAllV<__half>(
const __half* const send, const int64_t* send_prefix, __half* const recv,
const int64_t* recv_prefix, cudaStream_t stream);
template <typename IdType>
void NCCLCommunicator::AllToAll(
const IdType* const send, IdType* const recv, const int64_t count,
cudaStream_t stream) {
#ifdef DGL_USE_NCCL
const ncclDataType_t type = NCCLType<IdType>();
NCCL_CALL(ncclGroupStart());
for (int r = 0; r < size_; ++r) {
NCCL_CALL(ncclSend(send + (r * count), count, type, r, comm_, stream));
NCCL_CALL(ncclRecv(recv + (r * count), count, type, r, comm_, stream));
}
NCCL_CALL(ncclGroupEnd());
#else
int dev_id;
CUDA_CALL(cudaGetDevice(&dev_id));
DGLContext ctx{kDGLCUDA, dev_id};
auto device = runtime::DeviceAPI::Get(ctx);
auto dtype = DGLDataTypeTraits<IdType>::dtype;
// copy using the same stream (local current stream), no need to sync
device->CopyDataFromTo(send, 0, recv, 0, count, ctx, ctx, dtype);
#endif
}
template void NCCLCommunicator::AllToAll<int32_t>(
const int32_t* const send, int32_t* const recv, const int64_t count,
cudaStream_t stream);
template void NCCLCommunicator::AllToAll<int64_t>(
const int64_t* const send, int64_t* const recv, const int64_t count,
cudaStream_t stream);
template <typename IdType, typename DType>
void NCCLCommunicator::SparseAllToAll(
const IdType* const send_idx, const DType* const send_value,
const int64_t num_feat, const int64_t* const send_prefix,
IdType* const recv_idx, DType* const recv_value,
const int64_t* const recv_prefix, cudaStream_t stream) {
// idxs
AllToAllV(send_idx, send_prefix, recv_idx, recv_prefix, stream);
// scale prefixes by number of features
std::vector<int64_t> value_send_prefix(size_ + 1);
for (int r = 0; r < size_ + 1; ++r) {
value_send_prefix[r] = send_prefix[r] * num_feat;
}
std::vector<int64_t> value_recv_prefix(size_ + 1);
for (int r = 0; r < size_ + 1; ++r) {
value_recv_prefix[r] = recv_prefix[r] * num_feat;
}
AllToAllV(
send_value, value_send_prefix.data(), recv_value,
value_recv_prefix.data(), stream);
}
template void NCCLCommunicator::SparseAllToAll<int32_t, __half>(
const int32_t* const send_idx, const __half* const send_value,
const int64_t num_feat, const int64_t* const send_prefix,
int32_t* const recv_idx, __half* const recv_value,
const int64_t* const recv_prefix, cudaStream_t stream);
template void NCCLCommunicator::SparseAllToAll<int64_t, __half>(
const int64_t* const send_idx, const __half* const send_value,
const int64_t num_feat, const int64_t* const send_prefix,
int64_t* const recv_idx, __half* const recv_value,
const int64_t* const recv_prefix, cudaStream_t stream);
int NCCLCommunicator::size() const { return size_; }
int NCCLCommunicator::rank() const { return rank_; }
/* CAPI **********************************************************************/
DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLGetUniqueId")
.set_body([](DGLArgs args, DGLRetValue* rv) {
*rv = NCCLUniqueIdRef(std::make_shared<NCCLUniqueId>());
});
DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLUniqueIdToString")
.set_body([](DGLArgs args, DGLRetValue* rv) {
NCCLUniqueIdRef idObj = args[0];
*rv = idObj->ToString();
});
DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLUniqueIdFromString")
.set_body([](DGLArgs args, DGLRetValue* rv) {
const std::string str = args[0];
NCCLUniqueIdRef ref(std::make_shared<NCCLUniqueId>());
ref->FromString(str);
*rv = ref;
});
DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLCreateComm")
.set_body([](DGLArgs args, DGLRetValue* rv) {
const int size = args[0];
const int rank = args[1];
NCCLUniqueIdRef idObj = args[2];
*rv = NCCLCommunicatorRef(
std::make_shared<NCCLCommunicator>(size, rank, idObj->Get()));
});
DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLSparseAllToAllPush")
.set_body([](DGLArgs args, DGLRetValue* rv) {
NCCLCommunicatorRef comm = args[0];
IdArray in_idx = args[1];
NDArray in_values = args[2];
NDArrayPartitionRef part = args[3];
List<ObjectRef> ret;
ATEN_ID_TYPE_SWITCH(in_idx->dtype, IdType, {
ATEN_DTYPE_SWITCH(in_values->dtype, DType, "values", {
auto result =
SparsePush<IdType, DType>(comm, in_idx, in_values, part);
ret.push_back(Value(MakeValue(result.first)));
ret.push_back(Value(MakeValue(result.second)));
});
});
*rv = ret;
});
DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLSparseAllToAllPull")
.set_body([](DGLArgs args, DGLRetValue* rv) {
NCCLCommunicatorRef comm = args[0];
// the indexes this process is requesting from others
IdArray req_idx = args[1];
// the tensor this process has to fulfill other requests
NDArray tensor = args[2];
NDArrayPartitionRef part = args[3];
ATEN_ID_TYPE_SWITCH(req_idx->dtype, IdType, {
ATEN_DTYPE_SWITCH(tensor->dtype, DType, "values", {
*rv = SparsePull<IdType, DType>(comm, req_idx, tensor, part);
});
});
});
DGL_REGISTER_GLOBAL("cuda.nccl._CAPI_DGLNCCLHasSupport")
.set_body([](DGLArgs args, DGLRetValue* rv) {
#ifndef DGL_USE_NCCL
*rv = false;
#else
*rv = true;
#endif
});
} // namespace cuda
} // namespace runtime
} // namespace dgl
/**
* Copyright (c) 2021-2022 by Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file nccl_api.h
* @brief Wrapper around NCCL routines.
*/
#ifndef DGL_RUNTIME_CUDA_NCCL_API_H_
#define DGL_RUNTIME_CUDA_NCCL_API_H_
#ifdef DGL_USE_NCCL
#include "nccl.h"
#else
// if not compiling with NCCL, this class will only support communicators of
// size 1.
#define NCCL_UNIQUE_ID_BYTES 128
typedef struct {
char internal[NCCL_UNIQUE_ID_BYTES];
} ncclUniqueId;
typedef int ncclComm_t;
#endif
#include <dgl/runtime/object.h>
#include <string>
namespace dgl {
namespace runtime {
namespace cuda {
class NCCLUniqueId : public runtime::Object {
public:
NCCLUniqueId();
static constexpr const char* _type_key = "cuda.NCCLUniqueId";
DGL_DECLARE_OBJECT_TYPE_INFO(NCCLUniqueId, Object);
ncclUniqueId Get() const;
std::string ToString() const;
void FromString(const std::string& str);
private:
ncclUniqueId id_;
};
DGL_DEFINE_OBJECT_REF(NCCLUniqueIdRef, NCCLUniqueId);
class NCCLCommunicator : public runtime::Object {
public:
NCCLCommunicator(int size, int rank, ncclUniqueId id);
~NCCLCommunicator();
// disable copying
NCCLCommunicator(const NCCLCommunicator& other) = delete;
NCCLCommunicator& operator=(const NCCLCommunicator& other) = delete;
ncclComm_t Get();
/**
* @brief Perform an all-to-all communication.
*
* @param send The contiguous array of data to send.
* @param recv The contiguous array of data to receive.
* @param count The size of data to send to each rank.
* @param stream The stream to operate on.
*/
template <typename IdType>
void AllToAll(
const IdType* send, IdType* recv, int64_t count, cudaStream_t stream);
/**
* @brief Perform an all-to-all variable sized communication.
*
* @tparam DType The type of value to send.
* @param send The arrays of data to send.
* @param send_prefix The prefix of each array to send.
* @param recv The arrays of data to receive.
* @param recv_prefix The prefix of each array to receive.
* @param type The type of data to send.
* @param stream The stream to operate on.
*/
template <typename DType>
void AllToAllV(
const DType* const send, const int64_t* send_prefix, DType* const recv,
const int64_t* recv_prefix, cudaStream_t stream);
/**
* @brief Perform an all-to-all with sparse data (idx and value pairs). By
* necessity, the sizes of each message are variable.
*
* @tparam IdType The type of index.
* @tparam DType The type of value.
* @param send_idx The set of indexes to send on the device.
* @param send_value The set of values to send on the device.
* @param num_feat The number of values per index.
* @param send_prefix The exclusive prefix sum of elements to send on the
* host.
* @param recv_idx The set of indexes to receive on the device.
* @param recv_value The set of values to receive on the device.
* @param recv_prefix The exclusive prefix sum of the number of elements to
* receive on the host.
* @param stream The stream to communicate on.
*/
template <typename IdType, typename DType>
void SparseAllToAll(
const IdType* send_idx, const DType* send_value, const int64_t num_feat,
const int64_t* send_prefix, IdType* recv_idx, DType* recv_value,
const int64_t* recv_prefix, cudaStream_t stream);
int size() const;
int rank() const;
static constexpr const char* _type_key = "cuda.NCCLCommunicator";
DGL_DECLARE_OBJECT_TYPE_INFO(NCCLCommunicator, Object);
private:
ncclComm_t comm_;
int size_;
int rank_;
};
DGL_DEFINE_OBJECT_REF(NCCLCommunicatorRef, NCCLCommunicator);
} // namespace cuda
} // namespace runtime
} // namespace dgl
#endif // DGL_RUNTIME_CUDA_NCCL_API_H_
......@@ -24,11 +24,13 @@ def test_get_node_partition_from_book(idtype):
assert partition.num_parts() == 3
assert partition.array_size() == 11
# Test map_to_local
test_ids = F.copy_to(F.tensor([0, 2, 6, 7, 10], dtype=idtype), F.ctx())
act_ids = partition.map_to_local(test_ids)
exp_ids = F.copy_to(F.tensor([0, 2, 0, 1, 4], dtype=idtype), F.ctx())
assert F.array_equal(act_ids, exp_ids)
# Test map_to_global
test_ids = F.copy_to(F.tensor([0, 2], dtype=idtype), F.ctx())
act_ids = partition.map_to_global(test_ids, 0)
exp_ids = F.copy_to(F.tensor([0, 2], dtype=idtype), F.ctx())
......@@ -43,3 +45,11 @@ def test_get_node_partition_from_book(idtype):
act_ids = partition.map_to_global(test_ids, 2)
exp_ids = F.copy_to(F.tensor([6, 7, 10], dtype=idtype), F.ctx())
assert F.array_equal(act_ids, exp_ids)
# Test generate_permutation
test_ids = F.copy_to(F.tensor([6, 0, 7, 2, 10], dtype=idtype), F.ctx())
perm, split_sum = partition.generate_permutation(test_ids)
exp_perm = F.copy_to(F.tensor([1, 3, 0, 2, 4], dtype=idtype), F.ctx())
exp_sum = F.copy_to(F.tensor([2, 0, 3]), F.ctx())
assert F.array_equal(perm, exp_perm)
assert F.array_equal(split_sum, exp_sum)
import unittest
import backend as F
import torch
import torch.distributed as dist
from dgl.cuda import nccl
from dgl.partition import NDArrayPartition
def gen_test_id():
return "{:0256x}".format(78236728318467363)
@unittest.skipIf(
F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_id():
nccl_id = nccl.UniqueId()
text = str(nccl_id)
nccl_id2 = nccl.UniqueId(id_str=text)
assert nccl_id == nccl_id2
nccl_id2 = nccl.UniqueId(gen_test_id())
assert nccl_id2 != nccl_id
nccl_id3 = nccl.UniqueId(str(nccl_id2))
assert nccl_id2 == nccl_id3
@unittest.skipIf(
F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_push_single_remainder():
nccl_id = nccl.UniqueId()
comm = nccl.Communicator(1, 0, nccl_id)
torch.cuda.set_device("cuda:0")
dist.init_process_group(
backend="nccl",
init_method="tcp://127.0.0.1:12345",
world_size=1,
rank=0,
)
index = F.randint([10000], F.int32, F.ctx(), 0, 10000)
value = F.uniform([10000, 100], F.float32, F.ctx(), -1.0, 1.0)
part = NDArrayPartition(10000, 1, "remainder")
ri, rv = comm.sparse_all_to_all_push(index, value, part)
ri, rv = nccl.sparse_all_to_all_push(index, value, part)
assert F.array_equal(ri, index)
assert F.array_equal(rv, value)
dist.destroy_process_group()
@unittest.skipIf(
F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_pull_single_remainder():
nccl_id = nccl.UniqueId()
comm = nccl.Communicator(1, 0, nccl_id)
torch.cuda.set_device("cuda:0")
dist.init_process_group(
backend="nccl",
init_method="tcp://127.0.0.1:12345",
world_size=1,
rank=0,
)
req_index = F.randint([10000], F.int64, F.ctx(), 0, 100000)
value = F.uniform([100000, 100], F.float32, F.ctx(), -1.0, 1.0)
part = NDArrayPartition(100000, 1, "remainder")
rv = comm.sparse_all_to_all_pull(req_index, value, part)
rv = nccl.sparse_all_to_all_pull(req_index, value, part)
exp_rv = F.gather_row(value, req_index)
assert F.array_equal(rv, exp_rv)
dist.destroy_process_group()
@unittest.skipIf(
F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_push_single_range():
nccl_id = nccl.UniqueId()
comm = nccl.Communicator(1, 0, nccl_id)
torch.cuda.set_device("cuda:0")
dist.init_process_group(
backend="nccl",
init_method="tcp://127.0.0.1:12345",
world_size=1,
rank=0,
)
index = F.randint([10000], F.int32, F.ctx(), 0, 10000)
value = F.uniform([10000, 100], F.float32, F.ctx(), -1.0, 1.0)
......@@ -78,17 +76,24 @@ def test_nccl_sparse_push_single_range():
)
part = NDArrayPartition(10000, 1, "range", part_ranges=part_ranges)
ri, rv = comm.sparse_all_to_all_push(index, value, part)
ri, rv = nccl.sparse_all_to_all_push(index, value, part)
assert F.array_equal(ri, index)
assert F.array_equal(rv, value)
dist.destroy_process_group()
@unittest.skipIf(
F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_pull_single_range():
nccl_id = nccl.UniqueId()
comm = nccl.Communicator(1, 0, nccl_id)
torch.cuda.set_device("cuda:0")
dist.init_process_group(
backend="nccl",
init_method="tcp://127.0.0.1:12345",
world_size=1,
rank=0,
)
req_index = F.randint([10000], F.int64, F.ctx(), 0, 100000)
value = F.uniform([100000, 100], F.float32, F.ctx(), -1.0, 1.0)
......@@ -98,21 +103,15 @@ def test_nccl_sparse_pull_single_range():
)
part = NDArrayPartition(100000, 1, "range", part_ranges=part_ranges)
rv = comm.sparse_all_to_all_pull(req_index, value, part)
rv = nccl.sparse_all_to_all_pull(req_index, value, part)
exp_rv = F.gather_row(value, req_index)
assert F.array_equal(rv, exp_rv)
@unittest.skipIf(
F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_support():
# this is just a smoke test, as we don't have any other way to know
# if NCCL support is compiled in right now.
nccl.is_supported()
dist.destroy_process_group()
if __name__ == "__main__":
test_nccl_id()
test_nccl_sparse_push_single()
test_nccl_sparse_pull_single()
test_nccl_sparse_push_single_remainder()
test_nccl_sparse_pull_single_remainder()
test_nccl_sparse_push_single_range()
test_nccl_sparse_pull_single_range()
......@@ -28,7 +28,7 @@ if [[ $arch == *"x86"* ]]; then
fi
if [[ $1 != "cpu" ]]; then
CMAKE_VARS="-DUSE_CUDA=ON -DUSE_NCCL=ON $CMAKE_VARS"
CMAKE_VARS="-DUSE_CUDA=ON $CMAKE_VARS"
fi
if [ -d build ]; then
......
Subproject commit e11238b3029795d33f958b5868d47c90c4f22628