Unverified Commit 8b64ae59 authored by nv-dlasalle, committed by GitHub

[Performance] Perform to_block on the GPU when the dataloader is created with a GPU `device`. (#3016)

* add output device for dataloading

* Update dataloader

* Get sampler device from dataloader

* Fix line length

* Update examples

* Fix to_block GPU for empty relation types

* Handle the case where the DistGraph has None for the underlying graph
Co-authored-by: Da Zheng <zhengda1936@gmail.com>
parent 79b057f0
......@@ -94,6 +94,7 @@ def run(proc_id, n_gpus, args, devices, data):
train_nid,
sampler,
use_ddp=n_gpus > 1,
device=dev_id if args.num_workers == 0 else None,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
......
......@@ -187,6 +187,7 @@ def run(proc_id, n_gpus, n_cpus, args, devices, dataset, split, queue=None):
target_idx[train_idx],
sampler,
use_ddp=n_gpus > 1,
device=dev_id if args.num_workers == 0 else None,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
......@@ -198,6 +199,7 @@ def run(proc_id, n_gpus, n_cpus, args, devices, dataset, split, queue=None):
target_idx[val_idx],
sampler,
use_ddp=n_gpus > 1,
device=dev_id if args.num_workers == 0 else None,
batch_size=args.batch_size,
shuffle=False,
drop_last=False,
......@@ -210,6 +212,7 @@ def run(proc_id, n_gpus, n_cpus, args, devices, dataset, split, queue=None):
target_idx[test_idx],
test_sampler,
use_ddp=n_gpus > 1,
device=dev_id if args.num_workers == 0 else None,
batch_size=args.eval_batch_size,
shuffle=False,
drop_last=False,
......
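The example scripts above all apply the same pattern: pass the target GPU as `device` only when `num_workers` is 0, since GPU-side block creation cannot cross worker-process boundaries. Below is a rough, self-contained sketch of that pattern against the 0.7-era `dgl.dataloading.NodeDataLoader` API this PR modifies; the graph, fan-outs, and batch size are illustrative, not taken from the examples.

    import torch
    import dgl

    # Illustrative homogeneous graph and training node IDs.
    g = dgl.rand_graph(1000, 5000)
    train_nid = torch.arange(100)

    dev_id = 0 if torch.cuda.is_available() else None
    num_workers = 0

    sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 10])
    dataloader = dgl.dataloading.NodeDataLoader(
        g, train_nid, sampler,
        # Only request GPU output when no worker processes are used;
        # otherwise fall back to the default device handling.
        device=dev_id if num_workers == 0 else None,
        batch_size=32,
        shuffle=True,
        drop_last=False,
        num_workers=num_workers)

    for input_nodes, output_nodes, blocks in dataloader:
        # With a GPU `device` and num_workers == 0, the MFGs come back
        # already on the GPU because to_block runs there.
        break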
......@@ -151,6 +151,11 @@ class BlockSampler(object):
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the MFG.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
output_ctx : DGLContext, default None
The context the sampled blocks will be stored on. This should only be
a CUDA context if multiprocessing is not used in the dataloader (i.e.,
num_workers is 0). If this is None, the sampled blocks will be stored
on the same device as the input graph.
Notes
-----
......@@ -158,9 +163,10 @@ class BlockSampler(object):
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, num_layers, return_eids=False):
def __init__(self, num_layers, return_eids=False, output_ctx=None):
self.num_layers = num_layers
self.return_eids = return_eids
self.set_output_context(output_ctx)
def sample_frontier(self, block_id, g, seed_nodes):
"""Generate the frontier given the destination nodes.
......@@ -221,8 +227,23 @@ class BlockSampler(object):
blocks = []
exclude_eids = (
_tensor_or_dict_to_numpy(exclude_eids) if exclude_eids is not None else None)
if isinstance(g, DistGraph):
# TODO:(nv-dlasalle) dist graphs may not have an associated graph,
# causing an error when trying to fetch the device, so for now,
# always assume the distributed graph's device is CPU.
graph_device = F.cpu()
else:
graph_device = g.device
for block_id in reversed(range(self.num_layers)):
frontier = self.sample_frontier(block_id, g, seed_nodes)
seed_nodes_in = seed_nodes
if isinstance(seed_nodes_in, dict):
seed_nodes_in = {ntype: nodes.to(graph_device) \
for ntype, nodes in seed_nodes_in.items()}
else:
seed_nodes_in = seed_nodes_in.to(graph_device)
frontier = self.sample_frontier(block_id, g, seed_nodes_in)
# Removing edges from the frontier for link prediction training falls
# into the category of frontier postprocessing
......@@ -250,16 +271,44 @@ class BlockSampler(object):
new_eids[k] = F.gather_row(parent_eids[k], frontier.edges[k].data[EID])
frontier.edata[EID] = new_eids
block = transform.to_block(frontier, seed_nodes)
if self.output_device is not None:
frontier = frontier.to(self.output_device)
if isinstance(seed_nodes, dict):
seed_nodes_out = {ntype: nodes.to(self.output_device) \
for ntype, nodes in seed_nodes.items()}
else:
seed_nodes_out = seed_nodes.to(self.output_device)
else:
seed_nodes_out = seed_nodes
block = transform.to_block(frontier, seed_nodes_out)
if self.return_eids:
assign_block_eids(block, frontier)
seed_nodes = {ntype: block.srcnodes[ntype].data[NID] for ntype in block.srctypes}
blocks.insert(0, block)
return blocks
def set_output_context(self, ctx):
"""Set the device the generated block will be output to. This should
only be set to a cuda device, when multi-processing is not used in
the dataloader (e.g., num_workers is 0).
Parameters
----------
ctx : DGLContext, default None
The device context the sampled blocks will be stored on. This
should only be a CUDA context if multiprocessing is not used in
the dataloader (i.e., num_workers is 0). If this is None, the
sampled blocks will be stored on the same device as the input
graph.
"""
if ctx is not None:
self.output_device = F.to_backend_ctx(ctx)
else:
self.output_device = None
class Collator(ABC):
"""Abstract DGL collator for training GNNs on downstream tasks stochastically.
......
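For samplers constructed outside of `NodeDataLoader`, the new `output_ctx` argument and `set_output_context` method expose the same behavior directly on `BlockSampler`. A minimal sketch of calling it by hand is below; normally the dataloader does this for you when given a GPU `device`, and `to_dgl_context` is assumed importable from `dgl.utils`, the same module the dataloader imports it from.

    import torch
    import dgl
    from dgl.utils import to_dgl_context

    sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 10])

    # Only meaningful when multiprocessing is not used (num_workers == 0).
    if torch.cuda.is_available():
        # Convert the torch device to a DGLContext and hand it to the sampler;
        # subsequent sample_blocks calls will then build the blocks on the GPU.
        sampler.set_output_context(to_dgl_context(torch.device('cuda:0')))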
......@@ -12,6 +12,7 @@ from ...distributed import DistDataLoader
from ...ndarray import NDArray as DGLNDArray
from ... import backend as F
from ...base import DGLError
from ...utils import to_dgl_context
PYTORCH_VER = LooseVersion(th.__version__)
PYTORCH_16 = PYTORCH_VER >= LooseVersion("1.6.0")
......@@ -407,7 +408,7 @@ class NodeDataLoader:
"""
collator_arglist = inspect.getfullargspec(NodeCollator).args
def __init__(self, g, nids, block_sampler, device='cpu', use_ddp=False, ddp_seed=0, **kwargs):
def __init__(self, g, nids, block_sampler, device=None, use_ddp=False, ddp_seed=0, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
......@@ -417,6 +418,9 @@ class NodeDataLoader:
dataloader_kwargs[k] = v
if isinstance(g, DistGraph):
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
......@@ -427,6 +431,15 @@ class NodeDataLoader:
**dataloader_kwargs)
self.is_distributed = True
else:
if device is None:
# default to the same device the graph is on
device = th.device(g.device)
# if the sampler supports it, tell it to output to the
# specified device
if callable(getattr(block_sampler, "set_output_context", None)):
block_sampler.set_output_context(to_dgl_context(device))
self.collator = _NodeCollator(g, nids, block_sampler, **collator_kwargs)
dataset = self.collator.dataset
use_scalar_batcher = False
......@@ -436,7 +449,7 @@ class NodeDataLoader:
# doesn't seem to have a performance benefit on the CPU.
assert 'num_workers' not in dataloader_kwargs or \
dataloader_kwargs['num_workers'] == 0, \
'When performing dataloading from the GPU, num_workers ' \
'When performing dataloading on the GPU, num_workers ' \
'must be zero.'
batch_size = dataloader_kwargs.get('batch_size', 0)
......
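The `callable(getattr(block_sampler, "set_output_context", None))` check keeps the device plumbing opt-in: any sampler exposing `set_output_context` receives the hint, and samplers without it keep their old behavior. A sketch of a hypothetical custom sampler that simply inherits the method from `BlockSampler` (class name and fan-out are invented for illustration):

    import torch
    import dgl

    class TwoLayerSampler(dgl.dataloading.BlockSampler):
        """Hypothetical sampler; inherits set_output_context from BlockSampler,
        so NodeDataLoader's callable(getattr(...)) probe finds it."""
        def __init__(self, fanout, num_layers=2):
            super().__init__(num_layers)
            self.fanout = fanout

        def sample_frontier(self, block_id, g, seed_nodes):
            # Uniform neighbor sampling on whatever device the graph lives on.
            return dgl.sampling.sample_neighbors(g, seed_nodes, self.fanout)

    g = dgl.rand_graph(1000, 5000)
    nids = torch.arange(100)
    device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

    dataloader = dgl.dataloading.NodeDataLoader(
        g, nids, TwoLayerSampler(fanout=10),
        device=device,   # None now defaults to g.device instead of 'cpu'
        batch_size=32, num_workers=0)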
......@@ -280,7 +280,7 @@ MapEdges(
const int64_t num_edge_sets = static_cast<int64_t>(edge_sets.size());
for (int64_t etype = 0; etype < num_edge_sets; ++etype) {
const EdgeArray& edges = edge_sets[etype];
if (edges.id.defined()) {
if (edges.id.defined() && edges.src->shape[0] > 0) {
const int64_t num_edges = edges.src->shape[0];
new_lhs.emplace_back(NewIdArray(num_edges, ctx, sizeof(IdType)*8));
......@@ -308,8 +308,10 @@ MapEdges(
node_map.RhsHashTable(dst_type).DeviceHandle());
CUDA_CALL(cudaGetLastError());
} else {
new_lhs.emplace_back(aten::NullArray());
new_rhs.emplace_back(aten::NullArray());
new_lhs.emplace_back(
aten::NullArray(DLDataType{kDLInt, sizeof(IdType)*8, 1}, ctx));
new_rhs.emplace_back(
aten::NullArray(DLDataType{kDLInt, sizeof(IdType)*8, 1}, ctx));
}
}
......
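The CUDA change above is what the commit bullet "Fix to_block GPU for empty relation types" refers to: relations with zero edges previously produced untyped NullArrays in MapEdges, which broke dtype and device handling downstream. A minimal sketch of the situation the guard covers (the heterograph and node types are invented for illustration):

    import torch
    import dgl

    # A heterograph in which one relation type has no edges at all.
    g = dgl.heterograph({
        ('user', 'follows', 'user'): (torch.tensor([0, 1]), torch.tensor([1, 2])),
        ('user', 'likes', 'item'): (torch.tensor([], dtype=torch.int64),
                                    torch.tensor([], dtype=torch.int64)),
    }, num_nodes_dict={'user': 3, 'item': 2})

    if torch.cuda.is_available():
        g = g.to('cuda:0')
        seeds = {'user': torch.tensor([1, 2], device='cuda:0')}
        # With the guarded MapEdges path, the empty 'likes' relation yields
        # correctly typed empty arrays on the right device, so the GPU
        # to_block no longer fails on it.
        block = dgl.to_block(g, seeds)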