Unverified Commit 738e8318 authored by Xin Yao, committed by GitHub

[Feature] CUDA UVA sampling for MultiLayerNeighborSampler (#3674)



* implement pin_memory/unpin_memory/is_pinned for dgl.graph

* update python docstring

* update c++ docstring

* add test

* fix the broken UnifiedTensor

* XPU_SWITCH for kDLCPUPinned

* a rough version ready for testing

* eliminate extra context parameter for pin/unpin

* update train_sampling

* fix linting

* fix typo

* multi-gpu uva sampling case

* disable new format materialization for pinned graphs

* update python doc for pin_memory_

* fix unit test

* UVA sampling for link prediction

* dispatch most csr ops

* update graphsage example to combine uva sampling and UnifiedTensor

* update graphsage example to combine uva sampling and UnifiedTensor

* update graphsage example to combine uva sampling and UnifiedTensor

* update doc

* update examples

* change unitgraph and heterograph's PinMemory to in-place

* update examples for multi-gpu uva sampling

* update doc

* fix linting

* fix cpu build

* fix is_pinned for DistGraph

* fix is_pinned for DistGraph

* update graphsage unsupervised example

* update doc for gpu sampling

* update some check for sampling device switching

* fix linting

* adapt for new dataloader

* fix linting

* fix

* fix some name issue

* adjust device check

* add unit test for uva sampling & fix some zero_copy bug

* fix linting

* update num_threads in graphsage examples
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>
Co-authored-by: Jinjing Zhou <VoVAllen@users.noreply.github.com>
parent fa343873
@@ -4,33 +4,50 @@
---------------------------------------
DGL since 0.7 has been supporting GPU-based neighborhood sampling, which has a significant
speed advantage over CPU-based neighborhood sampling. If you estimate that your graph
can fit onto GPU and your model does not take a lot of GPU memory, then it is best to
put the graph onto GPU memory and use GPU-based neighbor sampling.

For example, `OGB Products <https://ogb.stanford.edu/docs/nodeprop/#ogbn-products>`_ has
2.4M nodes and 61M edges. The graph takes less than 1GB since the memory consumption of
a graph depends on the number of edges. Therefore it is entirely possible to fit the
whole graph onto GPU.

Put the node features onto GPU memory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If the node features can also fit onto GPU memory, it is recommended to put them onto GPU
to reduce the time for data transfer from CPU to GPU, which usually becomes a bottleneck
when using GPU for sampling. For example, in the above OGB Products, each node has
100-dimensional features and they take less than 1GB of memory in total. It is easy to
transfer these features to GPU before training via the following code.

.. code:: python

   # pop the features and labels
   features = g.ndata.pop('features')
   labels = g.ndata.pop('labels')
   # put them onto GPU
   features = features.to('cuda:0')
   labels = labels.to('cuda:0')

If the node features are too large to fit onto GPU memory, :class:`~dgl.contrib.UnifiedTensor`
enables GPU zero-copy access to the features stored on CPU memory and greatly reduces
the time for data transfer from CPU to GPU.
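A minimal sketch of this approach (not part of the original guide text; it assumes the
same ``g`` as above and an ``input_nodes`` index tensor produced by a sampler):

.. code:: python

   # keep the features in host memory, but expose them to the GPU via zero-copy access
   features = g.ndata.pop('features')
   labels = g.ndata.pop('labels')
   features = dgl.contrib.UnifiedTensor(features, device=torch.device('cuda:0'))
   labels = dgl.contrib.UnifiedTensor(labels, device=torch.device('cuda:0'))

   # slicing with a GPU index tensor gathers the rows directly into a GPU tensor
   batch_inputs = features[input_nodes.to('cuda:0')]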
Using GPU-based neighborhood sampling in DGL data loaders
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

One can use GPU-based neighborhood sampling with DGL data loaders via:

* Putting the graph onto GPU.

* Set ``device`` argument to a GPU device.

* Set ``num_workers`` argument to 0, because CUDA does not allow multiple processes
  accessing the same context.

All the other arguments for the :class:`~dgl.dataloading.pytorch.NodeDataLoader` can be
the same as in the other user guides and tutorials. A complete example is sketched after
the (truncated) snippet below.

@@ -47,22 +64,79 @@ the same as the other user guides and tutorials.
        drop_last=False,
        shuffle=True)
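As a rough end-to-end sketch of the pattern shown above (assuming a homogeneous graph
``g``, a node ID tensor ``train_nid``, and fanouts of 10 and 25):

.. code:: python

   g = g.to('cuda:0')                      # graph structure on GPU
   train_nid = train_nid.to('cuda:0')      # seed nodes on the same device
   sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
   dataloader = dgl.dataloading.NodeDataLoader(
       g,
       train_nid,
       sampler,
       device=torch.device('cuda:0'),      # sampling and block construction on GPU
       num_workers=0,                      # required for GPU-based sampling
       batch_size=1000,
       drop_last=False,
       shuffle=True)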
GPU-based neighbor sampling also works for :class:`~dgl.dataloading.pytorch.EdgeDataLoader` since DGL 0.8.

.. note::

   GPU-based neighbor sampling also works for custom neighborhood samplers as long as
   (1) your sampler is subclassed from :class:`~dgl.dataloading.BlockSampler`, and (2)
   your sampler entirely works on GPU.
Using CUDA UVA-based neighborhood sampling in DGL data loaders
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. note::

   New feature introduced in DGL 0.8.

For the case where the graph is too large to fit onto GPU memory, we introduce CUDA UVA
(Unified Virtual Addressing)-based sampling, in which GPUs perform the sampling on the
graph pinned in CPU memory via zero-copy access.

You can enable UVA-based neighborhood sampling in DGL data loaders via:

* Pin the graph to page-locked memory via :func:`dgl.DGLGraph.pin_memory_`.

* Set ``device`` argument to a GPU device.

* Set ``num_workers`` argument to 0, because CUDA does not allow multiple processes
  accessing the same context.

All the other arguments for the :class:`~dgl.dataloading.pytorch.NodeDataLoader` can be
the same as in the other user guides and tutorials. UVA-based neighbor sampling also
works for :class:`~dgl.dataloading.pytorch.EdgeDataLoader`.

.. code:: python

   g = g.pin_memory_()
   dataloader = dgl.dataloading.NodeDataLoader(
       g,                              # The graph must be pinned.
       train_nid,
       sampler,
       device=torch.device('cuda:0'),  # The device argument must be GPU.
       num_workers=0,                  # Number of workers must be 0.
       batch_size=1000,
       drop_last=False,
       shuffle=True)

UVA-based sampling is the recommended solution for mini-batch training on large graphs,
especially for multi-GPU training.

.. note::

   To use UVA-based sampling in multi-GPU training, you should first materialize all the
   necessary sparse formats of the graph and copy them to shared memory explicitly before
   spawning the training processes. Then you should pin the shared graph in each training
   process respectively, as sketched below. Refer to our `GraphSAGE example
   <https://github.com/dmlc/dgl/blob/master/examples/pytorch/graphsage/train_sampling_multi_gpu.py>`_
   for more details.

   Currently :class:`~dgl.dataloading.pytorch.EdgeDataLoader` and heterogeneous graphs
   are not supported.
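A rough sketch of that preparation flow (an illustration only; ``nfeat``, ``train_nid``,
``sampler`` and ``dev_id`` are assumed to exist, and the process-spawning code is omitted):

.. code:: python

   # in the main process, before spawning the per-GPU workers
   g.create_formats_()              # materialize the needed sparse formats once
   g = g.shared_memory('g')         # copy the graph structure to shared memory
   nfeat = nfeat.share_memory_()    # share the node features across processes

   # in each training process (one per GPU)
   device = torch.device('cuda', dev_id)
   torch.cuda.set_device(device)
   g.pin_memory_()                  # pin the shared graph for UVA access
   dataloader = dgl.dataloading.NodeDataLoader(
       g, train_nid.to(device), sampler,
       device=device, use_ddp=True, num_workers=0,
       batch_size=1000, drop_last=False, shuffle=True)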
Using GPU-based neighbor sampling with DGL functions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can build your own GPU sampling pipelines with the following functions that support
operating on GPU (a short sketch combining them follows the lists):

* :func:`dgl.sampling.sample_neighbors`

  * Only has support for uniform sampling; non-uniform sampling can only run on CPU.

Subgraph extraction ops:

* :func:`dgl.node_subgraph`
* :func:`dgl.edge_subgraph`
* :func:`dgl.in_subgraph`
* :func:`dgl.out_subgraph`

Graph transform ops for subgraph construction:

* :func:`dgl.to_block`
* :func:`dgl.compact_graphs`
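For instance, a hand-rolled two-layer sampling pipeline built from these ops (a sketch,
assuming ``g`` already resides on GPU and ``seeds`` is a GPU tensor of seed node IDs):

.. code:: python

   blocks = []
   for fanout in [25, 10]:
       # sample a fixed number of in-neighbors of the current seed nodes
       frontier = dgl.sampling.sample_neighbors(g, seeds, fanout)
       # convert the frontier into a bipartite block for message passing
       block = dgl.to_block(frontier, seeds)
       # the block's source nodes become the seeds of the next (outer) layer
       seeds = block.srcdata[dgl.NID]
       blocks.insert(0, block)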
@@ -71,6 +71,20 @@ Notably,
Micro F1 score reaches 0.9212 on test set.
### Use GPU sampling and CUDA UVA sampling
For training scripts `train_sampling.py`, `train_sampling_multi_gpu.py` and `train_sampling_unsupervised.py`, we provide arguments `--graph-device` and `--data-device`.
For `--graph-device`, we provide the following choices:
- `cpu` (default): Use CPU to sample the graph structure stored in host memory.
- `gpu`: Use GPU to sample the graph structure stored in GPU device memory. You have to copy the graph structure (only the `csc` format is needed) to GPU before passing it to the dataloader. This is the fastest way for sampling but requires storing the whole graph structure in GPU memory and will duplicate it in each GPU in multi-GPU training.
- `uva`: Use GPU to sample the graph structure stored in **pinned** host memory through zero-copy access. You have to pin the graph structure before passing it to the dataloader. This is much faster than CPU sampling and especially useful when the graph structure is too large to fit into the GPU memory.
For `--data-device`, we provide the following choices:
- `cpu`: Node features are stored in host memory. It will take a lot of time to slice and transfer node features to GPU during training.
- `gpu` (default): Node features are stored in GPU device memory. This is the fastest way for feature slicing and transferring but consumes a lot of GPU memory.
- `uva`: Use GPU to slice and access the node features stored in **pinned** host memory (also called `UnifiedTensor`) through zero-copy access. This is especially useful when the node features are too large to fit into the GPU memory.
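For example, an illustrative single-GPU UVA run of the node classification script (other arguments left at their defaults) could be `python train_sampling.py --gpu 0 --graph-device uva --data-device uva`.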
### Training with PyTorch Lightning
We also provide minibatch training scripts with PyTorch Lightning in `train_lightning.py` and `train_lightning_unsupervised.py`.
......
import dgl import dgl
import torch as th import torch as th
def load_reddit(): def load_reddit(self_loop=True):
from dgl.data import RedditDataset from dgl.data import RedditDataset
# load reddit data # load reddit data
data = RedditDataset(self_loop=True) data = RedditDataset(self_loop=self_loop)
g = data[0] g = data[0]
g.ndata['features'] = g.ndata['feat'] g.ndata['features'] = g.ndata.pop('feat')
g.ndata['labels'] = g.ndata['label'] g.ndata['labels'] = g.ndata.pop('label')
return g, data.num_classes return g, data.num_classes
def load_ogb(name): def load_ogb(name, root='dataset'):
from ogb.nodeproppred import DglNodePropPredDataset from ogb.nodeproppred import DglNodePropPredDataset
print('load', name) print('load', name)
data = DglNodePropPredDataset(name=name) data = DglNodePropPredDataset(name=name, root=root)
print('finish loading', name) print('finish loading', name)
splitted_idx = data.get_idx_split() splitted_idx = data.get_idx_split()
graph, labels = data[0] graph, labels = data[0]
labels = labels[:, 0] labels = labels[:, 0]
graph.ndata['features'] = graph.ndata['feat'] graph.ndata['features'] = graph.ndata.pop('feat')
graph.ndata['labels'] = labels graph.ndata['labels'] = labels
in_feats = graph.ndata['features'].shape[1] in_feats = graph.ndata['features'].shape[1]
num_labels = len(th.unique(labels[th.logical_not(th.isnan(labels))])) num_labels = len(th.unique(labels[th.logical_not(th.isnan(labels))]))
......
...@@ -2,8 +2,10 @@ import torch as th ...@@ -2,8 +2,10 @@ import torch as th
import dgl import dgl
class NegativeSampler(object): class NegativeSampler(object):
def __init__(self, g, k, neg_share=False): def __init__(self, g, k, neg_share=False, device=None):
self.weights = g.in_degrees().float() ** 0.75 if device is None:
device = g.device
self.weights = g.in_degrees().float().to(device) ** 0.75
self.k = k self.k = k
self.neg_share = neg_share self.neg_share = neg_share
......
...@@ -48,17 +48,22 @@ def run(args, device, data): ...@@ -48,17 +48,22 @@ def run(args, device, data):
n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \ n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \
val_nfeat, val_labels, test_nfeat, test_labels = data val_nfeat, val_labels, test_nfeat, test_labels = data
in_feats = train_nfeat.shape[1] in_feats = train_nfeat.shape[1]
train_nid = th.nonzero(train_g.ndata['train_mask'], as_tuple=True)[0] test_nid = test_g.ndata.pop('test_mask',
val_nid = th.nonzero(val_g.ndata['val_mask'], as_tuple=True)[0] ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])).nonzero().squeeze()
test_nid = th.nonzero(~(test_g.ndata['train_mask'] | test_g.ndata['val_mask']), as_tuple=True)[0] train_nid = train_g.ndata.pop('train_mask').nonzero().squeeze()
val_nid = val_g.ndata.pop('val_mask').nonzero().squeeze()
dataloader_device = th.device('cpu') if args.graph_device == 'gpu':
if args.sample_gpu:
train_nid = train_nid.to(device) train_nid = train_nid.to(device)
# copy only the csc to the GPU # copy only the csc to the GPU
train_g = train_g.formats(['csc']) train_g = train_g.formats(['csc'])
train_g = train_g.to(device) train_g = train_g.to(device)
dataloader_device = device args.num_workers = 0
elif args.graph_device == 'uva':
train_nid = train_nid.to(device)
train_g = train_g.formats(['csc'])
train_g.pin_memory_()
args.num_workers = 0
# Create PyTorch DataLoader for constructing blocks # Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler( sampler = dgl.dataloading.MultiLayerNeighborSampler(
...@@ -67,7 +72,7 @@ def run(args, device, data): ...@@ -67,7 +72,7 @@ def run(args, device, data):
train_g, train_g,
train_nid, train_nid,
sampler, sampler,
device=dataloader_device, device=device,
batch_size=args.batch_size, batch_size=args.batch_size,
shuffle=True, shuffle=True,
drop_last=False, drop_last=False,
...@@ -137,21 +142,28 @@ if __name__ == '__main__': ...@@ -137,21 +142,28 @@ if __name__ == '__main__':
argparser.add_argument('--dropout', type=float, default=0.5) argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--num-workers', type=int, default=4, argparser.add_argument('--num-workers', type=int, default=4,
help="Number of sampling processes. Use 0 for no extra process.") help="Number of sampling processes. Use 0 for no extra process.")
argparser.add_argument('--sample-gpu', action='store_true',
help="Perform the sampling process on the GPU. Must have 0 workers.")
argparser.add_argument('--inductive', action='store_true', argparser.add_argument('--inductive', action='store_true',
help="Inductive learning setting") help="Inductive learning setting")
argparser.add_argument('--data-cpu', action='store_true', argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
help="Device to perform the sampling. "
"Must have 0 workers for 'gpu' and 'uva'")
argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
help="By default the script puts all node features and labels " help="By default the script puts all node features and labels "
"on GPU when using it to save time for data copy. This may " "on GPU when using it to save time for data copy. This may "
"be undesired if they cannot fit in GPU memory at once. " "be undesired if they cannot fit in GPU memory at once. "
"This flag disables that.") "Use 'cpu' to keep the features on host memory and "
"'uva' to enable UnifiedTensor (GPU zero-copy access on "
"pinned host memory).")
args = argparser.parse_args() args = argparser.parse_args()
if args.gpu >= 0: if args.gpu >= 0:
device = th.device('cuda:%d' % args.gpu) device = th.device('cuda:%d' % args.gpu)
else: else:
device = th.device('cpu') device = th.device('cpu')
assert args.graph_device == 'cpu', \
f"Must have GPUs to enable {args.graph_device} sampling."
assert args.data_device == 'cpu', \
f"Must have GPUs to enable {args.data_device} feature storage."
if args.dataset == 'reddit': if args.dataset == 'reddit':
g, n_classes = load_reddit() g, n_classes = load_reddit()
...@@ -173,9 +185,12 @@ if __name__ == '__main__': ...@@ -173,9 +185,12 @@ if __name__ == '__main__':
train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features') train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features')
train_labels = val_labels = test_labels = g.ndata.pop('labels') train_labels = val_labels = test_labels = g.ndata.pop('labels')
if not args.data_cpu: if args.data_device == 'gpu':
train_nfeat = train_nfeat.to(device) train_nfeat = train_nfeat.to(device)
train_labels = train_labels.to(device) train_labels = train_labels.to(device)
elif args.data_device == 'uva':
train_nfeat = dgl.contrib.UnifiedTensor(train_nfeat, device=device)
train_labels = dgl.contrib.UnifiedTensor(train_labels, device=device)
# Pack data # Pack data
data = n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \ data = n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \
......
import os
import dgl import dgl
import numpy as np import numpy as np
import torch as th import torch as th
...@@ -48,7 +49,9 @@ def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id): ...@@ -48,7 +49,9 @@ def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id):
def run(proc_id, n_gpus, args, devices, data): def run(proc_id, n_gpus, args, devices, data):
# Start up distributed training, if enabled. # Start up distributed training, if enabled.
dev_id = devices[proc_id] device = th.device(devices[proc_id])
if n_gpus > 0:
th.cuda.set_device(device)
if n_gpus > 1: if n_gpus > 1:
dist_init_method = 'tcp://{master_ip}:{master_port}'.format( dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345') master_ip='127.0.0.1', master_port='12345')
...@@ -57,34 +60,29 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -57,34 +60,29 @@ def run(proc_id, n_gpus, args, devices, data):
init_method=dist_init_method, init_method=dist_init_method,
world_size=world_size, world_size=world_size,
rank=proc_id) rank=proc_id)
th.cuda.set_device(dev_id)
# Unpack data # Unpack data
n_classes, train_g, val_g, test_g = data n_classes, train_g, val_g, test_g, train_nfeat, val_nfeat, test_nfeat, \
train_labels, val_labels, test_labels, train_nid, val_nid, test_nid = data
if args.inductive: if args.data_device == 'gpu':
train_nfeat = train_g.ndata.pop('features') train_nfeat = train_nfeat.to(device)
val_nfeat = val_g.ndata.pop('features') train_labels = train_labels.to(device)
test_nfeat = test_g.ndata.pop('features') elif args.data_device == 'uva':
train_labels = train_g.ndata.pop('labels') train_nfeat = dgl.contrib.UnifiedTensor(train_nfeat, device=device)
val_labels = val_g.ndata.pop('labels') train_labels = dgl.contrib.UnifiedTensor(train_labels, device=device)
test_labels = test_g.ndata.pop('labels')
else:
train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features')
train_labels = val_labels = test_labels = g.ndata.pop('labels')
if not args.data_cpu:
train_nfeat = train_nfeat.to(dev_id)
train_labels = train_labels.to(dev_id)
in_feats = train_nfeat.shape[1] in_feats = train_nfeat.shape[1]
train_mask = train_g.ndata['train_mask'] if args.graph_device == 'gpu':
val_mask = val_g.ndata['val_mask'] train_nid = train_nid.to(device)
test_mask = ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask']) train_g = train_g.formats(['csc'])
train_nid = train_mask.nonzero().squeeze() train_g = train_g.to(device)
val_nid = val_mask.nonzero().squeeze() args.num_workers = 0
test_nid = test_mask.nonzero().squeeze() elif args.graph_device == 'uva':
train_nid = train_nid.to(device)
train_g.pin_memory_()
args.num_workers = 0
# Create PyTorch DataLoader for constructing blocks # Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler( sampler = dgl.dataloading.MultiLayerNeighborSampler(
...@@ -94,7 +92,7 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -94,7 +92,7 @@ def run(proc_id, n_gpus, args, devices, data):
train_nid, train_nid,
sampler, sampler,
use_ddp=n_gpus > 1, use_ddp=n_gpus > 1,
device=dev_id if args.num_workers == 0 else None, device=device,
batch_size=args.batch_size, batch_size=args.batch_size,
shuffle=True, shuffle=True,
drop_last=False, drop_last=False,
...@@ -102,9 +100,9 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -102,9 +100,9 @@ def run(proc_id, n_gpus, args, devices, data):
# Define model and optimizer # Define model and optimizer
model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout) model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
model = model.to(dev_id) model = model.to(device)
if n_gpus > 1: if n_gpus > 1:
model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id) model = DistributedDataParallel(model, device_ids=[device], output_device=device)
loss_fcn = nn.CrossEntropyLoss() loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr) optimizer = optim.Adam(model.parameters(), lr=args.lr)
...@@ -112,8 +110,6 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -112,8 +110,6 @@ def run(proc_id, n_gpus, args, devices, data):
avg = 0 avg = 0
iter_tput = [] iter_tput = []
for epoch in range(args.num_epochs): for epoch in range(args.num_epochs):
if n_gpus > 1:
dataloader.set_epoch(epoch)
tic = time.time() tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of # Loop over the dataloader to sample the computation dependency graph as a list of
...@@ -124,8 +120,8 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -124,8 +120,8 @@ def run(proc_id, n_gpus, args, devices, data):
# Load the input features as well as output labels # Load the input features as well as output labels
batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels, batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
seeds, input_nodes, dev_id) seeds, input_nodes, device)
blocks = [block.int().to(dev_id) for block in blocks] blocks = [block.int().to(device) for block in blocks]
# Compute loss and prediction # Compute loss and prediction
batch_pred = model(blocks, batch_inputs) batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels) loss = loss_fcn(batch_pred, batch_labels)
...@@ -162,7 +158,6 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -162,7 +158,6 @@ def run(proc_id, n_gpus, args, devices, data):
print('Eval Acc {:.4f}'.format(eval_acc)) print('Eval Acc {:.4f}'.format(eval_acc))
print('Test Acc: {:.4f}'.format(test_acc)) print('Test Acc: {:.4f}'.format(test_acc))
if n_gpus > 1: if n_gpus > 1:
th.distributed.barrier() th.distributed.barrier()
if proc_id == 0: if proc_id == 0:
...@@ -186,11 +181,16 @@ if __name__ == '__main__': ...@@ -186,11 +181,16 @@ if __name__ == '__main__':
help="Number of sampling processes. Use 0 for no extra process.") help="Number of sampling processes. Use 0 for no extra process.")
argparser.add_argument('--inductive', action='store_true', argparser.add_argument('--inductive', action='store_true',
help="Inductive learning setting") help="Inductive learning setting")
argparser.add_argument('--data-cpu', action='store_true', argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
help="Device to perform the sampling. "
"Must have 0 workers for 'gpu' and 'uva'")
argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
help="By default the script puts all node features and labels " help="By default the script puts all node features and labels "
"on GPU when using it to save time for data copy. This may " "on GPU when using it to save time for data copy. This may "
"be undesired if they cannot fit in GPU memory at once. " "be undesired if they cannot fit in GPU memory at once. "
"This flag disables that.") "Use 'cpu' to keep the features on host memory and "
"'uva' to enable UnifiedTensor (GPU zero-copy access on "
"pinned host memory).")
args = argparser.parse_args() args = argparser.parse_args()
devices = list(map(int, args.gpu.split(','))) devices = list(map(int, args.gpu.split(',')))
...@@ -200,26 +200,63 @@ if __name__ == '__main__': ...@@ -200,26 +200,63 @@ if __name__ == '__main__':
g, n_classes = load_reddit() g, n_classes = load_reddit()
elif args.dataset == 'ogbn-products': elif args.dataset == 'ogbn-products':
g, n_classes = load_ogb('ogbn-products') g, n_classes = load_ogb('ogbn-products')
elif args.dataset == 'ogbn-papers100M':
g, n_classes = load_ogb('ogbn-papers100M')
g = dgl.add_reverse_edges(g)
# convert labels to integer
g.ndata['labels'] = th.as_tensor(g.ndata['labels'], dtype=th.int64)
g.ndata.pop('year')
else: else:
raise Exception('unknown dataset') raise Exception('unknown dataset')
# Construct graph
g = dgl.as_heterograph(g)
if args.inductive: if args.inductive:
train_g, val_g, test_g = inductive_split(g) train_g, val_g, test_g = inductive_split(g)
train_nfeat = train_g.ndata.pop('features')
val_nfeat = val_g.ndata.pop('features')
test_nfeat = test_g.ndata.pop('features')
train_labels = train_g.ndata.pop('labels')
val_labels = val_g.ndata.pop('labels')
test_labels = test_g.ndata.pop('labels')
else: else:
train_g = val_g = test_g = g train_g = val_g = test_g = g
train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features')
train_labels = val_labels = test_labels = g.ndata.pop('labels')
test_nid = test_g.ndata.pop('test_mask',
~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])).nonzero().squeeze()
train_nid = train_g.ndata.pop('train_mask').nonzero().squeeze()
val_nid = val_g.ndata.pop('val_mask').nonzero().squeeze()
# Create csr/coo/csc formats before launching training processes with multi-gpu.
# This avoids creating certain formats in each sub-process, which saves memory and CPU.
train_g.create_formats_() train_g.create_formats_()
val_g.create_formats_() val_g.create_formats_()
test_g.create_formats_() test_g.create_formats_()
# This is to avoid contention overhead on machines with many cores.
# Change it to a proper number on your machine, especially for multi-GPU training.
os.environ['OMP_NUM_THREADS'] = str(mp.cpu_count() // 2 // n_gpus)
if n_gpus > 1:
# Copy the graph to shared memory explicitly before pinning.
# In other cases, we can just rely on fork's copy-on-write.
# TODO: the original train_g is not freed.
if args.graph_device == 'uva':
train_g = train_g.shared_memory('train_g')
if args.data_device == 'uva':
train_nfeat = train_nfeat.share_memory_()
train_labels = train_labels.share_memory_()
# Pack data # Pack data
data = n_classes, train_g, val_g, test_g data = n_classes, train_g, val_g, test_g, train_nfeat, val_nfeat, test_nfeat, \
train_labels, val_labels, test_labels, train_nid, val_nid, test_nid
if n_gpus == 1: if devices[0] == -1:
assert args.graph_device == 'cpu', \
f"Must have GPUs to enable {args.graph_device} sampling."
assert args.data_device == 'cpu', \
f"Must have GPUs to enable {args.data_device} feature storage."
run(0, 0, args, ['cpu'], data)
elif n_gpus == 1:
run(0, n_gpus, args, devices, data) run(0, n_gpus, args, devices, data)
else: else:
procs = [] procs = []
......
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.nn.pytorch as dglnn
import time
import argparse
import tqdm
from model import SAGE
from load_graph import load_reddit, inductive_split, load_ogb
def compute_acc(pred, labels):
"""
Compute the accuracy of prediction given the labels.
"""
labels = labels.long()
return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)
def evaluate(model, g, nfeat, labels, val_nid, device):
"""
Evaluate the model on the validation set specified by ``val_nid``.
g : The entire graph.
inputs : The features of all the nodes.
labels : The labels of all the nodes.
val_nid : the node Ids for validation.
device : The GPU device to evaluate on.
"""
model.eval()
with th.no_grad():
pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
model.train()
return compute_acc(pred[val_nid], labels[val_nid].to(pred.device))
def load_subtensor(nfeat, labels, seeds, input_nodes, device):
"""
Extracts features and labels for a subset of nodes
"""
batch_inputs = nfeat[input_nodes.to(device)]
batch_labels = labels[seeds].to(device)
return batch_inputs, batch_labels
#### Entry point
def run(args, device, data):
# Unpack data
n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \
val_nfeat, val_labels, test_nfeat, test_labels = data
in_feats = train_nfeat.shape[1]
train_nid = th.nonzero(train_g.ndata['train_mask'], as_tuple=True)[0]
val_nid = th.nonzero(val_g.ndata['val_mask'], as_tuple=True)[0]
test_nid = th.nonzero(~(test_g.ndata['train_mask'] | test_g.ndata['val_mask']), as_tuple=True)[0]
dataloader_device = th.device('cpu')
if args.sample_gpu:
train_nid = train_nid.to(device)
# copy only the csc to the GPU
train_g = train_g.formats(['csc'])
train_g = train_g.to(device)
dataloader_device = device
# Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[int(fanout) for fanout in args.fan_out.split(',')])
dataloader = dgl.dataloading.NodeDataLoader(
train_g,
train_nid,
sampler,
device=dataloader_device,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
num_workers=args.num_workers)
if args.data_cpu:
# Convert input feature tensor to unified tensor
train_nfeat = dgl.contrib.UnifiedTensor(train_nfeat, device=device)
# Define model and optimizer
model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
model = model.to(device)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# Training loop
avg = 0
iter_tput = []
for epoch in range(args.num_epochs):
tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
tic_step = time.time()
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
seeds, input_nodes, device)
blocks = [block.int().to(device) for block in blocks]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
iter_tput.append(len(seeds) / (time.time() - tic_step))
if step % args.log_every == 0:
acc = compute_acc(batch_pred, batch_labels)
gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), gpu_mem_alloc))
tic_step = time.time()
toc = time.time()
print('Epoch Time(s): {:.4f}'.format(toc - tic))
if epoch >= 5:
avg += toc - tic
if epoch % args.eval_every == 0 and epoch != 0:
eval_acc = evaluate(model, val_g, val_nfeat, val_labels, val_nid, device)
print('Eval Acc {:.4f}'.format(eval_acc))
test_acc = evaluate(model, test_g, test_nfeat, test_labels, test_nid, device)
print('Test Acc: {:.4f}'.format(test_acc))
print('Avg epoch time: {}'.format(avg / (epoch - 4)))
if __name__ == '__main__':
argparser = argparse.ArgumentParser()
argparser.add_argument('--gpu', type=int, default=0,
help="GPU device ID. Use -1 for CPU training")
argparser.add_argument('--dataset', type=str, default='reddit')
argparser.add_argument('--num-epochs', type=int, default=20)
argparser.add_argument('--num-hidden', type=int, default=16)
argparser.add_argument('--num-layers', type=int, default=2)
argparser.add_argument('--fan-out', type=str, default='10,25')
argparser.add_argument('--batch-size', type=int, default=1000)
argparser.add_argument('--log-every', type=int, default=20)
argparser.add_argument('--eval-every', type=int, default=5)
argparser.add_argument('--lr', type=float, default=0.003)
argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--num-workers', type=int, default=4,
help="Number of sampling processes. Use 0 for no extra process.")
argparser.add_argument('--sample-gpu', action='store_true',
help="Perform the sampling process on the GPU. Must have 0 workers.")
argparser.add_argument('--inductive', action='store_true',
help="Inductive learning setting")
argparser.add_argument('--data-cpu', action='store_true',
help="By default the script puts all node features and labels "
"on GPU when using it to save time for data copy. This may "
"be undesired if they cannot fit in GPU memory at once. "
"Setting this flag makes all node features to be located"
"in the unified tensor instead.")
args = argparser.parse_args()
if args.gpu >= 0:
device = th.device('cuda:%d' % args.gpu)
else:
device = th.device('cpu')
if args.dataset == 'reddit':
g, n_classes = load_reddit()
elif args.dataset == 'ogbn-products':
g, n_classes = load_ogb('ogbn-products')
else:
raise Exception('unknown dataset')
if args.inductive:
train_g, val_g, test_g = inductive_split(g)
train_nfeat = train_g.ndata.pop('features')
val_nfeat = val_g.ndata.pop('features')
test_nfeat = test_g.ndata.pop('features')
train_labels = train_g.ndata.pop('labels')
val_labels = val_g.ndata.pop('labels')
test_labels = test_g.ndata.pop('labels')
else:
train_g = val_g = test_g = g
train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features')
train_labels = val_labels = test_labels = g.ndata.pop('labels')
if not args.data_cpu:
train_nfeat = train_nfeat.to(device)
train_labels = train_labels.to(device)
# Pack data
data = n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \
val_nfeat, val_labels, test_nfeat, test_labels
run(args, device, data)
import os
import dgl import dgl
import numpy as np import numpy as np
import torch as th import torch as th
...@@ -9,12 +10,12 @@ import dgl.function as fn ...@@ -9,12 +10,12 @@ import dgl.function as fn
import dgl.nn.pytorch as dglnn import dgl.nn.pytorch as dglnn
import time import time
import argparse import argparse
from dgl.data import RedditDataset
from torch.nn.parallel import DistributedDataParallel from torch.nn.parallel import DistributedDataParallel
import tqdm import tqdm
from model import SAGE, compute_acc_unsupervised as compute_acc from model import SAGE, compute_acc_unsupervised as compute_acc
from negative_sampler import NegativeSampler from negative_sampler import NegativeSampler
from load_graph import load_reddit, load_ogb
class CrossEntropyLoss(nn.Module): class CrossEntropyLoss(nn.Module):
def forward(self, block_outputs, pos_graph, neg_graph): def forward(self, block_outputs, pos_graph, neg_graph):
...@@ -55,7 +56,9 @@ def evaluate(model, g, nfeat, labels, train_nids, val_nids, test_nids, device): ...@@ -55,7 +56,9 @@ def evaluate(model, g, nfeat, labels, train_nids, val_nids, test_nids, device):
#### Entry point #### Entry point
def run(proc_id, n_gpus, args, devices, data): def run(proc_id, n_gpus, args, devices, data):
# Unpack data # Unpack data
device = devices[proc_id] device = th.device(devices[proc_id])
if n_gpus > 0:
th.cuda.set_device(device)
if n_gpus > 1: if n_gpus > 1:
dist_init_method = 'tcp://{master_ip}:{master_port}'.format( dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345') master_ip='127.0.0.1', master_port='12345')
...@@ -64,26 +67,28 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -64,26 +67,28 @@ def run(proc_id, n_gpus, args, devices, data):
init_method=dist_init_method, init_method=dist_init_method,
world_size=world_size, world_size=world_size,
rank=proc_id) rank=proc_id)
train_mask, val_mask, test_mask, n_classes, g = data train_nid, val_nid, test_nid, n_classes, g, nfeat, labels = data
nfeat = g.ndata.pop('feat')
labels = g.ndata.pop('label') if args.data_device == 'gpu':
if not args.data_cpu:
nfeat = nfeat.to(device) nfeat = nfeat.to(device)
labels = labels.to(device) labels = labels.to(device)
elif args.data_device == 'uva':
nfeat = dgl.contrib.UnifiedTensor(nfeat, device=device)
labels = dgl.contrib.UnifiedTensor(labels, device=device)
in_feats = nfeat.shape[1] in_feats = nfeat.shape[1]
train_nid = th.LongTensor(np.nonzero(train_mask)).squeeze()
val_nid = th.LongTensor(np.nonzero(val_mask)).squeeze()
test_nid = th.LongTensor(np.nonzero(test_mask)).squeeze()
# Create PyTorch DataLoader for constructing blocks # Create PyTorch DataLoader for constructing blocks
n_edges = g.num_edges() n_edges = g.num_edges()
train_seeds = th.arange(n_edges) train_seeds = th.arange(n_edges)
if args.sample_gpu: if args.graph_device == 'gpu':
assert n_gpus > 0, "Must have GPUs to enable GPU sampling"
train_seeds = train_seeds.to(device) train_seeds = train_seeds.to(device)
g = g.to(device) g = g.to(device)
args.num_workers = 0
elif args.graph_device == 'uva':
train_seeds = train_seeds.to(device)
g.pin_memory_()
args.num_workers = 0
# Create sampler # Create sampler
sampler = dgl.dataloading.MultiLayerNeighborSampler( sampler = dgl.dataloading.MultiLayerNeighborSampler(
...@@ -94,7 +99,8 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -94,7 +99,8 @@ def run(proc_id, n_gpus, args, devices, data):
reverse_eids=th.cat([ reverse_eids=th.cat([
th.arange(n_edges // 2, n_edges), th.arange(n_edges // 2, n_edges),
th.arange(0, n_edges // 2)]).to(train_seeds), th.arange(0, n_edges // 2)]).to(train_seeds),
negative_sampler=NegativeSampler(g, args.num_negs, args.neg_share), negative_sampler=NegativeSampler(g, args.num_negs, args.neg_share,
device if args.graph_device == 'uva' else None),
device=device, device=device,
use_ddp=n_gpus > 1, use_ddp=n_gpus > 1,
batch_size=args.batch_size, batch_size=args.batch_size,
...@@ -119,13 +125,10 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -119,13 +125,10 @@ def run(proc_id, n_gpus, args, devices, data):
best_eval_acc = 0 best_eval_acc = 0
best_test_acc = 0 best_test_acc = 0
for epoch in range(args.num_epochs): for epoch in range(args.num_epochs):
if n_gpus > 1:
dataloader.set_epoch(epoch)
tic = time.time() tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of # Loop over the dataloader to sample the computation dependency graph as a list of
# blocks. # blocks.
tic_step = time.time() tic_step = time.time()
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader): for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
batch_inputs = nfeat[input_nodes].to(device) batch_inputs = nfeat[input_nodes].to(device)
...@@ -148,7 +151,7 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -148,7 +151,7 @@ def run(proc_id, n_gpus, args, devices, data):
iter_neg.append(neg_edges / (t - tic_step)) iter_neg.append(neg_edges / (t - tic_step))
iter_d.append(d_step - tic_step) iter_d.append(d_step - tic_step)
iter_t.append(t - d_step) iter_t.append(t - d_step)
if step % args.log_every == 0: if step % args.log_every == 0 and proc_id == 0:
gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0 gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
print('[{}]Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed (samples/sec) {:.4f}|{:.4f} | Load {:.4f}| train {:.4f} | GPU {:.1f} MB'.format( print('[{}]Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed (samples/sec) {:.4f}|{:.4f} | Load {:.4f}| train {:.4f} | GPU {:.1f} MB'.format(
proc_id, epoch, step, loss.item(), np.mean(iter_pos[3:]), np.mean(iter_neg[3:]), np.mean(iter_d[3:]), np.mean(iter_t[3:]), gpu_mem_alloc)) proc_id, epoch, step, loss.item(), np.mean(iter_pos[3:]), np.mean(iter_neg[3:]), np.mean(iter_d[3:]), np.mean(iter_t[3:]), gpu_mem_alloc))
...@@ -172,23 +175,49 @@ def run(proc_id, n_gpus, args, devices, data): ...@@ -172,23 +175,49 @@ def run(proc_id, n_gpus, args, devices, data):
if proc_id == 0: if proc_id == 0:
print('Avg epoch time: {}'.format(avg / (epoch - 4))) print('Avg epoch time: {}'.format(avg / (epoch - 4)))
def main(args, devices): def main(args):
# load reddit data devices = list(map(int, args.gpu.split(',')))
data = RedditDataset(self_loop=False) n_gpus = len(devices)
n_classes = data.num_classes
g = data[0] # load dataset
train_mask = g.ndata['train_mask'] if args.dataset == 'reddit':
val_mask = g.ndata['val_mask'] g, n_classes = load_reddit(self_loop=False)
test_mask = g.ndata['test_mask'] elif args.dataset == 'ogbn-products':
g, n_classes = load_ogb('ogbn-products')
else:
raise Exception('unknown dataset')
train_nid = g.ndata.pop('train_mask').nonzero().squeeze()
val_nid = g.ndata.pop('val_mask').nonzero().squeeze()
test_nid = g.ndata.pop('test_mask').nonzero().squeeze()
nfeat = g.ndata.pop('features')
labels = g.ndata.pop('labels')
# Create csr/coo/csc formats before launching training processes with multi-gpu. # Create csr/coo/csc formats before launching training processes with multi-gpu.
# This avoids creating certain formats in each sub-process, which saves memory and CPU. # This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_() g.create_formats_()
# This is to avoid contention overhead on machines with many cores.
# Change it to a proper number on your machine, especially for multi-GPU training.
os.environ['OMP_NUM_THREADS'] = str(mp.cpu_count() // 2 // n_gpus)
if n_gpus > 1:
# Copy the graph to shared memory explicitly before pinning.
# In other cases, we can just rely on fork's copy-on-write.
# TODO: the original graph g is not freed.
if args.graph_device == 'uva':
g = g.shared_memory('g')
if args.data_device == 'uva':
nfeat = nfeat.share_memory_()
labels = labels.share_memory_()
# Pack data # Pack data
data = train_mask, val_mask, test_mask, n_classes, g data = train_nid, val_nid, test_nid, n_classes, g, nfeat, labels
n_gpus = len(devices)
if devices[0] == -1: if devices[0] == -1:
assert args.graph_device == 'cpu', \
f"Must have GPUs to enable {args.graph_device} sampling."
assert args.data_device == 'cpu', \
f"Must have GPUs to enable {args.data_device} feature storage."
run(0, 0, args, ['cpu'], data) run(0, 0, args, ['cpu'], data)
elif n_gpus == 1: elif n_gpus == 1:
run(0, n_gpus, args, devices, data) run(0, n_gpus, args, devices, data)
...@@ -207,6 +236,8 @@ if __name__ == '__main__': ...@@ -207,6 +236,8 @@ if __name__ == '__main__':
argparser.add_argument("--gpu", type=str, default='0', argparser.add_argument("--gpu", type=str, default='0',
help="GPU, can be a list of gpus for multi-gpu training," help="GPU, can be a list of gpus for multi-gpu training,"
" e.g., 0,1,2,3; -1 for CPU") " e.g., 0,1,2,3; -1 for CPU")
argparser.add_argument('--dataset', type=str, default='reddit',
choices=('reddit', 'ogbn-products'))
argparser.add_argument('--num-epochs', type=int, default=20) argparser.add_argument('--num-epochs', type=int, default=20)
argparser.add_argument('--num-hidden', type=int, default=16) argparser.add_argument('--num-hidden', type=int, default=16)
argparser.add_argument('--num-layers', type=int, default=2) argparser.add_argument('--num-layers', type=int, default=2)
...@@ -221,15 +252,16 @@ if __name__ == '__main__': ...@@ -221,15 +252,16 @@ if __name__ == '__main__':
argparser.add_argument('--dropout', type=float, default=0.5) argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--num-workers', type=int, default=0, argparser.add_argument('--num-workers', type=int, default=0,
help="Number of sampling processes. Use 0 for no extra process.") help="Number of sampling processes. Use 0 for no extra process.")
argparser.add_argument('--sample-gpu', action='store_true', argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
help="Perform the sampling process on the GPU. Must have 0 workers.") help="Device to perform the sampling. "
argparser.add_argument('--data-cpu', action='store_true', "Must have 0 workers for 'gpu' and 'uva'")
argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
help="By default the script puts all node features and labels " help="By default the script puts all node features and labels "
"on GPU when using it to save time for data copy. This may " "on GPU when using it to save time for data copy. This may "
"be undesired if they cannot fit in GPU memory at once. " "be undesired if they cannot fit in GPU memory at once. "
"This flag disables that.") "Use 'cpu' to keep the features on host memory and "
"'uva' to enable UnifiedTensor (GPU zero-copy access on "
"pinned host memory).")
args = argparser.parse_args() args = argparser.parse_args()
devices = list(map(int, args.gpu.split(','))) main(args)
main(args, devices)
...@@ -232,6 +232,28 @@ ...@@ -232,6 +232,28 @@
}); \ }); \
}); });
#define CHECK_VALID_CONTEXT(VAR1, VAR2) \
CHECK(((VAR1)->ctx == (VAR2)->ctx) || ((VAR1)->ctx.device_type == kDLCPUPinned)) \
<< "Expected " << (#VAR2) << "(" << (VAR2)->ctx << ")" << " to have the same device " \
<< "context as " << (#VAR1) << "(" << (VAR1)->ctx << "). " \
<< "Or " << (#VAR1) << "(" << (VAR1)->ctx << ")" << " is pinned";
/*
* Macro to dispatch according to the context of array and dtype of csr
* to enable CUDA UVA ops.
* Context check is covered here to avoid confusion with CHECK_SAME_CONTEXT.
* If csr has the same context as array, the behavior is the same as ATEN_CSR_SWITCH_CUDA.
* If csr is pinned, the context of array determines where the actual operation runs.
*/
#define ATEN_CSR_SWITCH_CUDA_UVA(csr, array, XPU, IdType, op, ...) do { \
CHECK_VALID_CONTEXT(csr.indices, array); \
ATEN_XPU_SWITCH_CUDA(array->ctx.device_type, XPU, op, { \
ATEN_ID_TYPE_SWITCH((csr).indptr->dtype, IdType, { \
{__VA_ARGS__} \
}); \
}); \
} while (0)
// Macro to dispatch according to device context (allowing cuda) // Macro to dispatch according to device context (allowing cuda)
#ifdef DGL_USE_CUDA #ifdef DGL_USE_CUDA
#define ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, op, ...) \ #define ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, op, ...) \
......
...@@ -126,7 +126,8 @@ class MultiLayerNeighborSampler(NeighborSamplingMixin, BlockSampler): ...@@ -126,7 +126,8 @@ class MultiLayerNeighborSampler(NeighborSamplingMixin, BlockSampler):
@classmethod @classmethod
def exclude_edges_in_frontier(cls, g): def exclude_edges_in_frontier(cls, g):
return not isinstance(g, distributed.DistGraph) and g.device == F.cpu() return not isinstance(g, distributed.DistGraph) and g.device == F.cpu() \
and not g.is_pinned()
def sample_frontier(self, block_id, g, seed_nodes, exclude_eids=None): def sample_frontier(self, block_id, g, seed_nodes, exclude_eids=None):
fanout = self.fanouts[block_id] fanout = self.fanouts[block_id]
......
...@@ -12,8 +12,8 @@ from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIter ...@@ -12,8 +12,8 @@ from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIter
from ...distributed import DistGraph from ...distributed import DistGraph
from ...ndarray import NDArray as DGLNDArray from ...ndarray import NDArray as DGLNDArray
from ... import backend as F from ... import backend as F
from ...base import DGLError from ...base import DGLError, dgl_warning
from ...utils import to_dgl_context from ...utils import to_dgl_context, check_device
from ..._ffi import streams as FS from ..._ffi import streams as FS
__all__ = ['NodeDataLoader', 'EdgeDataLoader', 'GraphDataLoader', __all__ = ['NodeDataLoader', 'EdgeDataLoader', 'GraphDataLoader',
...@@ -454,7 +454,7 @@ def _init_dataloader(collator, device, dataloader_kwargs, use_ddp, ddp_seed): ...@@ -454,7 +454,7 @@ def _init_dataloader(collator, device, dataloader_kwargs, use_ddp, ddp_seed):
use_scalar_batcher = False use_scalar_batcher = False
scalar_batcher = None scalar_batcher = None
if th.device(device) != th.device('cpu') and dataloader_kwargs.get('num_workers', 0) == 0: if device.type == 'cuda' and dataloader_kwargs.get('num_workers', 0) == 0:
batch_size = dataloader_kwargs.get('batch_size', 1) batch_size = dataloader_kwargs.get('batch_size', 1)
if batch_size > 1: if batch_size > 1:
...@@ -579,7 +579,8 @@ class NodeDataLoader(DataLoader): ...@@ -579,7 +579,8 @@ class NodeDataLoader(DataLoader):
depending on the value of :attr:`num_workers`: depending on the value of :attr:`num_workers`:
- If :attr:`num_workers` is set to 0, the sampling will happen on the CPU, and then the - If :attr:`num_workers` is set to 0, the sampling will happen on the CPU, and then the
subgraphs will be constructed directly on the GPU. This is the recommend setting in subgraphs will be constructed directly on the GPU. This hybrid mode is deprecated and
will be removed in the next release. Use UVA sampling instead, especially in
multi-GPU configurations. multi-GPU configurations.
- Otherwise, if :attr:`num_workers` is greater than 0, both the sampling and subgraph - Otherwise, if :attr:`num_workers` is greater than 0, both the sampling and subgraph
...@@ -599,9 +600,23 @@ class NodeDataLoader(DataLoader): ...@@ -599,9 +600,23 @@ class NodeDataLoader(DataLoader):
else: else:
dataloader_kwargs[k] = v dataloader_kwargs[k] = v
if device is None:
# default to the same device the graph is on # default to the same device the graph is on
device = th.device(g.device) device = th.device(g.device if device is None else device)
num_workers = dataloader_kwargs.get('num_workers', 0)
if g.device.type == 'cuda' or g.is_pinned():
sampling_type = 'UVA sampling' if g.is_pinned() else 'GPU sampling'
assert device.type == 'cuda', \
f"'device' must be a cuda device to enable {sampling_type}, got {device}."
assert check_device(nids, device), \
f"'nids' must be on {device} to use {sampling_type}."
assert num_workers == 0, \
f"'num_workers' must be 0 to use {sampling_type}."
# g is on CPU
elif device.type == 'cuda' and num_workers == 0:
dgl_warning('CPU-GPU hybrid sampling is deprecated and will be removed '
'in the next release. Use pure GPU sampling if your graph can '
'fit onto the GPU memory, or UVA sampling in other cases.')
if not g.is_homogeneous: if not g.is_homogeneous:
if load_input or load_output: if load_input or load_output:
...@@ -614,7 +629,6 @@ class NodeDataLoader(DataLoader): ...@@ -614,7 +629,6 @@ class NodeDataLoader(DataLoader):
# But if async_load is enabled, set_output_context should be skipped as # But if async_load is enabled, set_output_context should be skipped as
# we'd like to avoid any graph/data transfer graphs across devices in # we'd like to avoid any graph/data transfer graphs across devices in
# sampler. Such transfer will be handled in dataloader. # sampler. Such transfer will be handled in dataloader.
num_workers = dataloader_kwargs.get('num_workers', 0)
if ((not async_load) and if ((not async_load) and
callable(getattr(graph_sampler, "set_output_context", None)) and callable(getattr(graph_sampler, "set_output_context", None)) and
num_workers == 0): num_workers == 0):
...@@ -865,8 +879,8 @@ class EdgeDataLoader(DataLoader): ...@@ -865,8 +879,8 @@ class EdgeDataLoader(DataLoader):
* Link prediction on heterogeneous graph: RGCN for link prediction. * Link prediction on heterogeneous graph: RGCN for link prediction.
""" """
collator_arglist = inspect.getfullargspec(EdgeCollator).args collator_arglist = inspect.getfullargspec(EdgeCollator).args
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0,
**kwargs): def __init__(self, g, eids, graph_sampler, device=None, use_ddp=False, ddp_seed=0, **kwargs):
_check_graph_type(g) _check_graph_type(g)
collator_kwargs = {} collator_kwargs = {}
dataloader_kwargs = {} dataloader_kwargs = {}
...@@ -876,13 +890,26 @@ class EdgeDataLoader(DataLoader): ...@@ -876,13 +890,26 @@ class EdgeDataLoader(DataLoader):
else: else:
dataloader_kwargs[k] = v dataloader_kwargs[k] = v
if device is None:
# default to the same device the graph is on # default to the same device the graph is on
device = th.device(g.device) device = th.device(g.device if device is None else device)
num_workers = dataloader_kwargs.get('num_workers', 0)
if g.device.type == 'cuda' or g.is_pinned():
sampling_type = 'UVA sampling' if g.is_pinned() else 'GPU sampling'
assert device.type == 'cuda', \
f"'device' must be a cuda device to enable {sampling_type}, got {device}."
assert check_device(eids, device), \
f"'eids' must be on {device} to use {sampling_type}."
assert num_workers == 0, \
f"'num_workers' must be 0 to use {sampling_type}."
# g is on CPU
elif device.type == 'cuda' and num_workers == 0:
dgl_warning('CPU-GPU hybrid sampling is deprecated and will be removed '
'in the next release. Use pure GPU sampling if your graph can '
'fit onto the GPU memory, or UVA sampling in other cases.')
# if the sampler supports it, tell it to output to the # if the sampler supports it, tell it to output to the
# specified device # specified device
num_workers = dataloader_kwargs.get('num_workers', 0)
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0: if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
graph_sampler.set_output_context(to_dgl_context(device)) graph_sampler.set_output_context(to_dgl_context(device))
......
...@@ -82,6 +82,8 @@ class NDArrayBase(object): ...@@ -82,6 +82,8 @@ class NDArrayBase(object):
Indicates the alignment requirement when converting to dlpack. Will copy to a Indicates the alignment requirement when converting to dlpack. Will copy to a
new tensor if the alignment requirement is not satisfied. new tensor if the alignment requirement is not satisfied.
0 means no alignment requirement. 0 means no alignment requirement.
Will copy to a new tensor if the array is pinned, because some backends,
e.g., PyTorch, do not support the kDLCPUPinned device type.
Returns Returns
......
...@@ -654,6 +654,17 @@ class DistGraph: ...@@ -654,6 +654,17 @@ class DistGraph:
# TODO(da?): describe when self._g is None and device shouldn't be called. # TODO(da?): describe when self._g is None and device shouldn't be called.
return F.cpu() return F.cpu()
def is_pinned(self):
"""Check if the graph structure is pinned to the page-locked memory.
Returns
-------
bool
True if the graph structure is pinned.
"""
# (Xin Yao): Currently we don't support pinning a DistGraph.
return False
@property @property
def ntypes(self): def ntypes(self):
"""Return the list of node types of this graph. """Return the list of node types of this graph.
......
...@@ -5467,7 +5467,7 @@ class DGLHeteroGraph(object): ...@@ -5467,7 +5467,7 @@ class DGLHeteroGraph(object):
return self.to(F.cpu()) return self.to(F.cpu())
def pin_memory_(self): def pin_memory_(self):
"""Pin the graph structure to the page-locked memory. """Pin the graph structure to the page-locked memory for GPU zero-copy access.
This is an **inplace** method. The graph structure must be on CPU to be pinned. This is an **inplace** method. The graph structure must be on CPU to be pinned.
If the graph structure is already pinned, the function directly returns it.
...@@ -5500,6 +5500,30 @@ class DGLHeteroGraph(object): ...@@ -5500,6 +5500,30 @@ class DGLHeteroGraph(object):
>>> g1 = g.formats(['csc']) >>> g1 = g.formats(['csc'])
>>> assert not g1.is_pinned() >>> assert not g1.is_pinned()
The pinned graph can be accessed from both CPU and GPU. The device on which an operation
runs is determined by the context of its ``query`` tensor. For example, ``eid`` in
``find_edges()`` is a query. When ``eid`` is on CPU, ``find_edges()`` is executed on CPU,
and the returned values are CPU tensors.

>>> g.unpin_memory_()
>>> g.create_formats_()
>>> g.pin_memory_()
>>> eid = torch.tensor([1])
>>> g.find_edges(eid)
(tensor([0]), tensor([2]))

Moving ``eid`` to GPU, ``find_edges()`` will be executed on GPU, and the returned
values are GPU tensors.

>>> eid = eid.to('cuda:0')
>>> g.find_edges(eid)
(tensor([0], device='cuda:0'), tensor([2], device='cuda:0'))
If you don't provide a ``query``, methods will be executed on CPU by default.
>>> g.in_degrees()
tensor([0, 1, 1])
""" """
if self._graph.is_pinned(): if self._graph.is_pinned():
return self return self
......
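To complement the docstring examples above, here is a hedged sketch of the full pin/unpin lifecycle around a training run; the commented-out `train(g)` stands in for the user's own loop and is not part of the commit.

import dgl
import torch

g = dgl.graph((torch.tensor([0, 1, 1]), torch.tensor([1, 2, 2])))
g.create_formats_()   # pinned graphs cannot materialize new formats, so do it first
g.pin_memory_()       # in-place: the structure stays on CPU but becomes page-locked
assert g.is_pinned()

# train(g)            # GPU/UVA sampling and training happen here

g.unpin_memory_()     # also in-place; release the pinned registration afterwards
assert not g.is_pinned()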
@@ -287,7 +287,7 @@ def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False,
     tensor([False, False, False])
     """
-    if g.device == F.cpu():
+    if F.device_type(g.device) == 'cpu' and not g.is_pinned():
         frontier = _sample_neighbors(
             g, nodes, fanout, edge_dir=edge_dir, prob=prob, replace=replace,
             copy_ndata=copy_ndata, copy_edata=copy_edata, exclude_edges=exclude_edges)
...
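With the dispatch above, a direct `sample_neighbors()` call on a pinned graph runs on the device of the seed nodes. A minimal sketch (not part of the commit; the toy graph and fanout are placeholders):

import dgl
import torch

g = dgl.graph((torch.tensor([0, 1, 2, 3]), torch.tensor([1, 2, 3, 0])))
g.create_formats_()
g.pin_memory_()

seeds = torch.tensor([0, 2], device='cuda')   # GPU query -> UVA sampling path
frontier = dgl.sampling.sample_neighbors(g, seeds, 2)
print(frontier.device)                        # expected to be the GPU device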
@@ -30,7 +30,7 @@ def prepare_tensor(g, data, name):
         Data in tensor object.
     """
     if F.is_tensor(data):
-        if F.dtype(data) != g.idtype or F.context(data) != g.device:
+        if not g.is_pinned() and (F.dtype(data) != g.idtype or F.context(data) != g.device):
            raise DGLError('Expect argument "{}" to have data type {} and device '
                           'context {}. But got {} and {}.'.format(
                               name, g.idtype, g.device, F.dtype(data), F.context(data)))
@@ -130,6 +130,26 @@ def check_all_same_idtype(glist, name):
             raise DGLError('Expect {}[{}] to have {} type ID, but got {}.'.format(
                 name, i, idtype, g.idtype))

+def check_device(data, device):
+    """Check if data is on the target device.
+
+    Parameters
+    ----------
+    data : Tensor or dict[str, Tensor]
+    device : Backend device.
+
+    Returns
+    -------
+    bool
+        True if the data is on the target device.
+    """
+    if isinstance(data, dict):
+        for v in data.values():
+            if v.device != device:
+                return False
+    elif data.device != device:
+        return False
+    return True
+
 def check_all_same_device(glist, name):
     """Check all the graphs have the same device."""
     if len(glist) == 0:
...
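For illustration, a standalone re-implementation of the helper added above, together with the dict-of-seed-nodes case it is meant to cover (this sketch is not the dgl.utils code itself and compares torch devices directly):

import torch

def check_device(data, device):
    # Accept either a single tensor or a dict of per-type tensors.
    if isinstance(data, dict):
        return all(v.device == device for v in data.values())
    return data.device == device

nids = {'user': torch.tensor([0, 1]), 'item': torch.tensor([2])}
print(check_device(nids, torch.device('cpu')))      # True
print(check_device(nids, torch.device('cuda:0')))   # False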
@@ -112,10 +112,12 @@ IdArray HStack(IdArray lhs, IdArray rhs) {
 NDArray IndexSelect(NDArray array, IdArray index) {
   NDArray ret;
-  CHECK_SAME_CONTEXT(array, index);
   CHECK_GE(array->ndim, 1) << "Only support array with at least 1 dimension";
   CHECK_EQ(index->ndim, 1) << "Index array must be an 1D array.";
-  ATEN_XPU_SWITCH_CUDA(array->ctx.device_type, XPU, "IndexSelect", {
+  // if array is not pinned, index has the same context as array
+  // if array is pinned, op dispatching depends on the context of index
+  CHECK_VALID_CONTEXT(array, index);
+  ATEN_XPU_SWITCH_CUDA(index->ctx.device_type, XPU, "IndexSelect", {
     ATEN_DTYPE_SWITCH(array->dtype, DType, "values", {
       ATEN_ID_TYPE_SWITCH(index->dtype, IdType, {
         ret = impl::IndexSelect<XPU, DType, IdType>(array, index);
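The comments added in this hunk describe the same zero-copy pattern that `dgl.contrib.UnifiedTensor` exposes in Python: the data stays pinned on the host while a GPU-resident index decides where the gather runs. A hedged sketch, assuming the UnifiedTensor API of this DGL version (feature shapes and sizes are placeholders):

import dgl
import torch

feat = torch.randn(1000, 100)                                           # host node features
unified = dgl.contrib.UnifiedTensor(feat, device=torch.device('cuda'))  # pin and expose to the GPU
idx = torch.randint(0, 1000, (64,), device='cuda')                      # GPU indices
batch_feat = unified[idx]                                               # gathered directly on the GPU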
@@ -343,9 +345,8 @@ NDArray CSRIsNonZero(CSRMatrix csr, NDArray row, NDArray col) {
   NDArray ret;
   CHECK_SAME_DTYPE(csr.indices, row);
   CHECK_SAME_DTYPE(csr.indices, col);
-  CHECK_SAME_CONTEXT(csr.indices, row);
-  CHECK_SAME_CONTEXT(csr.indices, col);
-  ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRIsNonZero", {
+  CHECK_SAME_CONTEXT(row, col);
+  ATEN_CSR_SWITCH_CUDA_UVA(csr, row, XPU, IdType, "CSRIsNonZero", {
     ret = impl::CSRIsNonZero<XPU, IdType>(csr, row, col);
   });
   return ret;
@@ -371,8 +372,7 @@ int64_t CSRGetRowNNZ(CSRMatrix csr, int64_t row) {
 NDArray CSRGetRowNNZ(CSRMatrix csr, NDArray row) {
   NDArray ret;
   CHECK_SAME_DTYPE(csr.indices, row);
-  CHECK_SAME_CONTEXT(csr.indices, row);
-  ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRGetRowNNZ", {
+  ATEN_CSR_SWITCH_CUDA_UVA(csr, row, XPU, IdType, "CSRGetRowNNZ", {
     ret = impl::CSRGetRowNNZ<XPU, IdType>(csr, row);
   });
   return ret;
@@ -410,9 +410,8 @@ NDArray CSRGetData(CSRMatrix csr, NDArray rows, NDArray cols) {
   NDArray ret;
   CHECK_SAME_DTYPE(csr.indices, rows);
   CHECK_SAME_DTYPE(csr.indices, cols);
-  CHECK_SAME_CONTEXT(csr.indices, rows);
-  CHECK_SAME_CONTEXT(csr.indices, cols);
-  ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRGetData", {
+  CHECK_SAME_CONTEXT(rows, cols);
+  ATEN_CSR_SWITCH_CUDA_UVA(csr, rows, XPU, IdType, "CSRGetData", {
     ret = impl::CSRGetData<XPU, IdType>(csr, rows, cols);
   });
   return ret;
@@ -423,10 +422,9 @@ NDArray CSRGetData(CSRMatrix csr, NDArray rows, NDArray cols, NDArray weights, D
   NDArray ret;
   CHECK_SAME_DTYPE(csr.indices, rows);
   CHECK_SAME_DTYPE(csr.indices, cols);
-  CHECK_SAME_CONTEXT(csr.indices, rows);
-  CHECK_SAME_CONTEXT(csr.indices, cols);
-  CHECK_SAME_CONTEXT(csr.indices, weights);
-  ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRGetData", {
+  CHECK_SAME_CONTEXT(rows, cols);
+  CHECK_SAME_CONTEXT(rows, weights);
+  ATEN_CSR_SWITCH_CUDA_UVA(csr, rows, XPU, IdType, "CSRGetData", {
     ret = impl::CSRGetData<XPU, IdType, DType>(csr, rows, cols, weights, filler);
   });
   return ret;
@@ -441,10 +439,9 @@ std::vector<NDArray> CSRGetDataAndIndices(
     CSRMatrix csr, NDArray rows, NDArray cols) {
   CHECK_SAME_DTYPE(csr.indices, rows);
   CHECK_SAME_DTYPE(csr.indices, cols);
-  CHECK_SAME_CONTEXT(csr.indices, rows);
-  CHECK_SAME_CONTEXT(csr.indices, cols);
+  CHECK_SAME_CONTEXT(rows, cols);
   std::vector<NDArray> ret;
-  ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRGetDataAndIndices", {
+  ATEN_CSR_SWITCH_CUDA_UVA(csr, rows, XPU, IdType, "CSRGetDataAndIndices", {
     ret = impl::CSRGetDataAndIndices<XPU, IdType>(csr, rows, cols);
   });
   return ret;
@@ -491,9 +488,8 @@ CSRMatrix CSRSliceRows(CSRMatrix csr, int64_t start, int64_t end) {
 CSRMatrix CSRSliceRows(CSRMatrix csr, NDArray rows) {
   CHECK_SAME_DTYPE(csr.indices, rows);
-  CHECK_SAME_CONTEXT(csr.indices, rows);
   CSRMatrix ret;
-  ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRSliceRows", {
+  ATEN_CSR_SWITCH_CUDA_UVA(csr, rows, XPU, IdType, "CSRSliceRows", {
     ret = impl::CSRSliceRows<XPU, IdType>(csr, rows);
   });
   return ret;
@@ -502,10 +498,9 @@ CSRMatrix CSRSliceRows(CSRMatrix csr, NDArray rows) {
 CSRMatrix CSRSliceMatrix(CSRMatrix csr, NDArray rows, NDArray cols) {
   CHECK_SAME_DTYPE(csr.indices, rows);
   CHECK_SAME_DTYPE(csr.indices, cols);
-  CHECK_SAME_CONTEXT(csr.indices, rows);
-  CHECK_SAME_CONTEXT(csr.indices, cols);
+  CHECK_SAME_CONTEXT(rows, cols);
   CSRMatrix ret;
-  ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRSliceMatrix", {
+  ATEN_CSR_SWITCH_CUDA_UVA(csr, rows, XPU, IdType, "CSRSliceMatrix", {
     ret = impl::CSRSliceMatrix<XPU, IdType>(csr, rows, cols);
   });
   return ret;
@@ -554,7 +549,7 @@ COOMatrix CSRRowWiseSampling(
     CSRMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob, bool replace) {
   COOMatrix ret;
   if (IsNullArray(prob)) {
-    ATEN_CSR_SWITCH_CUDA(mat, XPU, IdType, "CSRRowWiseSampling", {
+    ATEN_CSR_SWITCH_CUDA_UVA(mat, rows, XPU, IdType, "CSRRowWiseSampling", {
       ret = impl::CSRRowWiseSamplingUniform<XPU, IdType>(mat, rows, num_samples, replace);
     });
   } else {
...
@@ -27,7 +27,8 @@ NDArray IndexSelect(NDArray array, IdArray index) {
     shape.emplace_back(array->shape[d]);
   }
-  NDArray ret = NDArray::Empty(shape, array->dtype, array->ctx);
+  // use index->ctx for kDLCPUPinned array
+  NDArray ret = NDArray::Empty(shape, array->dtype, index->ctx);
   if (len == 0)
     return ret;
   DType* ret_data = static_cast<DType*>(ret->data);
...
@@ -254,7 +254,7 @@ COOMatrix CSRRowWiseSamplingUniform(CSRMatrix mat,
                                     IdArray rows,
                                     const int64_t num_picks,
                                     const bool replace) {
-  const auto& ctx = mat.indptr->ctx;
+  const auto& ctx = rows->ctx;
   auto device = runtime::DeviceAPI::Get(ctx);
   // TODO(dlasalle): Once the device api supports getting the stream from the
...
@@ -264,14 +264,14 @@ CSRMatrix CSRSliceRows(CSRMatrix csr, NDArray rows) {
   const int nb = (nnz + nt - 1) / nt;
   // Copy indices.
-  IdArray ret_indices = NDArray::Empty({nnz}, csr.indptr->dtype, csr.indptr->ctx);
+  IdArray ret_indices = NDArray::Empty({nnz}, csr.indptr->dtype, rows->ctx);
   CUDA_KERNEL_CALL(_SegmentCopyKernel,
                    nb, nt, 0, thr_entry->stream,
                    csr.indptr.Ptr<IdType>(), csr.indices.Ptr<IdType>(),
                    rows.Ptr<IdType>(), nnz, len,
                    ret_indptr.Ptr<IdType>(), ret_indices.Ptr<IdType>());
   // Copy data.
-  IdArray ret_data = NDArray::Empty({nnz}, csr.indptr->dtype, csr.indptr->ctx);
+  IdArray ret_data = NDArray::Empty({nnz}, csr.indptr->dtype, rows->ctx);
   CUDA_KERNEL_CALL(_SegmentCopyKernel,
                    nb, nt, 0, thr_entry->stream,
                    csr.indptr.Ptr<IdType>(), CSRHasData(csr)? csr.data.Ptr<IdType>() : nullptr,
...