Unverified Commit 701b4fcc authored by Quan (Andy) Gan's avatar Quan (Andy) Gan Committed by GitHub
Browse files

[Sampling] New sampling pipeline plus asynchronous prefetching (#3665)

* initial update

* more

* more

* multi-gpu example

* cluster gcn, finalize homogeneous

* more explanation

* fix

* bunch of fixes

* fix

* RGAT example and more fixes

* shadow-gnn sampler and some changes in unit test

* fix

* wth

* more fixes

* remove shadow+node/edge dataloader tests for possible ux changes

* lints

* add legacy dataloading import just in case

* fix

* update pylint for f-strings

* fix

* lint

* lint

* lint again

* cherry-picking commit fa9f494

* oops

* fix

* add sample_neighbors in dist_graph

* fix

* lint

* fix

* fix

* fix

* fix tutorial

* fix

* fix

* fix

* fix warning

* remove debug

* add get_foo_storage apis

* lint
parent 5152a879
...@@ -101,4 +101,4 @@ class AsyncTransferer(object): ...@@ -101,4 +101,4 @@ class AsyncTransferer(object):
return Transfer(transfer_id, self._handle) return Transfer(transfer_id, self._handle)
_init_api("dgl.dataloading.async_transferer") _init_api("dataloading.async_transferer", "dgl._dataloading.async_transferer")
"""Cluster-GCN subgraph iterators."""
import os
import pickle
import numpy as np
from ..transform import metis_partition_assignment
from .. import backend as F
from .dataloader import SubgraphIterator
class ClusterGCNSubgraphIterator(SubgraphIterator):
"""Subgraph sampler following that of ClusterGCN.
This sampler first partitions the graph with METIS partitioning, then it caches the nodes of
each partition to a file within the given cache directory.
This is used in conjunction with :class:`dgl.dataloading.pytorch.GraphDataLoader`.
Notes
-----
The graph must be homogeneous and on CPU.
Parameters
----------
g : DGLGraph
The original graph.
num_partitions : int
The number of partitions.
cache_directory : str
The path to the cache directory for storing the partition result.
refresh : bool
If True, recompute the partition.
Examples
--------
Assuming that you have a graph ``g``:
>>> sgiter = dgl.dataloading.ClusterGCNSubgraphIterator(
... g, num_partitions=100, cache_directory='.', refresh=True)
>>> dataloader = dgl.dataloading.GraphDataLoader(sgiter, batch_size=4, num_workers=0)
>>> for subgraph_batch in dataloader:
... train_on(subgraph_batch)
"""
def __init__(self, g, num_partitions, cache_directory, refresh=False):
if os.name == 'nt':
raise NotImplementedError("METIS partitioning is not supported on Windows yet.")
super().__init__(g)
# First see if the cache is already there. If so, directly read from cache.
if not refresh and self._load_parts(cache_directory):
return
# Otherwise, build the cache.
assignment = F.asnumpy(metis_partition_assignment(g, num_partitions))
self._save_parts(assignment, cache_directory)
def _cache_file_path(self, cache_directory):
return os.path.join(cache_directory, 'cluster_gcn_cache')
def _load_parts(self, cache_directory):
path = self._cache_file_path(cache_directory)
if not os.path.exists(path):
return False
with open(path, 'rb') as file_:
self.part_indptr, self.part_indices = pickle.load(file_)
return True
def _save_parts(self, assignment, cache_directory):
os.makedirs(cache_directory, exist_ok=True)
self.part_indices = np.argsort(assignment)
num_nodes_per_part = np.bincount(assignment)
self.part_indptr = np.insert(np.cumsum(num_nodes_per_part), 0, 0)
with open(self._cache_file_path(cache_directory), 'wb') as file_:
pickle.dump((self.part_indptr, self.part_indices), file_)
def __len__(self):
return self.part_indptr.shape[0] - 1
def __getitem__(self, i):
nodes = self.part_indices[self.part_indptr[i]:self.part_indptr[i+1]]
return self.g.subgraph(nodes)
"""Data loaders"""
from collections.abc import Mapping, Sequence
from abc import ABC, abstractproperty, abstractmethod
import re
import numpy as np
from .. import transform
from ..base import NID, EID
from .. import backend as F
from .. import utils
from ..batch import batch
from ..convert import heterograph
from ..heterograph import DGLHeteroGraph as DGLGraph
from ..distributed.dist_graph import DistGraph
from ..utils import to_device
def _tensor_or_dict_to_numpy(ids):
if isinstance(ids, Mapping):
return {k: F.zerocopy_to_numpy(v) for k, v in ids.items()}
else:
return F.zerocopy_to_numpy(ids)
def _locate_eids_to_exclude(frontier_parent_eids, exclude_eids):
"""Find the edges whose IDs in parent graph appeared in exclude_eids.
Note that both arguments are numpy arrays or numpy dicts.
"""
if isinstance(frontier_parent_eids, Mapping):
result = {
k: np.isin(frontier_parent_eids[k], exclude_eids[k]).nonzero()[0]
for k in frontier_parent_eids.keys() if k in exclude_eids.keys()}
return {k: F.zerocopy_from_numpy(v) for k, v in result.items()}
else:
result = np.isin(frontier_parent_eids, exclude_eids).nonzero()[0]
return F.zerocopy_from_numpy(result)
class _EidExcluder():
def __init__(self, exclude_eids):
device = None
if isinstance(exclude_eids, Mapping):
for _, v in exclude_eids.items():
if device is None:
device = F.context(v)
break
else:
device = F.context(exclude_eids)
self._exclude_eids = None
self._filter = None
if device == F.cpu():
# TODO(nv-dlasalle): Once Filter is implemented for the CPU, we
# should just use that irregardless of the device.
self._exclude_eids = (
_tensor_or_dict_to_numpy(exclude_eids) if exclude_eids is not None else None)
else:
if isinstance(exclude_eids, Mapping):
self._filter = {k: utils.Filter(v) for k, v in exclude_eids.items()}
else:
self._filter = utils.Filter(exclude_eids)
def _find_indices(self, parent_eids):
""" Find the set of edge indices to remove.
"""
if self._exclude_eids is not None:
parent_eids_np = _tensor_or_dict_to_numpy(parent_eids)
return _locate_eids_to_exclude(parent_eids_np, self._exclude_eids)
else:
assert self._filter is not None
if isinstance(parent_eids, Mapping):
located_eids = {k: self._filter[k].find_included_indices(parent_eids[k])
for k, v in parent_eids.items() if k in self._filter}
else:
located_eids = self._filter.find_included_indices(parent_eids)
return located_eids
def __call__(self, frontier):
parent_eids = frontier.edata[EID]
located_eids = self._find_indices(parent_eids)
if not isinstance(located_eids, Mapping):
# (BarclayII) If frontier already has a EID field and located_eids is empty,
# the returned graph will keep EID intact. Otherwise, EID will change
# to the mapping from the new graph to the old frontier.
# So we need to test if located_eids is empty, and do the remapping ourselves.
if len(located_eids) > 0:
frontier = transform.remove_edges(
frontier, located_eids, store_ids=True)
frontier.edata[EID] = F.gather_row(parent_eids, frontier.edata[EID])
else:
# (BarclayII) remove_edges only accepts removing one type of edges,
# so I need to keep track of the edge IDs left one by one.
new_eids = parent_eids.copy()
for k, v in located_eids.items():
if len(v) > 0:
frontier = transform.remove_edges(
frontier, v, etype=k, store_ids=True)
new_eids[k] = F.gather_row(parent_eids[k], frontier.edges[k].data[EID])
frontier.edata[EID] = new_eids
return frontier
def exclude_edges(subg, exclude_eids, device):
"""Find and remove from the subgraph the edges whose IDs in the parent
graph are given.
Parameters
----------
subg : DGLGraph
The subgraph. Must have ``dgl.EID`` field containing the original
edge IDs in the parent graph.
exclude_eids : Tensor or dict
The edge IDs to exclude.
device : device
The output device of the graph.
Returns
-------
DGLGraph
The new subgraph with edges removed. The ``dgl.EID`` field contains
the original edge IDs in the same parent graph.
"""
if exclude_eids is None:
return subg
if device is not None:
if isinstance(exclude_eids, Mapping):
exclude_eids = {k: F.copy_to(v, device) \
for k, v in exclude_eids.items()}
else:
exclude_eids = F.copy_to(exclude_eids, device)
excluder = _EidExcluder(exclude_eids)
return subg if excluder is None else excluder(subg)
def _find_exclude_eids_with_reverse_id(g, eids, reverse_eid_map):
if isinstance(eids, Mapping):
eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
exclude_eids = {
k: F.cat([v, F.gather_row(reverse_eid_map[k], v)], 0)
for k, v in eids.items()}
else:
exclude_eids = F.cat([eids, F.gather_row(reverse_eid_map, eids)], 0)
return exclude_eids
def _find_exclude_eids_with_reverse_types(g, eids, reverse_etype_map):
exclude_eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
reverse_etype_map = {
g.to_canonical_etype(k): g.to_canonical_etype(v)
for k, v in reverse_etype_map.items()}
exclude_eids.update({reverse_etype_map[k]: v for k, v in exclude_eids.items()})
return exclude_eids
def _find_exclude_eids(g, exclude_mode, eids, **kwargs):
"""Find all edge IDs to exclude according to :attr:`exclude_mode`.
Parameters
----------
g : DGLGraph
The graph.
exclude_mode : str, optional
Can be either of the following,
None (default)
Does not exclude any edge.
'self'
Exclude the given edges themselves but nothing else.
'reverse_id'
Exclude all edges specified in ``eids``, as well as their reverse edges
of the same edge type.
The mapping from each edge ID to its reverse edge ID is specified in
the keyword argument ``reverse_eid_map``.
This mode assumes that the reverse of an edge with ID ``e`` and type
``etype`` will have ID ``reverse_eid_map[e]`` and type ``etype``.
'reverse_types'
Exclude all edges specified in ``eids``, as well as their reverse
edges of the corresponding edge types.
The mapping from each edge type to its reverse edge type is specified
in the keyword argument ``reverse_etype_map``.
This mode assumes that the reverse of an edge with ID ``e`` and type ``etype``
will have ID ``e`` and type ``reverse_etype_map[etype]``.
eids : Tensor or dict[etype, Tensor]
The edge IDs.
reverse_eid_map : Tensor or dict[etype, Tensor]
The mapping from edge ID to its reverse edge ID.
reverse_etype_map : dict[etype, etype]
The mapping from edge etype to its reverse edge type.
"""
if exclude_mode is None:
return None
elif exclude_mode == 'self':
return eids
elif exclude_mode == 'reverse_id':
return _find_exclude_eids_with_reverse_id(g, eids, kwargs['reverse_eid_map'])
elif exclude_mode == 'reverse_types':
return _find_exclude_eids_with_reverse_types(g, eids, kwargs['reverse_etype_map'])
else:
raise ValueError('unsupported mode {}'.format(exclude_mode))
class Sampler(object):
"""An abstract class that takes in a graph and a set of seed nodes and returns a
structure representing a smaller portion of the graph for computation. It can
be either a list of bipartite graphs (i.e. :class:`BlockSampler`), or a single
subgraph.
"""
def __init__(self, output_ctx=None):
self.set_output_context(output_ctx)
def sample(self, g, seed_nodes, exclude_eids=None):
"""Sample a structure from the graph.
Parameters
----------
g : DGLGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The destination nodes by type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
exclude_eids : Tensor or dict[etype, Tensor]
The edges to exclude from computation dependency.
Returns
-------
Tensor or dict[ntype, Tensor]
The nodes whose input features are required for computing the output
representation of :attr:`seed_nodes`.
any
Any data representing the structure.
"""
raise NotImplementedError
def set_output_context(self, ctx):
"""Set the device the generated block or subgraph will be output to.
This should only be set to a cuda device, when multi-processing is not
used in the dataloader (e.g., num_workers is 0).
Parameters
----------
ctx : DGLContext, default None
The device context the sampled blocks will be stored on. This
should only be a CUDA context if multiprocessing is not used in
the dataloader (e.g., num_workers is 0). If this is None, the
sampled blocks will be stored on the same device as the input
graph.
"""
if ctx is not None:
self.output_device = F.to_backend_ctx(ctx)
else:
self.output_device = None
class BlockSampler(Sampler):
"""Abstract class specifying the neighborhood sampling strategy for DGL data loaders.
The main method for BlockSampler is :meth:`sample`,
which generates a list of message flow graphs (MFGs) for a multi-layer GNN given a set of
seed nodes to have their outputs computed.
The default implementation of :meth:`sample` is
to repeat :attr:`num_layers` times the following procedure from the last layer to the first
layer:
* Obtain a frontier. The frontier is defined as a graph with the same nodes as the
original graph but only the edges involved in message passing on the current layer.
Customizable via :meth:`sample_frontier`.
* Optionally, if the task is link prediction or edge classfication, remove edges
connecting training node pairs. If the graph is undirected, also remove the
reverse edges. This is controlled by the argument :attr:`exclude_eids` in
:meth:`sample` method.
* Convert the frontier into a MFG.
* Optionally assign the IDs of the edges in the original graph selected in the first step
to the MFG, controlled by the argument ``return_eids`` in
:meth:`sample` method.
* Prepend the MFG to the MFG list to be returned.
All subclasses should override :meth:`sample_frontier`
method while specifying the number of layers to sample in :attr:`num_layers` argument.
Parameters
----------
num_layers : int
The number of layers to sample.
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the MFG.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
output_ctx : DGLContext, default None
The context the sampled blocks will be stored on. This should only be
a CUDA context if multiprocessing is not used in the dataloader (e.g.,
num_workers is 0). If this is None, the sampled blocks will be stored
on the same device as the input graph.
exclude_edges_in_frontier : bool, default False
If True, the :func:`sample_frontier` method will receive an argument
:attr:`exclude_eids` containing the edge IDs from the original graph to exclude.
The :func:`sample_frontier` method must return a graph that does not contain
the edges corresponding to the excluded edges. No additional postprocessing
will be done.
Otherwise, the edges will be removed *after* :func:`sample_frontier` returns.
Notes
-----
For the concept of frontiers and MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, num_layers, return_eids=False, output_ctx=None):
super().__init__(output_ctx)
self.num_layers = num_layers
self.return_eids = return_eids
# pylint: disable=unused-argument
@staticmethod
def assign_block_eids(block, frontier):
"""Assigns edge IDs from the original graph to the message flow graph (MFG).
See also
--------
BlockSampler
"""
for etype in block.canonical_etypes:
block.edges[etype].data[EID] = frontier.edges[etype].data[EID][
block.edges[etype].data[EID]]
return block
# This is really a hack working around the lack of GPU-based neighbor sampling
# with edge exclusion.
@classmethod
def exclude_edges_in_frontier(cls, g):
"""Returns whether the sampler will exclude edges in :func:`sample_frontier`.
If this method returns True, the method :func:`sample_frontier` will receive an
argument :attr:`exclude_eids` from :func:`sample`. :func:`sample_frontier`
is then responsible for removing those edges.
If this method returns False, :func:`sample` will be responsible for
removing the edges.
When subclassing :class:`BlockSampler`, this method should return True when you
would like to remove the excluded edges in your :func:`sample_frontier` method.
By default this method returns False.
Parameters
----------
g : DGLGraph
The original graph
Returns
-------
bool
Whether :func:`sample_frontier` will receive an argument :attr:`exclude_eids`.
"""
return False
def sample_frontier(self, block_id, g, seed_nodes, exclude_eids=None):
"""Generate the frontier given the destination nodes.
The subclasses should override this function.
Parameters
----------
block_id : int
Represents which GNN layer the frontier is generated for.
g : DGLGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The destination nodes by node type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
exclude_eids: Tensor or dict
Edge IDs to exclude during sampling neighbors for the seed nodes.
This argument can take a single ID tensor or a dictionary of edge types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
Returns
-------
DGLGraph
The frontier generated for the current layer.
Notes
-----
For the concept of frontiers and MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
raise NotImplementedError
def sample(self, g, seed_nodes, exclude_eids=None):
"""Generate the a list of MFGs given the destination nodes.
Parameters
----------
g : DGLGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The destination nodes by node type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
exclude_eids : Tensor or dict[etype, Tensor]
The edges to exclude from computation dependency.
Returns
-------
list[DGLGraph]
The MFGs generated for computing the multi-layer GNN output.
Notes
-----
For the concept of frontiers and MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
blocks = []
if isinstance(g, DistGraph):
# TODO:(nv-dlasalle) dist graphs may not have an associated graph,
# causing an error when trying to fetch the device, so for now,
# always assume the distributed graph's device is CPU.
graph_device = F.cpu()
else:
graph_device = g.device
for block_id in reversed(range(self.num_layers)):
seed_nodes_in = to_device(seed_nodes, graph_device)
if self.exclude_edges_in_frontier(g):
frontier = self.sample_frontier(
block_id, g, seed_nodes_in, exclude_eids=exclude_eids)
else:
frontier = self.sample_frontier(block_id, g, seed_nodes_in)
if self.output_device is not None:
frontier = frontier.to(self.output_device)
seed_nodes_out = to_device(seed_nodes, self.output_device)
else:
seed_nodes_out = seed_nodes
# Removing edges from the frontier for link prediction training falls
# into the category of frontier postprocessing
if not self.exclude_edges_in_frontier(g):
frontier = exclude_edges(frontier, exclude_eids, self.output_device)
block = transform.to_block(frontier, seed_nodes_out)
if self.return_eids:
self.assign_block_eids(block, frontier)
seed_nodes = {ntype: block.srcnodes[ntype].data[NID] for ntype in block.srctypes}
blocks.insert(0, block)
return blocks[0].srcdata[NID], blocks[-1].dstdata[NID], blocks
def sample_blocks(self, g, seed_nodes, exclude_eids=None):
"""Deprecated and identical to :meth:`sample`.
"""
return self.sample(g, seed_nodes, exclude_eids)
class Collator(ABC):
"""Abstract DGL collator for training GNNs on downstream tasks stochastically.
Provides a :attr:`dataset` object containing the collection of all nodes or edges,
as well as a :attr:`collate` method that combines a set of items from
:attr:`dataset` and obtains the message flow graphs (MFGs).
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
@abstractproperty
def dataset(self):
"""Returns the dataset object of the collator."""
raise NotImplementedError
@abstractmethod
def collate(self, items):
"""Combines the items from the dataset object and obtains the list of MFGs.
Parameters
----------
items : list[str, int]
The list of node or edge IDs or type-ID pairs.
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
raise NotImplementedError
class NodeCollator(Collator):
"""DGL collator to combine nodes and their computation dependencies within a minibatch for
training node classification or regression on a single graph with neighborhood sampling.
Parameters
----------
g : DGLGraph
The graph.
nids : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
graph_sampler : dgl.dataloading.BlockSampler
The neighborhood sampler.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors (assume
the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(input_nodes, output_nodes, blocks)
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, g, nids, graph_sampler):
self.g = g
if not isinstance(nids, Mapping):
assert len(g.ntypes) == 1, \
"nids should be a dict of node type and ids for graph with multiple node types"
self.graph_sampler = graph_sampler
self.nids = utils.prepare_tensor_or_dict(g, nids, 'nids')
self._dataset = utils.maybe_flatten_dict(self.nids)
@property
def dataset(self):
return self._dataset
def collate(self, items):
"""Find the list of MFGs necessary for computing the representation of given
nodes for a node classification/regression task.
Parameters
----------
items : list[int] or list[tuple[str, int]]
Either a list of node IDs (for homogeneous graphs), or a list of node type-ID
pairs (for heterogeneous graphs).
Returns
-------
input_nodes : Tensor or dict[ntype, Tensor]
The input nodes necessary for computation in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
output_nodes : Tensor or dict[ntype, Tensor]
The nodes whose representations are to be computed in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
MFGs : list[DGLGraph]
The list of MFGs necessary for computing the representation.
"""
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = utils.prepare_tensor_or_dict(self.g, items, 'items')
input_nodes, output_nodes, blocks = self.graph_sampler.sample_blocks(self.g, items)
return input_nodes, output_nodes, blocks
class EdgeCollator(Collator):
"""DGL collator to combine edges and their computation dependencies within a minibatch for
training edge classification, edge regression, or link prediction on a single graph
with neighborhood sampling.
Given a set of edges, the collate function will yield
* A tensor of input nodes necessary for computing the representation on edges, or
a dictionary of node type names and such tensors.
* A subgraph that contains only the edges in the minibatch and their incident nodes.
Note that the graph has an identical metagraph with the original graph.
* If a negative sampler is given, another graph that contains the "negative edges",
connecting the source and destination nodes yielded from the given negative sampler.
* A list of MFGs necessary for computing the representation of the incident nodes
of the edges in the minibatch.
Parameters
----------
g : DGLGraph
The graph from which the edges are iterated in minibatches and the subgraphs
are generated.
eids : Tensor or dict[etype, Tensor]
The edge set in graph :attr:`g` to compute outputs.
graph_sampler : dgl.dataloading.BlockSampler
The neighborhood sampler.
g_sampling : DGLGraph, optional
The graph where neighborhood sampling and message passing is performed.
Note that this is not necessarily the same as :attr:`g`.
If None, assume to be the same as :attr:`g`.
exclude : str, optional
Whether and how to exclude dependencies related to the sampled edges in the
minibatch. Possible values are
* None, which excludes nothing.
* ``'self'``, which excludes the sampled edges themselves but nothing else.
* ``'reverse_id'``, which excludes the reverse edges of the sampled edges. The said
reverse edges have the same edge type as the sampled edges. Only works
on edge types whose source node type is the same as its destination node type.
* ``'reverse_types'``, which excludes the reverse edges of the sampled edges. The
said reverse edges have different edge types from the sampled edges.
If ``g_sampling`` is given, ``exclude`` is ignored and will be always ``None``.
reverse_eids : Tensor or dict[etype, Tensor], optional
A tensor of reverse edge ID mapping. The i-th element indicates the ID of
the i-th edge's reverse edge.
If the graph is heterogeneous, this argument requires a dictionary of edge
types and the reverse edge ID mapping tensors.
Required and only used when ``exclude`` is set to ``reverse_id``.
For heterogeneous graph this will be a dict of edge type and edge IDs. Note that
only the edge types whose source node type is the same as destination node type
are needed.
reverse_etypes : dict[etype, etype], optional
The mapping from the edge type to its reverse edge type.
Required and only used when ``exclude`` is set to ``reverse_types``.
negative_sampler : callable, optional
The negative sampler. Can be omitted if no negative sampling is needed.
The negative sampler must be a callable that takes in the following arguments:
* The original (heterogeneous) graph.
* The ID array of sampled edges in the minibatch, or the dictionary of edge
types and ID array of sampled edges in the minibatch if the graph is
heterogeneous.
It should return
* A pair of source and destination node ID arrays as negative samples,
or a dictionary of edge types and such pairs if the graph is heterogenenous.
A set of builtin negative samplers are provided in
:ref:`the negative sampling module <api-dataloading-negative-sampling>`.
Examples
--------
The following example shows how to train a 3-layer GNN for edge classification on a
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
messages from all neighbors.
Say that you have an array of source node IDs ``src`` and another array of destination
node IDs ``dst``. One can make it bidirectional by adding another set of edges
that connects from ``dst`` to ``src``:
>>> g = dgl.graph((torch.cat([src, dst]), torch.cat([dst, src])))
One can then know that the ID difference of an edge and its reverse edge is ``|E|``,
where ``|E|`` is the length of your source/destination array. The reverse edge
mapping can be obtained by
>>> E = len(src)
>>> reverse_eids = torch.cat([torch.arange(E, 2 * E), torch.arange(0, E)])
Note that the sampled edges as well as their reverse edges are removed from
computation dependencies of the incident nodes. This is a common trick to avoid
information leakage.
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.EdgeCollator(
... g, train_eid, sampler, exclude='reverse_id',
... reverse_eids=reverse_eids)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
To train a 3-layer GNN for link prediction on a set of edges ``train_eid`` on a
homogeneous graph where each node takes messages from all neighbors (assume the
backend is PyTorch), with 5 uniformly chosen negative samples per edge:
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> collator = dgl.dataloading.EdgeCollator(
... g, train_eid, sampler, exclude='reverse_id',
... reverse_eids=reverse_eids, negative_sampler=neg_sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodse, pair_graph, neg_pair_graph, blocks)
For heterogeneous graphs, the reverse of an edge may have a different edge type
from the original edge. For instance, consider that you have an array of
user-item clicks, representated by a user array ``user`` and an item array ``item``.
You may want to build a heterogeneous graph with a user-click-item relation and an
item-clicked-by-user relation.
>>> g = dgl.heterograph({
... ('user', 'click', 'item'): (user, item),
... ('item', 'clicked-by', 'user'): (item, user)})
To train a 3-layer GNN for edge classification on a set of edges ``train_eid`` with
type ``click``, you can write
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.EdgeCollator(
... g, {'click': train_eid}, sampler, exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'})
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
To train a 3-layer GNN for link prediction on a set of edges ``train_eid`` with type
``click``, you can write
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> collator = dgl.dataloading.EdgeCollator(
... g, train_eid, sampler, exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'},
... negative_sampler=neg_sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, neg_pair_graph, blocks)
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, g, eids, graph_sampler, g_sampling=None, exclude=None,
reverse_eids=None, reverse_etypes=None, negative_sampler=None):
self.g = g
if not isinstance(eids, Mapping):
assert len(g.etypes) == 1, \
"eids should be a dict of etype and ids for graph with multiple etypes"
self.graph_sampler = graph_sampler
# One may wish to iterate over the edges in one graph while perform sampling in
# another graph. This may be the case for iterating over validation and test
# edge set while perform neighborhood sampling on the graph formed by only
# the training edge set.
# See GCMC for an example usage.
if g_sampling is not None:
self.g_sampling = g_sampling
self.exclude = None
else:
self.g_sampling = self.g
self.exclude = exclude
self.reverse_eids = reverse_eids
self.reverse_etypes = reverse_etypes
self.negative_sampler = negative_sampler
self.eids = utils.prepare_tensor_or_dict(g, eids, 'eids')
self._dataset = utils.maybe_flatten_dict(self.eids)
@property
def dataset(self):
return self._dataset
def _collate(self, items):
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = utils.prepare_tensor_or_dict(self.g_sampling, items, 'items')
pair_graph = self.g.edge_subgraph(items)
seed_nodes = pair_graph.ndata[NID]
exclude_eids = _find_exclude_eids(
self.g_sampling,
self.exclude,
items,
reverse_eid_map=self.reverse_eids,
reverse_etype_map=self.reverse_etypes)
input_nodes, _, blocks = self.graph_sampler.sample_blocks(
self.g_sampling, seed_nodes, exclude_eids=exclude_eids)
return input_nodes, pair_graph, blocks
def _collate_with_negative_sampling(self, items):
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = utils.prepare_tensor_or_dict(self.g_sampling, items, 'items')
pair_graph = self.g.edge_subgraph(items, relabel_nodes=False)
induced_edges = pair_graph.edata[EID]
neg_srcdst = self.negative_sampler(self.g, items)
if not isinstance(neg_srcdst, Mapping):
assert len(self.g.etypes) == 1, \
'graph has multiple or no edge types; '\
'please return a dict in negative sampler.'
neg_srcdst = {self.g.canonical_etypes[0]: neg_srcdst}
# Get dtype from a tuple of tensors
dtype = F.dtype(list(neg_srcdst.values())[0][0])
ctx = F.context(pair_graph)
neg_edges = {
etype: neg_srcdst.get(etype, (F.copy_to(F.tensor([], dtype), ctx),
F.copy_to(F.tensor([], dtype), ctx)))
for etype in self.g.canonical_etypes}
neg_pair_graph = heterograph(
neg_edges, {ntype: self.g.number_of_nodes(ntype) for ntype in self.g.ntypes})
pair_graph, neg_pair_graph = transform.compact_graphs([pair_graph, neg_pair_graph])
pair_graph.edata[EID] = induced_edges
seed_nodes = pair_graph.ndata[NID]
exclude_eids = _find_exclude_eids(
self.g_sampling,
self.exclude,
items,
reverse_eid_map=self.reverse_eids,
reverse_etype_map=self.reverse_etypes)
input_nodes, _, blocks = self.graph_sampler.sample_blocks(
self.g_sampling, seed_nodes, exclude_eids=exclude_eids)
return input_nodes, pair_graph, neg_pair_graph, blocks
def collate(self, items):
"""Combines the sampled edges into a minibatch for edge classification, edge
regression, and link prediction tasks.
Parameters
----------
items : list[int] or list[tuple[str, int]]
Either a list of edge IDs (for homogeneous graphs), or a list of edge type-ID
pairs (for heterogeneous graphs).
Returns
-------
Either ``(input_nodes, pair_graph, blocks)``, or
``(input_nodes, pair_graph, negative_pair_graph, blocks)`` if negative sampling is
enabled.
input_nodes : Tensor or dict[ntype, Tensor]
The input nodes necessary for computation in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
pair_graph : DGLGraph
The graph that contains only the edges in the minibatch as well as their incident
nodes.
Note that the metagraph of this graph will be identical to that of the original
graph.
negative_pair_graph : DGLGraph
The graph that contains only the edges connecting the source and destination nodes
yielded from the given negative sampler, if negative sampling is enabled.
Note that the metagraph of this graph will be identical to that of the original
graph.
blocks : list[DGLGraph]
The list of MFGs necessary for computing the representation of the edges.
"""
if self.negative_sampler is None:
return self._collate(items)
else:
return self._collate_with_negative_sampling(items)
class GraphCollator(object):
"""Given a set of graphs as well as their graph-level data, the collate function will batch the
graphs into a batched graph, and stack the tensors into a single bigger tensor. If the
example is a container (such as sequences or mapping), the collate function preserves
the structure and collates each of the elements recursively.
If the set of graphs has no graph-level data, the collate function will yield a batched graph.
Examples
--------
To train a GNN for graph classification on a set of graphs in ``dataset`` (assume
the backend is PyTorch):
>>> dataloader = dgl.dataloading.GraphDataLoader(
... dataset, batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for batched_graph, labels in dataloader:
... train_on(batched_graph, labels)
"""
def __init__(self):
self.graph_collate_err_msg_format = (
"graph_collate: batch must contain DGLGraph, tensors, numpy arrays, "
"numbers, dicts or lists; found {}")
self.np_str_obj_array_pattern = re.compile(r'[SaUO]')
#This implementation is based on torch.utils.data._utils.collate.default_collate
def collate(self, items):
"""This function is similar to ``torch.utils.data._utils.collate.default_collate``.
It combines the sampled graphs and corresponding graph-level data
into a batched graph and tensors.
Parameters
----------
items : list of data points or tuples
Elements in the list are expected to have the same length.
Each sub-element will be batched as a batched graph, or a
batched tensor correspondingly.
Returns
-------
A tuple of the batching results.
"""
elem = items[0]
elem_type = type(elem)
if isinstance(elem, DGLGraph):
batched_graphs = batch(items)
return batched_graphs
elif F.is_tensor(elem):
return F.stack(items, 0)
elif elem_type.__module__ == 'numpy' and elem_type.__name__ != 'str_' \
and elem_type.__name__ != 'string_':
if elem_type.__name__ == 'ndarray' or elem_type.__name__ == 'memmap':
# array of string classes and object
if self.np_str_obj_array_pattern.search(elem.dtype.str) is not None:
raise TypeError(self.graph_collate_err_msg_format.format(elem.dtype))
return self.collate([F.tensor(b) for b in items])
elif elem.shape == (): # scalars
return F.tensor(items)
elif isinstance(elem, float):
return F.tensor(items, dtype=F.float64)
elif isinstance(elem, int):
return F.tensor(items)
elif isinstance(elem, (str, bytes)):
return items
elif isinstance(elem, Mapping):
return {key: self.collate([d[key] for d in items]) for key in elem}
elif isinstance(elem, tuple) and hasattr(elem, '_fields'): # namedtuple
return elem_type(*(self.collate(samples) for samples in zip(*items)))
elif isinstance(elem, Sequence):
# check to make sure that the elements in batch have consistent size
item_iter = iter(items)
elem_size = len(next(item_iter))
if not all(len(elem) == elem_size for elem in item_iter):
raise RuntimeError('each element in list of batch should be of equal size')
transposed = zip(*items)
return [self.collate(samples) for samples in transposed]
raise TypeError(self.graph_collate_err_msg_format.format(elem_type))
class SubgraphIterator(object):
"""Abstract class representing an iterator that yields a subgraph given a graph.
"""
def __init__(self, g):
self.g = g
"""Negative samplers"""
from collections.abc import Mapping
from .. import backend as F
from ..sampling import global_uniform_negative_sampling
class _BaseNegativeSampler(object):
def _generate(self, g, eids, canonical_etype):
raise NotImplementedError
def __call__(self, g, eids):
"""Returns negative samples.
Parameters
----------
g : DGLGraph
The graph.
eids : Tensor or dict[etype, Tensor]
The sampled edges in the minibatch.
Returns
-------
tuple[Tensor, Tensor] or dict[etype, tuple[Tensor, Tensor]]
The returned source-destination pairs as negative samples.
"""
if isinstance(eids, Mapping):
eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
neg_pair = {k: self._generate(g, v, k) for k, v in eids.items()}
else:
assert len(g.etypes) == 1, \
'please specify a dict of etypes and ids for graphs with multiple edge types'
neg_pair = self._generate(g, eids, g.canonical_etypes[0])
return neg_pair
class PerSourceUniform(_BaseNegativeSampler):
"""Negative sampler that randomly chooses negative destination nodes
for each source node according to a uniform distribution.
For each edge ``(u, v)`` of type ``(srctype, etype, dsttype)``, DGL generates
:attr:`k` pairs of negative edges ``(u, v')``, where ``v'`` is chosen
uniformly from all the nodes of type ``dsttype``. The resulting edges will
also have type ``(srctype, etype, dsttype)``.
Parameters
----------
k : int
The number of negative samples per edge.
Examples
--------
>>> g = dgl.graph(([0, 1, 2], [1, 2, 3]))
>>> neg_sampler = dgl.dataloading.negative_sampler.PerSourceUniform(2)
>>> neg_sampler(g, torch.tensor([0, 1]))
(tensor([0, 0, 1, 1]), tensor([1, 0, 2, 3]))
"""
def __init__(self, k):
self.k = k
def _generate(self, g, eids, canonical_etype):
_, _, vtype = canonical_etype
shape = F.shape(eids)
dtype = F.dtype(eids)
ctx = F.context(eids)
shape = (shape[0] * self.k,)
src, _ = g.find_edges(eids, etype=canonical_etype)
src = F.repeat(src, self.k, 0)
dst = F.randint(shape, dtype, ctx, 0, g.number_of_nodes(vtype))
return src, dst
# Alias
Uniform = PerSourceUniform
class GlobalUniform(_BaseNegativeSampler):
"""Negative sampler that randomly chooses negative source-destination pairs according
to a uniform distribution.
For each edge ``(u, v)`` of type ``(srctype, etype, dsttype)``, DGL generates at most
:attr:`k` pairs of negative edges ``(u', v')``, where ``u'`` is chosen uniformly from
all the nodes of type ``srctype`` and ``v'`` is chosen uniformly from all the nodes
of type ``dsttype``. The resulting edges will also have type
``(srctype, etype, dsttype)``. DGL guarantees that the sampled pairs will not have
edges in between.
Parameters
----------
k : int
The desired number of negative samples to generate per edge.
exclude_self_loops : bool, optional
Whether to exclude self-loops from negative samples. (Default: True)
replace : bool, optional
Whether to sample with replacement. Setting it to True will make things
faster. (Default: True)
redundancy : float, optional
Indicates how much more negative samples to actually generate during rejection sampling
before finding the unique pairs.
Increasing it will increase the likelihood of getting :attr:`k` negative samples
per edge, but will also take more time and memory.
(Default: automatically determined by the density of graph)
Notes
-----
This negative sampler will try to generate as many negative samples as possible, but
it may rarely return less than :attr:`k` negative samples per edge.
This is more likely to happen if a graph is so small or dense that not many unique
negative samples exist.
Examples
--------
>>> g = dgl.graph(([0, 1, 2], [1, 2, 3]))
>>> neg_sampler = dgl.dataloading.negative_sampler.GlobalUniform(2, True)
>>> neg_sampler(g, torch.LongTensor([0, 1]))
(tensor([0, 1, 3, 2]), tensor([2, 0, 2, 1]))
"""
def __init__(self, k, exclude_self_loops=True, replace=False, redundancy=None):
self.k = k
self.exclude_self_loops = exclude_self_loops
self.replace = replace
self.redundancy = redundancy
def _generate(self, g, eids, canonical_etype):
return global_uniform_negative_sampling(
g, len(eids) * self.k, self.exclude_self_loops, self.replace,
canonical_etype, self.redundancy)
...@@ -10,7 +10,6 @@ from torch.utils.data.distributed import DistributedSampler ...@@ -10,7 +10,6 @@ from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist import torch.distributed as dist
from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIterator from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIterator
from ...distributed import DistGraph from ...distributed import DistGraph
from ...distributed import DistDataLoader
from ...ndarray import NDArray as DGLNDArray from ...ndarray import NDArray as DGLNDArray
from ... import backend as F from ... import backend as F
from ...base import DGLError from ...base import DGLError
...@@ -26,6 +25,10 @@ PYTORCH_VER = LooseVersion(th.__version__) ...@@ -26,6 +25,10 @@ PYTORCH_VER = LooseVersion(th.__version__)
PYTORCH_16 = PYTORCH_VER >= LooseVersion("1.6.0") PYTORCH_16 = PYTORCH_VER >= LooseVersion("1.6.0")
PYTORCH_17 = PYTORCH_VER >= LooseVersion("1.7.0") PYTORCH_17 = PYTORCH_VER >= LooseVersion("1.7.0")
def _check_graph_type(g):
if isinstance(g, DistGraph):
raise TypeError("Please use DistNodeDataLoader or DistEdgeDataLoader for DistGraph")
def _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed): def _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed):
# Note: will change the content of dataloader_kwargs # Note: will change the content of dataloader_kwargs
dist_sampler_kwargs = {'shuffle': dataloader_kwargs['shuffle']} dist_sampler_kwargs = {'shuffle': dataloader_kwargs['shuffle']}
...@@ -166,14 +169,6 @@ class _ScalarDataBatcher(th.utils.data.IterableDataset): ...@@ -166,14 +169,6 @@ class _ScalarDataBatcher(th.utils.data.IterableDataset):
"""Set epoch number for distributed training.""" """Set epoch number for distributed training."""
self.epoch = epoch self.epoch = epoch
def _remove_kwargs_dist(kwargs):
if 'num_workers' in kwargs:
del kwargs['num_workers']
if 'pin_memory' in kwargs:
del kwargs['pin_memory']
print('Distributed DataLoader does not support pin_memory')
return kwargs
# The following code is a fix to the PyTorch-specific issue in # The following code is a fix to the PyTorch-specific issue in
# https://github.com/dmlc/dgl/issues/2137 # https://github.com/dmlc/dgl/issues/2137
# #
...@@ -290,14 +285,14 @@ def _restore_storages(subgs, g): ...@@ -290,14 +285,14 @@ def _restore_storages(subgs, g):
_restore_subgraph_storage(subg, g) _restore_subgraph_storage(subg, g)
class _NodeCollator(NodeCollator): class _NodeCollator(NodeCollator):
def collate(self, items): def collate(self, items): # pylint: disable=missing-docstring
# input_nodes, output_nodes, blocks # input_nodes, output_nodes, blocks
result = super().collate(items) result = super().collate(items)
_pop_storages(result[-1], self.g) _pop_storages(result[-1], self.g)
return result return result
class _EdgeCollator(EdgeCollator): class _EdgeCollator(EdgeCollator):
def collate(self, items): def collate(self, items): # pylint: disable=missing-docstring
if self.negative_sampler is None: if self.negative_sampler is None:
# input_nodes, pair_graph, blocks # input_nodes, pair_graph, blocks
result = super().collate(items) result = super().collate(items)
...@@ -381,10 +376,10 @@ def _background_node_dataloader(dl_iter, g, device, results, load_input, load_ou ...@@ -381,10 +376,10 @@ def _background_node_dataloader(dl_iter, g, device, results, load_input, load_ou
class _NodeDataLoaderIter: class _NodeDataLoaderIter:
def __init__(self, node_dataloader): def __init__(self, node_dataloader, iter_):
self.device = node_dataloader.device self.device = node_dataloader.device
self.node_dataloader = node_dataloader self.node_dataloader = node_dataloader
self.iter_ = iter(node_dataloader.dataloader) self.iter_ = iter_
self.async_load = node_dataloader.async_load and ( self.async_load = node_dataloader.async_load and (
F.device_type(self.device) == 'cuda') F.device_type(self.device) == 'cuda')
if self.async_load: if self.async_load:
...@@ -418,10 +413,10 @@ class _NodeDataLoaderIter: ...@@ -418,10 +413,10 @@ class _NodeDataLoaderIter:
return input_nodes, output_nodes, blocks return input_nodes, output_nodes, blocks
class _EdgeDataLoaderIter: class _EdgeDataLoaderIter:
def __init__(self, edge_dataloader): def __init__(self, edge_dataloader, iter_):
self.device = edge_dataloader.device self.device = edge_dataloader.device
self.edge_dataloader = edge_dataloader self.edge_dataloader = edge_dataloader
self.iter_ = iter(edge_dataloader.dataloader) self.iter_ = iter_
# Make this an iterator for PyTorch Lightning compatibility # Make this an iterator for PyTorch Lightning compatibility
def __iter__(self): def __iter__(self):
...@@ -441,9 +436,9 @@ class _EdgeDataLoaderIter: ...@@ -441,9 +436,9 @@ class _EdgeDataLoaderIter:
return result return result
class _GraphDataLoaderIter: class _GraphDataLoaderIter:
def __init__(self, graph_dataloader): def __init__(self, graph_dataloader, iter_):
self.dataloader = graph_dataloader self.dataloader = graph_dataloader
self.iter_ = iter(graph_dataloader.dataloader) self.iter_ = iter_
def __iter__(self): def __iter__(self):
return self return self
...@@ -490,14 +485,9 @@ def _init_dataloader(collator, device, dataloader_kwargs, use_ddp, ddp_seed): ...@@ -490,14 +485,9 @@ def _init_dataloader(collator, device, dataloader_kwargs, use_ddp, ddp_seed):
else: else:
dist_sampler = None dist_sampler = None
dataloader = DataLoader( return use_scalar_batcher, scalar_batcher, dataset, collator, dist_sampler
dataset,
collate_fn=collator.collate,
**dataloader_kwargs)
return use_scalar_batcher, scalar_batcher, dataloader, dist_sampler class NodeDataLoader(DataLoader):
class NodeDataLoader:
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list """PyTorch dataloader for batch-iterating over a set of nodes, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch. of message flow graphs (MFGs) as computation dependency of the said minibatch.
...@@ -600,6 +590,7 @@ class NodeDataLoader: ...@@ -600,6 +590,7 @@ class NodeDataLoader:
def __init__(self, g, nids, graph_sampler, device=None, use_ddp=False, ddp_seed=0, def __init__(self, g, nids, graph_sampler, device=None, use_ddp=False, ddp_seed=0,
load_input=None, load_output=None, async_load=False, **kwargs): load_input=None, load_output=None, async_load=False, **kwargs):
_check_graph_type(g)
collator_kwargs = {} collator_kwargs = {}
dataloader_kwargs = {} dataloader_kwargs = {}
for k, v in kwargs.items(): for k, v in kwargs.items():
...@@ -608,65 +599,42 @@ class NodeDataLoader: ...@@ -608,65 +599,42 @@ class NodeDataLoader:
else: else:
dataloader_kwargs[k] = v dataloader_kwargs[k] = v
if isinstance(g, DistGraph): if device is None:
if device is None: # default to the same device the graph is on
# for the distributed case default to the CPU device = th.device(g.device)
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.' if not g.is_homogeneous:
# Distributed DataLoader currently does not support heterogeneous graphs if load_input or load_output:
# and does not copy features. Fallback to normal solution raise DGLError('load_input/load_output not supported for heterograph yet.')
self.collator = NodeCollator(g, nids, graph_sampler, **collator_kwargs) self.load_input = {} if load_input is None else load_input
_remove_kwargs_dist(dataloader_kwargs) self.load_output = {} if load_output is None else load_output
self.dataloader = DistDataLoader(self.collator.dataset, self.async_load = async_load
collate_fn=self.collator.collate,
**dataloader_kwargs) # if the sampler supports it, tell it to output to the specified device.
self.is_distributed = True # But if async_load is enabled, set_output_context should be skipped as
else: # we'd like to avoid any graph/data transfer graphs across devices in
if device is None: # sampler. Such transfer will be handled in dataloader.
# default to the same device the graph is on num_workers = dataloader_kwargs.get('num_workers', 0)
device = th.device(g.device) if ((not async_load) and
callable(getattr(graph_sampler, "set_output_context", None)) and
if not g.is_homogeneous: num_workers == 0):
if load_input or load_output: graph_sampler.set_output_context(to_dgl_context(device))
raise DGLError('load_input/load_output not supported for heterograph yet.')
self.load_input = {} if load_input is None else load_input self.collator = _NodeCollator(g, nids, graph_sampler, **collator_kwargs)
self.load_output = {} if load_output is None else load_output self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
self.async_load = async_load _init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
# if the sampler supports it, tell it to output to the specified device.
# But if async_load is enabled, set_output_context should be skipped as
# we'd like to avoid any graph/data transfer graphs across devices in
# sampler. Such transfer will be handled in dataloader.
num_workers = dataloader_kwargs.get('num_workers', 0)
if ((not async_load) and
callable(getattr(graph_sampler, "set_output_context", None)) and
num_workers == 0):
graph_sampler.set_output_context(to_dgl_context(device))
self.collator = _NodeCollator(g, nids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
self.use_ddp = use_ddp self.use_ddp = use_ddp
self.is_distributed = False self.is_distributed = False
# Precompute the CSR and CSC representations so each subprocess does not # Precompute the CSR and CSC representations so each subprocess does not
# duplicate. # duplicate.
if num_workers > 0: if num_workers > 0:
g.create_formats_() g.create_formats_()
self.device = device self.device = device
def __iter__(self): def __iter__(self):
"""Return the iterator of the data loader.""" return _NodeDataLoaderIter(self, super().__iter__())
if self.is_distributed:
# Directly use the iterator of DistDataLoader, which doesn't copy features anyway.
return iter(self.dataloader)
else:
return _NodeDataLoaderIter(self)
def __len__(self):
"""Return the number of batches of the data loader."""
return len(self.dataloader)
def set_epoch(self, epoch): def set_epoch(self, epoch):
"""Sets the epoch number for the underlying sampler which ensures all replicas """Sets the epoch number for the underlying sampler which ensures all replicas
...@@ -689,7 +657,7 @@ class NodeDataLoader: ...@@ -689,7 +657,7 @@ class NodeDataLoader:
else: else:
raise DGLError('set_epoch is only available when use_ddp is True.') raise DGLError('set_epoch is only available when use_ddp is True.')
class EdgeDataLoader: class EdgeDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list """PyTorch dataloader for batch-iterating over a set of edges, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch for of message flow graphs (MFGs) as computation dependency of the said minibatch for
edge classification, edge regression, and link prediction. edge classification, edge regression, and link prediction.
...@@ -897,8 +865,9 @@ class EdgeDataLoader: ...@@ -897,8 +865,9 @@ class EdgeDataLoader:
* Link prediction on heterogeneous graph: RGCN for link prediction. * Link prediction on heterogeneous graph: RGCN for link prediction.
""" """
collator_arglist = inspect.getfullargspec(EdgeCollator).args collator_arglist = inspect.getfullargspec(EdgeCollator).args
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0,
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0, **kwargs): **kwargs):
_check_graph_type(g)
collator_kwargs = {} collator_kwargs = {}
dataloader_kwargs = {} dataloader_kwargs = {}
for k, v in kwargs.items(): for k, v in kwargs.items():
...@@ -907,53 +876,30 @@ class EdgeDataLoader: ...@@ -907,53 +876,30 @@ class EdgeDataLoader:
else: else:
dataloader_kwargs[k] = v dataloader_kwargs[k] = v
if isinstance(g, DistGraph):
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
self.dataloader = DistDataLoader(self.collator.dataset,
collate_fn=self.collator.collate,
**dataloader_kwargs)
self.is_distributed = True
else:
if device is None: if device is None:
# default to the same device the graph is on # default to the same device the graph is on
device = th.device(g.device) device = th.device(g.device)
# if the sampler supports it, tell it to output to the # if the sampler supports it, tell it to output to the
# specified device # specified device
num_workers = dataloader_kwargs.get('num_workers', 0) num_workers = dataloader_kwargs.get('num_workers', 0)
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0: if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
graph_sampler.set_output_context(to_dgl_context(device)) graph_sampler.set_output_context(to_dgl_context(device))
self.collator = _EdgeCollator(g, eids, graph_sampler, **collator_kwargs) self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \ self.use_scalar_batcher, self.scalar_batcher, dataset, collator, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed) _init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
self.use_ddp = use_ddp self.use_ddp = use_ddp
self.is_distributed = False super().__init__(dataset, collate_fn=collator.collate, **dataloader_kwargs)
# Precompute the CSR and CSC representations so each subprocess does not duplicate. # Precompute the CSR and CSC representations so each subprocess does not duplicate.
if num_workers > 0: if num_workers > 0:
g.create_formats_() g.create_formats_()
self.device = device self.device = device
def __iter__(self): def __iter__(self):
"""Return the iterator of the data loader.""" return _EdgeDataLoaderIter(self, super().__iter__())
if self.is_distributed:
# Directly use the iterator of DistDataLoader, which doesn't copy features anyway.
return iter(self.dataloader)
else:
return _EdgeDataLoaderIter(self)
def __len__(self):
"""Return the number of batches of the data loader."""
return len(self.dataloader)
def set_epoch(self, epoch): def set_epoch(self, epoch):
"""Sets the epoch number for the underlying sampler which ensures all replicas """Sets the epoch number for the underlying sampler which ensures all replicas
...@@ -976,7 +922,7 @@ class EdgeDataLoader: ...@@ -976,7 +922,7 @@ class EdgeDataLoader:
else: else:
raise DGLError('set_epoch is only available when use_ddp is True.') raise DGLError('set_epoch is only available when use_ddp is True.')
class GraphDataLoader: class GraphDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of graphs, generating the batched """PyTorch dataloader for batch-iterating over a set of graphs, generating the batched
graph and corresponding label tensor (if provided) of the said minibatch. graph and corresponding label tensor (if provided) of the said minibatch.
...@@ -1023,7 +969,6 @@ class GraphDataLoader: ...@@ -1023,7 +969,6 @@ class GraphDataLoader:
... train_on(batched_graph, labels) ... train_on(batched_graph, labels)
""" """
collator_arglist = inspect.getfullargspec(GraphCollator).args collator_arglist = inspect.getfullargspec(GraphCollator).args
def __init__(self, dataset, collate_fn=None, use_ddp=False, ddp_seed=0, **kwargs): def __init__(self, dataset, collate_fn=None, use_ddp=False, ddp_seed=0, **kwargs):
collator_kwargs = {} collator_kwargs = {}
dataloader_kwargs = {} dataloader_kwargs = {}
...@@ -1058,14 +1003,11 @@ class GraphDataLoader: ...@@ -1058,14 +1003,11 @@ class GraphDataLoader:
if use_ddp: if use_ddp:
self.dist_sampler = _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed) self.dist_sampler = _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed)
dataloader_kwargs['sampler'] = self.dist_sampler dataloader_kwargs['sampler'] = self.dist_sampler
super().__init__(dataset, collate_fn=self.collate, **dataloader_kwargs)
self.dataloader = DataLoader(dataset=dataset,
collate_fn=self.collate,
**dataloader_kwargs)
def __iter__(self): def __iter__(self):
"""Return the iterator of the data loader.""" """Return the iterator of the data loader."""
return _GraphDataLoaderIter(self) return _GraphDataLoaderIter(self, super().__iter__())
def __len__(self): def __len__(self):
"""Return the number of batches of the data loader.""" """Return the number of batches of the data loader."""
......
"""ShaDow-GNN subgraph samplers."""
from ..utils import prepare_tensor_or_dict
from ..base import NID
from .. import transform
from ..sampling import sample_neighbors
from .neighbor import NeighborSamplingMixin
from .dataloader import exclude_edges, Sampler
class ShaDowKHopSampler(NeighborSamplingMixin, Sampler):
"""K-hop subgraph sampler used by
`ShaDow-GNN <https://arxiv.org/abs/2012.01380>`__.
It performs node-wise neighbor sampling but instead of returning a list of
MFGs, it returns a single subgraph induced by all the sampled nodes. The
seed nodes from which the neighbors are sampled will appear the first in the
induced nodes of the subgraph.
This is used in conjunction with :class:`dgl.dataloading.pytorch.NodeDataLoader`
and :class:`dgl.dataloading.pytorch.EdgeDataLoader`.
Parameters
----------
fanouts : list[int] or list[dict[etype, int]]
List of neighbors to sample per edge type for each GNN layer, with the i-th
element being the fanout for the i-th GNN layer.
If only a single integer is provided, DGL assumes that every edge type
will have the same fanout.
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
replace : bool, default True
Whether to sample with replacement
prob : str, optional
If given, the probability of each neighbor being sampled is proportional
to the edge feature value with the given name in ``g.edata``. The feature must be
a scalar on each edge.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from 5, 10, 15 neighbors for
the first, second, and third layer respectively (assuming the backend is PyTorch):
>>> g = dgl.data.CoraFullDataset()[0]
>>> sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
>>> dataloader = dgl.dataloading.NodeDataLoader(
... g, torch.arange(g.num_nodes()), sampler,
... batch_size=5, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, (subgraph,) in dataloader:
... print(subgraph)
... assert torch.equal(input_nodes, subgraph.ndata[dgl.NID])
... assert torch.equal(input_nodes[:output_nodes.shape[0]], output_nodes)
... break
Graph(num_nodes=529, num_edges=3796,
ndata_schemes={'label': Scheme(shape=(), dtype=torch.int64),
'feat': Scheme(shape=(8710,), dtype=torch.float32),
'_ID': Scheme(shape=(), dtype=torch.int64)}
edata_schemes={'_ID': Scheme(shape=(), dtype=torch.int64)})
If training on a heterogeneous graph and you want different number of neighbors for each
edge type, one should instead provide a list of dicts. Each dict would specify the
number of neighbors to pick per edge type.
>>> sampler = dgl.dataloading.ShaDowKHopSampler([
... {('user', 'follows', 'user'): 5,
... ('user', 'plays', 'game'): 4,
... ('game', 'played-by', 'user'): 3}] * 3)
If you would like non-uniform neighbor sampling:
>>> g.edata['p'] = torch.rand(g.num_edges()) # any non-negative 1D vector works
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10, 15], prob='p')
"""
def __init__(self, fanouts, replace=False, prob=None, output_ctx=None):
super().__init__(output_ctx)
self.fanouts = fanouts
self.replace = replace
self.prob = prob
self.set_output_context(output_ctx)
def sample(self, g, seed_nodes, exclude_eids=None):
self._build_fanout(len(self.fanouts), g)
self._build_prob_arrays(g)
seed_nodes = prepare_tensor_or_dict(g, seed_nodes, 'seed nodes')
output_nodes = seed_nodes
for i in range(len(self.fanouts)):
fanout = self.fanouts[i]
frontier = sample_neighbors(
g, seed_nodes, fanout, replace=self.replace, prob=self.prob_arrays)
block = transform.to_block(frontier, seed_nodes)
seed_nodes = block.srcdata[NID]
subg = g.subgraph(seed_nodes, relabel_nodes=True)
subg = exclude_edges(subg, exclude_eids, self.output_device)
return seed_nodes, output_nodes, [subg]
...@@ -14,15 +14,12 @@ Read the user guide :ref:`guide-minibatch`. ...@@ -14,15 +14,12 @@ Read the user guide :ref:`guide-minibatch`.
This package is experimental and the interfaces may be subject This package is experimental and the interfaces may be subject
to changes in future releases. It currently only has implementations in PyTorch. to changes in future releases. It currently only has implementations in PyTorch.
""" """
from .neighbor import * from .. import backend as F
from .dataloader import * from .neighbor_sampler import *
from .cluster_gcn import * from .cluster_gcn import *
from .shadow import * from .shadow import *
from .base import *
from . import negative_sampler from . import negative_sampler
from .async_transferer import AsyncTransferer
from .. import backend as F
if F.get_preferred_backend() == 'pytorch': if F.get_preferred_backend() == 'pytorch':
from .pytorch import * from .dataloader import *
from .dist_dataloader import *
"""Base classes and functionalities for dataloaders"""
from collections import Mapping
from ..base import NID, EID
from ..convert import heterograph
from .. import backend as F
from ..transform import compact_graphs
from ..frame import LazyFeature
from ..utils import recursive_apply
def _set_lazy_features(x, xdata, feature_names):
if feature_names is None:
return
if not isinstance(feature_names, Mapping):
xdata.update({k: LazyFeature(k) for k in feature_names})
else:
for type_, names in feature_names.items():
x[type_].data.update({k: LazyFeature(k) for k in names})
def set_node_lazy_features(g, feature_names):
"""Set lazy features for ``g.ndata`` if :attr:`feature_names` is a list of strings,
or ``g.nodes[ntype].data`` if :attr:`feature_names` is a dict of list of strings.
"""
return _set_lazy_features(g.nodes, g.ndata, feature_names)
def set_edge_lazy_features(g, feature_names):
"""Set lazy features for ``g.edata`` if :attr:`feature_names` is a list of strings,
or ``g.edges[etype].data`` if :attr:`feature_names` is a dict of list of strings.
"""
return _set_lazy_features(g.edges, g.edata, feature_names)
def set_src_lazy_features(g, feature_names):
"""Set lazy features for ``g.srcdata`` if :attr:`feature_names` is a list of strings,
or ``g.srcnodes[srctype].data`` if :attr:`feature_names` is a dict of list of strings.
"""
return _set_lazy_features(g.srcnodes, g.srcdata, feature_names)
def set_dst_lazy_features(g, feature_names):
"""Set lazy features for ``g.dstdata`` if :attr:`feature_names` is a list of strings,
or ``g.dstnodes[dsttype].data`` if :attr:`feature_names` is a dict of list of strings.
"""
return _set_lazy_features(g.dstnodes, g.dstdata, feature_names)
class BlockSampler(object):
"""BlockSampler is an abstract class assuming to take in a set of nodes whose
outputs are to compute, and return a list of blocks.
Moreover, it assumes that the input node features will be put in the first block's
``srcdata``, the output node labels will be put in the last block's ``dstdata``, and
the edge data will be put in all the blocks' ``edata``.
"""
def __init__(self, prefetch_node_feats=None, prefetch_labels=None,
prefetch_edge_feats=None, output_device=None):
self.prefetch_node_feats = prefetch_node_feats or []
self.prefetch_labels = prefetch_labels or []
self.prefetch_edge_feats = prefetch_edge_feats or []
self.output_device = output_device
def sample_blocks(self, g, seed_nodes, exclude_eids=None):
"""Generates a list of blocks from the given seed nodes.
This function must return a triplet where the first element is the input node IDs
for the first GNN layer (a tensor or a dict of tensors for heterogeneous graphs),
the second element is the output node IDs for the last GNN layer, and the third
element is the said list of blocks.
"""
raise NotImplementedError
def assign_lazy_features(self, result):
"""Assign lazy features for prefetching."""
# A LazyFeature is a placeholder telling the dataloader where and which IDs
# to prefetch. It has the signature LazyFeature(name, id_). id_ can be None
# if the LazyFeature is set into one of the subgraph's ``xdata``, in which case the
# dataloader will infer from the subgraph's ``xdata[dgl.NID]`` (or ``xdata[dgl.EID]``
# if the LazyFeature is set as edge features).
#
# If you want to prefetch things other than ndata and edata, you can also
# return a LazyFeature(name, id_). If a LazyFeature is returned in places other than
# in a graph's ndata/edata/srcdata/dstdata, the DataLoader will prefetch it
# from its dictionary ``other_data``.
# For instance, you can run
#
# return blocks, LazyFeature('other_feat', id_)
#
# To make it work with the sampler returning the stuff above, your dataloader
# needs to have the following
#
# dataloader.attach_data('other_feat', tensor)
#
# Then you can run
#
# for blocks, other_feat in dataloader:
# train_on(blocks, other_feat)
input_nodes, output_nodes, blocks = result
set_src_lazy_features(blocks[0], self.prefetch_node_feats)
set_dst_lazy_features(blocks[-1], self.prefetch_labels)
for block in blocks:
set_edge_lazy_features(block, self.prefetch_edge_feats)
return input_nodes, output_nodes, blocks
def sample(self, g, seed_nodes):
"""Sample a list of blocks from the given seed nodes."""
result = self.sample_blocks(g, seed_nodes)
return self.assign_lazy_features(result)
def _find_exclude_eids_with_reverse_id(g, eids, reverse_eid_map):
if isinstance(eids, Mapping):
eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
exclude_eids = {
k: F.cat([v, F.gather_row(reverse_eid_map[k], v)], 0)
for k, v in eids.items()}
else:
exclude_eids = F.cat([eids, F.gather_row(reverse_eid_map, eids)], 0)
return exclude_eids
def _find_exclude_eids_with_reverse_types(g, eids, reverse_etype_map):
exclude_eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
reverse_etype_map = {
g.to_canonical_etype(k): g.to_canonical_etype(v)
for k, v in reverse_etype_map.items()}
exclude_eids.update({reverse_etype_map[k]: v for k, v in exclude_eids.items()})
return exclude_eids
def _find_exclude_eids(g, exclude_mode, eids, **kwargs):
if exclude_mode is None:
return None
elif F.is_tensor(exclude_mode) or (
isinstance(exclude_mode, Mapping) and
all(F.is_tensor(v) for v in exclude_mode.values())):
return exclude_mode
elif exclude_mode == 'self':
return eids
elif exclude_mode == 'reverse_id':
return _find_exclude_eids_with_reverse_id(g, eids, kwargs['reverse_eid_map'])
elif exclude_mode == 'reverse_types':
return _find_exclude_eids_with_reverse_types(g, eids, kwargs['reverse_etype_map'])
else:
raise ValueError('unsupported mode {}'.format(exclude_mode))
def find_exclude_eids(g, seed_edges, exclude, reverse_eids=None, reverse_etypes=None,
output_device=None):
"""Find all edge IDs to exclude according to :attr:`exclude_mode`.
Parameters
----------
g : DGLGraph
The graph.
exclude_mode : str, optional
Can be either of the following,
None (default)
Does not exclude any edge.
Tensor or dict[etype, Tensor]
Exclude the given edge IDs.
'self'
Exclude the given edges themselves but nothing else.
'reverse_id'
Exclude all edges specified in ``eids``, as well as their reverse edges
of the same edge type.
The mapping from each edge ID to its reverse edge ID is specified in
the keyword argument ``reverse_eid_map``.
This mode assumes that the reverse of an edge with ID ``e`` and type
``etype`` will have ID ``reverse_eid_map[e]`` and type ``etype``.
'reverse_types'
Exclude all edges specified in ``eids``, as well as their reverse
edges of the corresponding edge types.
The mapping from each edge type to its reverse edge type is specified
in the keyword argument ``reverse_etype_map``.
This mode assumes that the reverse of an edge with ID ``e`` and type ``etype``
will have ID ``e`` and type ``reverse_etype_map[etype]``.
eids : Tensor or dict[etype, Tensor]
The edge IDs.
reverse_eids : Tensor or dict[etype, Tensor]
The mapping from edge ID to its reverse edge ID.
reverse_etypes : dict[etype, etype]
The mapping from edge etype to its reverse edge type.
output_device : device
The device of the output edge IDs.
"""
exclude_eids = _find_exclude_eids(
g,
exclude,
seed_edges,
reverse_eid_map=reverse_eids,
reverse_etype_map=reverse_etypes)
if exclude_eids is not None:
exclude_eids = recursive_apply(
exclude_eids, lambda x: x.to(output_device))
return exclude_eids
class EdgeBlockSampler(object):
"""Adapts a :class:`BlockSampler` object's :attr:`sample` method for edge
classification and link prediction.
"""
def __init__(self, block_sampler, exclude=None, reverse_eids=None,
reverse_etypes=None, negative_sampler=None, prefetch_node_feats=None,
prefetch_labels=None, prefetch_edge_feats=None):
self.reverse_eids = reverse_eids
self.reverse_etypes = reverse_etypes
self.exclude = exclude
self.block_sampler = block_sampler
self.negative_sampler = negative_sampler
self.prefetch_node_feats = prefetch_node_feats or []
self.prefetch_labels = prefetch_labels or []
self.prefetch_edge_feats = prefetch_edge_feats or []
self.output_device = block_sampler.output_device
def _build_neg_graph(self, g, seed_edges):
neg_srcdst = self.negative_sampler(g, seed_edges)
if not isinstance(neg_srcdst, Mapping):
assert len(g.canonical_etypes) == 1, \
'graph has multiple or no edge types; '\
'please return a dict in negative sampler.'
neg_srcdst = {g.canonical_etypes[0]: neg_srcdst}
dtype = F.dtype(list(neg_srcdst.values())[0][0])
neg_edges = {
etype: neg_srcdst.get(etype, (F.tensor([], dtype), F.tensor([], dtype)))
for etype in g.canonical_etypes}
neg_pair_graph = heterograph(
neg_edges, {ntype: g.num_nodes(ntype) for ntype in g.ntypes})
return neg_pair_graph
def assign_lazy_features(self, result):
"""Assign lazy features for prefetching."""
pair_graph = result[1]
blocks = result[-1]
set_src_lazy_features(blocks[0], self.prefetch_node_feats)
set_edge_lazy_features(pair_graph, self.prefetch_labels)
for block in blocks:
set_edge_lazy_features(block, self.prefetch_edge_feats)
# In-place updates
return result
def sample(self, g, seed_edges):
"""Samples a list of blocks, as well as a subgraph containing the sampled
edges from the original graph.
If :attr:`negative_sampler` is given, also returns another graph containing the
negative pairs as edges.
"""
exclude = self.exclude
pair_graph = g.edge_subgraph(
seed_edges, relabel_nodes=False, output_device=self.output_device)
eids = pair_graph.edata[EID]
if self.negative_sampler is not None:
neg_graph = self._build_neg_graph(g, seed_edges)
pair_graph, neg_graph = compact_graphs([pair_graph, neg_graph])
else:
pair_graph = compact_graphs(pair_graph)
pair_graph.edata[EID] = eids
seed_nodes = pair_graph.ndata[NID]
exclude_eids = find_exclude_eids(
g, seed_edges, exclude, self.reverse_eids, self.reverse_etypes,
self.output_device)
input_nodes, _, blocks = self.block_sampler.sample_blocks(g, seed_nodes, exclude_eids)
if self.negative_sampler is None:
return self.assign_lazy_features((input_nodes, pair_graph, blocks))
else:
return self.assign_lazy_features((input_nodes, pair_graph, neg_graph, blocks))
"""Cluster-GCN subgraph iterators.""" """Cluster-GCN samplers."""
import os import os
import pickle import pickle
import numpy as np import numpy as np
from ..transform import metis_partition_assignment
from .. import backend as F from .. import backend as F
from .dataloader import SubgraphIterator from ..base import DGLError
from ..partition import metis_partition_assignment
from .base import set_node_lazy_features, set_edge_lazy_features
class ClusterGCNSubgraphIterator(SubgraphIterator): class ClusterGCNSampler(object):
"""Subgraph sampler following that of ClusterGCN. """Cluster-GCN sampler.
This sampler first partitions the graph with METIS partitioning, then it caches the nodes of This sampler first partitions the graph with METIS partitioning, then it caches the nodes of
each partition to a file within the given cache directory. each partition to a file within the given cache directory.
This is used in conjunction with :class:`dgl.dataloading.pytorch.GraphDataLoader`. This is used in conjunction with :class:`dgl.dataloading.DataLoader`.
Notes Notes
----- -----
...@@ -23,61 +24,53 @@ class ClusterGCNSubgraphIterator(SubgraphIterator): ...@@ -23,61 +24,53 @@ class ClusterGCNSubgraphIterator(SubgraphIterator):
---------- ----------
g : DGLGraph g : DGLGraph
The original graph. The original graph.
num_partitions : int k : int
The number of partitions. The number of partitions.
cache_directory : str cache_path : str
The path to the cache directory for storing the partition result. The path to the cache directory for storing the partition result.
refresh : bool
If True, recompute the partition.
Examples
--------
Assuming that you have a graph ``g``:
>>> sgiter = dgl.dataloading.ClusterGCNSubgraphIterator(
... g, num_partitions=100, cache_directory='.', refresh=True)
>>> dataloader = dgl.dataloading.GraphDataLoader(sgiter, batch_size=4, num_workers=0)
>>> for subgraph_batch in dataloader:
... train_on(subgraph_batch)
""" """
def __init__(self, g, num_partitions, cache_directory, refresh=False): def __init__(self, g, k, balance_ntypes=None, balance_edges=False, mode='k-way',
if os.name == 'nt': prefetch_node_feats=None, prefetch_edge_feats=None, output_device=None,
raise NotImplementedError("METIS partitioning is not supported on Windows yet.") cache_path='cluster_gcn.pkl'):
super().__init__(g) if os.path.exists(cache_path):
try:
# First see if the cache is already there. If so, directly read from cache. with open(cache_path, 'rb') as f:
if not refresh and self._load_parts(cache_directory): self.partition_offset, self.partition_node_ids = pickle.load(f)
return except (EOFError, TypeError, ValueError):
raise DGLError(
# Otherwise, build the cache. f'The contents in the cache file {cache_path} is invalid. '
assignment = F.asnumpy(metis_partition_assignment(g, num_partitions)) f'Please remove the cache file {cache_path} or specify another path.')
self._save_parts(assignment, cache_directory) if len(self.partition_offset) != k + 1:
raise DGLError(
def _cache_file_path(self, cache_directory): f'Number of partitions in the cache does not match the value of k. '
return os.path.join(cache_directory, 'cluster_gcn_cache') f'Please remove the cache file {cache_path} or specify another path.')
if len(self.partition_node_ids) != g.num_nodes():
def _load_parts(self, cache_directory): raise DGLError(
path = self._cache_file_path(cache_directory) f'Number of nodes in the cache does not match the given graph. '
if not os.path.exists(path): f'Please remove the cache file {cache_path} or specify another path.')
return False else:
partition_ids = metis_partition_assignment(
with open(path, 'rb') as file_: g, k, balance_ntypes=balance_ntypes, balance_edges=balance_edges, mode=mode)
self.part_indptr, self.part_indices = pickle.load(file_) partition_ids = F.asnumpy(partition_ids)
return True partition_node_ids = np.argsort(partition_ids)
partition_size = F.zerocopy_from_numpy(np.bincount(partition_ids, minlength=k))
def _save_parts(self, assignment, cache_directory): partition_offset = F.zerocopy_from_numpy(np.insert(np.cumsum(partition_size), 0, 0))
os.makedirs(cache_directory, exist_ok=True) partition_node_ids = F.zerocopy_from_numpy(partition_ids)
with open(cache_path, 'wb') as f:
self.part_indices = np.argsort(assignment) pickle.dump((partition_offset, partition_node_ids), f)
num_nodes_per_part = np.bincount(assignment) self.partition_offset = partition_offset
self.part_indptr = np.insert(np.cumsum(num_nodes_per_part), 0, 0) self.partition_node_ids = partition_node_ids
with open(self._cache_file_path(cache_directory), 'wb') as file_: self.prefetch_node_feats = prefetch_node_feats or []
pickle.dump((self.part_indptr, self.part_indices), file_) self.prefetch_edge_feats = prefetch_edge_feats or []
self.output_device = output_device
def __len__(self):
return self.part_indptr.shape[0] - 1 def sample(self, g, partition_ids):
"""Samples a subgraph given a list of partition IDs."""
def __getitem__(self, i): node_ids = F.cat([
nodes = self.part_indices[self.part_indptr[i]:self.part_indptr[i+1]] self.partition_node_ids[self.partition_offset[i]:self.partition_offset[i+1]]
return self.g.subgraph(nodes) for i in F.asnumpy(partition_ids)], 0)
sg = g.subgraph(node_ids, relabel_nodes=True, output_device=self.output_device)
set_node_lazy_features(sg, self.prefetch_node_feats)
set_edge_lazy_features(sg, self.prefetch_edge_feats)
return sg
"""Data loaders""" """DGL PyTorch DataLoaders"""
from collections.abc import Mapping, Sequence from collections.abc import Mapping, Sequence
from abc import ABC, abstractproperty, abstractmethod from queue import Queue
import itertools
import threading
from distutils.version import LooseVersion
import random
import math
import inspect
import re import re
import numpy as np
from .. import transform
from ..base import NID, EID
from .. import backend as F
from .. import utils
from ..batch import batch
from ..convert import heterograph
from ..heterograph import DGLHeteroGraph as DGLGraph
from ..distributed.dist_graph import DistGraph
from ..utils import to_device
def _tensor_or_dict_to_numpy(ids):
if isinstance(ids, Mapping):
return {k: F.zerocopy_to_numpy(v) for k, v in ids.items()}
else:
return F.zerocopy_to_numpy(ids)
def _locate_eids_to_exclude(frontier_parent_eids, exclude_eids): import torch
"""Find the edges whose IDs in parent graph appeared in exclude_eids. import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler
from ..base import NID, EID, dgl_warning
from ..batch import batch as batch_graphs
from ..heterograph import DGLHeteroGraph
from .. import ndarray as nd
from ..utils import (
recursive_apply, ExceptionWrapper, recursive_apply_pair, set_num_threads,
create_shared_mem_array, get_shared_mem_array)
from ..frame import LazyFeature
from ..storages import wrap_storage
from .base import BlockSampler, EdgeBlockSampler
from .. import backend as F
Note that both arguments are numpy arrays or numpy dicts. class _TensorizedDatasetIter(object):
def __init__(self, dataset, batch_size, drop_last, mapping_keys):
self.dataset = dataset
self.batch_size = batch_size
self.drop_last = drop_last
self.mapping_keys = mapping_keys
self.index = 0
# For PyTorch Lightning compatibility
def __iter__(self):
return self
def _next_indices(self):
num_items = self.dataset.shape[0]
if self.index >= num_items:
raise StopIteration
end_idx = self.index + self.batch_size
if end_idx > num_items:
if self.drop_last:
raise StopIteration
end_idx = num_items
batch = self.dataset[self.index:end_idx]
self.index += self.batch_size
return batch
def __next__(self):
batch = self._next_indices()
if self.mapping_keys is None:
return batch
# convert the type-ID pairs to dictionary
type_ids = batch[:, 0]
indices = batch[:, 1]
type_ids_sortidx = torch.argsort(type_ids)
type_ids = type_ids[type_ids_sortidx]
indices = indices[type_ids_sortidx]
type_id_uniq, type_id_count = torch.unique_consecutive(type_ids, return_counts=True)
type_id_uniq = type_id_uniq.tolist()
type_id_offset = type_id_count.cumsum(0).tolist()
type_id_offset.insert(0, 0)
id_dict = {
self.mapping_keys[type_id_uniq[i]]: indices[type_id_offset[i]:type_id_offset[i+1]]
for i in range(len(type_id_uniq))}
return id_dict
def _get_id_tensor_from_mapping(indices, device, keys):
lengths = torch.LongTensor([
(indices[k].shape[0] if k in indices else 0) for k in keys], device=device)
type_ids = torch.arange(len(keys), device=device).repeat_interleave(lengths)
all_indices = torch.cat([indices[k] for k in keys if k in indices])
return torch.stack([type_ids, all_indices], 1)
def _divide_by_worker(dataset):
num_samples = dataset.shape[0]
worker_info = torch.utils.data.get_worker_info()
if worker_info:
chunk_size = num_samples // worker_info.num_workers
left_over = num_samples % worker_info.num_workers
start = (chunk_size * worker_info.id) + min(left_over, worker_info.id)
end = start + chunk_size + (worker_info.id < left_over)
assert worker_info.id < worker_info.num_workers - 1 or end == num_samples
dataset = dataset[start:end]
return dataset
class TensorizedDataset(torch.utils.data.IterableDataset):
"""Custom Dataset wrapper that returns a minibatch as tensors or dicts of tensors.
When the dataset is on the GPU, this significantly reduces the overhead.
""" """
if isinstance(frontier_parent_eids, Mapping): def __init__(self, indices, batch_size, drop_last):
result = { if isinstance(indices, Mapping):
k: np.isin(frontier_parent_eids[k], exclude_eids[k]).nonzero()[0] self._mapping_keys = list(indices.keys())
for k in frontier_parent_eids.keys() if k in exclude_eids.keys()} self._device = next(iter(indices.values())).device
return {k: F.zerocopy_from_numpy(v) for k, v in result.items()} self._tensor_dataset = _get_id_tensor_from_mapping(
else: indices, self._device, self._mapping_keys)
result = np.isin(frontier_parent_eids, exclude_eids).nonzero()[0]
return F.zerocopy_from_numpy(result)
class _EidExcluder():
def __init__(self, exclude_eids):
device = None
if isinstance(exclude_eids, Mapping):
for _, v in exclude_eids.items():
if device is None:
device = F.context(v)
break
else: else:
device = F.context(exclude_eids) self._tensor_dataset = indices
self._exclude_eids = None self._device = indices.device
self._filter = None self._mapping_keys = None
self.batch_size = batch_size
if device == F.cpu(): self.drop_last = drop_last
# TODO(nv-dlasalle): Once Filter is implemented for the CPU, we
# should just use that irregardless of the device. def shuffle(self):
self._exclude_eids = ( """Shuffle the dataset."""
_tensor_or_dict_to_numpy(exclude_eids) if exclude_eids is not None else None) # TODO: may need an in-place shuffle kernel
perm = torch.randperm(self._tensor_dataset.shape[0], device=self._device)
self._tensor_dataset[:] = self._tensor_dataset[perm]
def __iter__(self):
dataset = _divide_by_worker(self._tensor_dataset)
return _TensorizedDatasetIter(
dataset, self.batch_size, self.drop_last, self._mapping_keys)
def __len__(self):
num_samples = self._tensor_dataset.shape[0]
return (num_samples + (0 if self.drop_last else (self.batch_size - 1))) // self.batch_size
def _get_shared_mem_name(id_):
return f'ddp_{id_}'
def _generate_shared_mem_name_id():
for _ in range(3): # 3 trials
id_ = random.getrandbits(32)
name = _get_shared_mem_name(id_)
if not nd.exist_shared_mem_array(name):
return name, id_
raise DGLError('Unable to generate a shared memory array')
class DDPTensorizedDataset(torch.utils.data.IterableDataset):
"""Custom Dataset wrapper that returns a minibatch as tensors or dicts of tensors.
When the dataset is on the GPU, this significantly reduces the overhead.
This class additionally saves the index tensor in shared memory and therefore
avoids duplicating the same index tensor during shuffling.
"""
def __init__(self, indices, batch_size, drop_last, ddp_seed):
if isinstance(indices, Mapping):
self._mapping_keys = list(indices.keys())
else: else:
if isinstance(exclude_eids, Mapping): self._mapping_keys = None
self._filter = {k: utils.Filter(v) for k, v in exclude_eids.items()}
else:
self._filter = utils.Filter(exclude_eids)
def _find_indices(self, parent_eids): self.rank = dist.get_rank()
""" Find the set of edge indices to remove. self.num_replicas = dist.get_world_size()
""" self.seed = ddp_seed
if self._exclude_eids is not None: self.epoch = 0
parent_eids_np = _tensor_or_dict_to_numpy(parent_eids) self.batch_size = batch_size
return _locate_eids_to_exclude(parent_eids_np, self._exclude_eids) self.drop_last = drop_last
if self.drop_last and len(indices) % self.num_replicas != 0:
self.num_samples = math.ceil((len(indices) - self.num_replicas) / self.num_replicas)
else: else:
assert self._filter is not None self.num_samples = math.ceil(len(indices) / self.num_replicas)
if isinstance(parent_eids, Mapping): self.total_size = self.num_samples * self.num_replicas
located_eids = {k: self._filter[k].find_included_indices(parent_eids[k]) # If drop_last is True, we create a shared memory array larger than the number
for k, v in parent_eids.items() if k in self._filter} # of indices since we will need to pad it after shuffling to make it evenly
# divisible before every epoch. If drop_last is False, we create an array
# with the same size as the indices so we can trim it later.
self.shared_mem_size = self.total_size if not self.drop_last else len(indices)
self.num_indices = len(indices)
if self.rank == 0:
name, id_ = _generate_shared_mem_name_id()
if isinstance(indices, Mapping):
device = next(iter(indices.values())).device
id_tensor = _get_id_tensor_from_mapping(indices, device, self._mapping_keys)
self._tensor_dataset = create_shared_mem_array(
name, (self.shared_mem_size, 2), torch.int64)
self._tensor_dataset[:id_tensor.shape[0], :] = id_tensor
else: else:
located_eids = self._filter.find_included_indices(parent_eids) self._tensor_dataset = create_shared_mem_array(
return located_eids name, (self.shared_mem_size,), torch.int64)
self._tensor_dataset[:len(indices)] = indices
def __call__(self, frontier): self._device = self._tensor_dataset.device
parent_eids = frontier.edata[EID] meta_info = torch.LongTensor([id_, self._tensor_dataset.shape[0]])
located_eids = self._find_indices(parent_eids)
if not isinstance(located_eids, Mapping):
# (BarclayII) If frontier already has a EID field and located_eids is empty,
# the returned graph will keep EID intact. Otherwise, EID will change
# to the mapping from the new graph to the old frontier.
# So we need to test if located_eids is empty, and do the remapping ourselves.
if len(located_eids) > 0:
frontier = transform.remove_edges(
frontier, located_eids, store_ids=True)
frontier.edata[EID] = F.gather_row(parent_eids, frontier.edata[EID])
else:
# (BarclayII) remove_edges only accepts removing one type of edges,
# so I need to keep track of the edge IDs left one by one.
new_eids = parent_eids.copy()
for k, v in located_eids.items():
if len(v) > 0:
frontier = transform.remove_edges(
frontier, v, etype=k, store_ids=True)
new_eids[k] = F.gather_row(parent_eids[k], frontier.edges[k].data[EID])
frontier.edata[EID] = new_eids
return frontier
def exclude_edges(subg, exclude_eids, device):
"""Find and remove from the subgraph the edges whose IDs in the parent
graph are given.
Parameters
----------
subg : DGLGraph
The subgraph. Must have ``dgl.EID`` field containing the original
edge IDs in the parent graph.
exclude_eids : Tensor or dict
The edge IDs to exclude.
device : device
The output device of the graph.
Returns
-------
DGLGraph
The new subgraph with edges removed. The ``dgl.EID`` field contains
the original edge IDs in the same parent graph.
"""
if exclude_eids is None:
return subg
if device is not None:
if isinstance(exclude_eids, Mapping):
exclude_eids = {k: F.copy_to(v, device) \
for k, v in exclude_eids.items()}
else: else:
exclude_eids = F.copy_to(exclude_eids, device) meta_info = torch.LongTensor([0, 0])
excluder = _EidExcluder(exclude_eids) if dist.get_backend() == 'nccl':
return subg if excluder is None else excluder(subg) # Use default CUDA device; PyTorch DDP required the users to set the CUDA
# device for each process themselves so calling .cuda() should be safe.
meta_info = meta_info.cuda()
def _find_exclude_eids_with_reverse_id(g, eids, reverse_eid_map): dist.broadcast(meta_info, src=0)
if isinstance(eids, Mapping):
eids = {g.to_canonical_etype(k): v for k, v in eids.items()} if self.rank != 0:
exclude_eids = { id_, num_samples = meta_info.tolist()
k: F.cat([v, F.gather_row(reverse_eid_map[k], v)], 0) name = _get_shared_mem_name(id_)
for k, v in eids.items()} if isinstance(indices, Mapping):
indices_shared = get_shared_mem_array(name, (num_samples, 2), torch.int64)
else:
indices_shared = get_shared_mem_array(name, (num_samples,), torch.int64)
self._tensor_dataset = indices_shared
self._device = indices_shared.device
def shuffle(self):
"""Shuffles the dataset."""
# Only rank 0 does the actual shuffling. The other ranks wait for it.
if self.rank == 0:
self._tensor_dataset[:self.num_indices] = self._tensor_dataset[
torch.randperm(self.num_indices, device=self._device)]
if not self.drop_last:
# pad extra
self._tensor_dataset[self.num_indices:] = \
self._tensor_dataset[:self.total_size - self.num_indices]
dist.barrier()
def __iter__(self):
start = self.num_samples * self.rank
end = self.num_samples * (self.rank + 1)
dataset = _divide_by_worker(self._tensor_dataset[start:end])
return _TensorizedDatasetIter(
dataset, self.batch_size, self.drop_last, self._mapping_keys)
def __len__(self):
return (self.num_samples + (0 if self.drop_last else (self.batch_size - 1))) // \
self.batch_size
def _prefetch_update_feats(feats, frames, types, get_storage_func, id_name, device, pin_memory):
for tid, frame in enumerate(frames):
type_ = types[tid]
default_id = frame.get(id_name, None)
for key in frame.keys():
column = frame[key]
if isinstance(column, LazyFeature):
parent_key = column.name or key
if column.id_ is None and default_id is None:
raise DGLError(
'Found a LazyFeature with no ID specified, '
'and the graph does not have dgl.NID or dgl.EID columns')
feats[tid, key] = get_storage_func(parent_key, type_).fetch(
column.id_ or default_id, device, pin_memory)
# This class exists to avoid recursion into the feature dictionary returned by the
# prefetcher when calling recursive_apply().
class _PrefetchedGraphFeatures(object):
__slots__ = ['node_feats', 'edge_feats']
def __init__(self, node_feats, edge_feats):
self.node_feats = node_feats
self.edge_feats = edge_feats
def _prefetch_for_subgraph(subg, dataloader):
node_feats, edge_feats = {}, {}
_prefetch_update_feats(
node_feats, subg._node_frames, subg.ntypes, dataloader.graph.get_node_storage,
NID, dataloader.device, dataloader.pin_memory)
_prefetch_update_feats(
edge_feats, subg._edge_frames, subg.canonical_etypes, dataloader.graph.get_edge_storage,
EID, dataloader.device, dataloader.pin_memory)
return _PrefetchedGraphFeatures(node_feats, edge_feats)
def _prefetch_for(item, dataloader):
if isinstance(item, DGLHeteroGraph):
return _prefetch_for_subgraph(item, dataloader)
elif isinstance(item, LazyFeature):
return dataloader.other_storages[item.name].fetch(
item.id_, dataloader.device, dataloader.pin_memory)
else: else:
exclude_eids = F.cat([eids, F.gather_row(reverse_eid_map, eids)], 0) return None
return exclude_eids
def _find_exclude_eids_with_reverse_types(g, eids, reverse_etype_map):
exclude_eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
reverse_etype_map = {
g.to_canonical_etype(k): g.to_canonical_etype(v)
for k, v in reverse_etype_map.items()}
exclude_eids.update({reverse_etype_map[k]: v for k, v in exclude_eids.items()})
return exclude_eids
def _find_exclude_eids(g, exclude_mode, eids, **kwargs):
"""Find all edge IDs to exclude according to :attr:`exclude_mode`.
Parameters def _await_or_return(x):
---------- if hasattr(x, 'wait'):
g : DGLGraph return x.wait()
The graph. elif isinstance(x, _PrefetchedGraphFeatures):
exclude_mode : str, optional node_feats = recursive_apply(x.node_feats, _await_or_return)
Can be either of the following, edge_feats = recursive_apply(x.edge_feats, _await_or_return)
return _PrefetchedGraphFeatures(node_feats, edge_feats)
None (default)
Does not exclude any edge.
'self'
Exclude the given edges themselves but nothing else.
'reverse_id'
Exclude all edges specified in ``eids``, as well as their reverse edges
of the same edge type.
The mapping from each edge ID to its reverse edge ID is specified in
the keyword argument ``reverse_eid_map``.
This mode assumes that the reverse of an edge with ID ``e`` and type
``etype`` will have ID ``reverse_eid_map[e]`` and type ``etype``.
'reverse_types'
Exclude all edges specified in ``eids``, as well as their reverse
edges of the corresponding edge types.
The mapping from each edge type to its reverse edge type is specified
in the keyword argument ``reverse_etype_map``.
This mode assumes that the reverse of an edge with ID ``e`` and type ``etype``
will have ID ``e`` and type ``reverse_etype_map[etype]``.
eids : Tensor or dict[etype, Tensor]
The edge IDs.
reverse_eid_map : Tensor or dict[etype, Tensor]
The mapping from edge ID to its reverse edge ID.
reverse_etype_map : dict[etype, etype]
The mapping from edge etype to its reverse edge type.
"""
if exclude_mode is None:
return None
elif exclude_mode == 'self':
return eids
elif exclude_mode == 'reverse_id':
return _find_exclude_eids_with_reverse_id(g, eids, kwargs['reverse_eid_map'])
elif exclude_mode == 'reverse_types':
return _find_exclude_eids_with_reverse_types(g, eids, kwargs['reverse_etype_map'])
else: else:
raise ValueError('unsupported mode {}'.format(exclude_mode)) return x
class Sampler(object):
"""An abstract class that takes in a graph and a set of seed nodes and returns a def _prefetch(batch, dataloader, stream):
structure representing a smaller portion of the graph for computation. It can # feats has the same nested structure of batch, except that
be either a list of bipartite graphs (i.e. :class:`BlockSampler`), or a single # (1) each subgraph is replaced with a pair of node features and edge features, both
subgraph. # being dictionaries whose keys are (type_id, column_name) and values are either
# tensors or futures.
# (2) each LazyFeature object is replaced with a tensor or future.
# (3) everything else are replaced with None.
#
# Once the futures are fetched, this function waits for them to complete by
# calling its wait() method.
with torch.cuda.stream(stream):
feats = recursive_apply(batch, _prefetch_for, dataloader)
feats = recursive_apply(feats, _await_or_return)
return feats
def _assign_for(item, feat):
if isinstance(item, DGLHeteroGraph):
subg = item
for (tid, key), value in feat.node_feats.items():
assert isinstance(subg._node_frames[tid][key], LazyFeature)
subg._node_frames[tid][key] = value
for (tid, key), value in feat.edge_feats.items():
assert isinstance(subg._edge_frames[tid][key], LazyFeature)
subg._edge_frames[tid][key] = value
return subg
elif isinstance(item, LazyFeature):
return feat
else:
return item
def _prefetcher_entry(dataloader_it, dataloader, queue, num_threads, use_alternate_streams):
# PyTorch will set the number of threads to 1 which slows down pin_memory() calls
# in main process if a prefetching thread is created.
if num_threads is not None:
torch.set_num_threads(num_threads)
if use_alternate_streams:
stream = (
torch.cuda.Stream(device=dataloader.device)
if dataloader.device.type == 'cuda' else None)
else:
stream = None
try:
for batch in dataloader_it:
batch = recursive_apply(batch, restore_parent_storage_columns, dataloader.graph)
feats = _prefetch(batch, dataloader, stream)
queue.put((
# batch will be already in pinned memory as per the behavior of
# PyTorch DataLoader.
recursive_apply(batch, lambda x: x.to(dataloader.device, non_blocking=True)),
feats,
stream.record_event() if stream is not None else None,
None))
queue.put((None, None, None, None))
except: # pylint: disable=bare-except
queue.put((None, None, None, ExceptionWrapper(where='in prefetcher')))
# DGLHeteroGraphs have the semantics of lazy feature slicing with subgraphs. Such behavior depends
# on that DGLHeteroGraph's ndata and edata are maintained by Frames. So to maintain compatibility
# with older code, DGLHeteroGraphs and other graph storages are handled separately: (1)
# DGLHeteroGraphs will preserve the lazy feature slicing for subgraphs. (2) Other graph storages
# will not have lazy feature slicing; all feature slicing will be eager.
def remove_parent_storage_columns(item, g):
"""Removes the storage objects in the given graphs' Frames if it is a sub-frame of the
given parent graph, so that the storages are not serialized during IPC from PyTorch
DataLoader workers.
""" """
def __init__(self, output_ctx=None): if not isinstance(item, DGLHeteroGraph) or not isinstance(g, DGLHeteroGraph):
self.set_output_context(output_ctx) return item
def sample(self, g, seed_nodes, exclude_eids=None): for subframe, frame in zip(
"""Sample a structure from the graph. itertools.chain(item._node_frames, item._edge_frames),
itertools.chain(g._node_frames, g._edge_frames)):
Parameters for key in list(subframe.keys()):
---------- subcol = subframe._columns[key] # directly get the column object
g : DGLGraph if isinstance(subcol, LazyFeature):
The original graph. continue
seed_nodes : Tensor or dict[ntype, Tensor] col = frame._columns.get(key, None)
The destination nodes by type. if col is None:
continue
If the graph only has one node type, one can just specify a single tensor if col.storage is subcol.storage:
of node IDs. subcol.storage = None
exclude_eids : Tensor or dict[etype, Tensor] return item
The edges to exclude from computation dependency.
Returns def restore_parent_storage_columns(item, g):
------- """Restores the storage objects in the given graphs' Frames if it is a sub-frame of the
Tensor or dict[ntype, Tensor] given parent graph (i.e. when the storage object is None).
The nodes whose input features are required for computing the output
representation of :attr:`seed_nodes`.
any
Any data representing the structure.
"""
raise NotImplementedError
def set_output_context(self, ctx):
"""Set the device the generated block or subgraph will be output to.
This should only be set to a cuda device, when multi-processing is not
used in the dataloader (e.g., num_workers is 0).
Parameters
----------
ctx : DGLContext, default None
The device context the sampled blocks will be stored on. This
should only be a CUDA context if multiprocessing is not used in
the dataloader (e.g., num_workers is 0). If this is None, the
sampled blocks will be stored on the same device as the input
graph.
"""
if ctx is not None:
self.output_device = F.to_backend_ctx(ctx)
else:
self.output_device = None
class BlockSampler(Sampler):
"""Abstract class specifying the neighborhood sampling strategy for DGL data loaders.
The main method for BlockSampler is :meth:`sample`,
which generates a list of message flow graphs (MFGs) for a multi-layer GNN given a set of
seed nodes to have their outputs computed.
The default implementation of :meth:`sample` is
to repeat :attr:`num_layers` times the following procedure from the last layer to the first
layer:
* Obtain a frontier. The frontier is defined as a graph with the same nodes as the
original graph but only the edges involved in message passing on the current layer.
Customizable via :meth:`sample_frontier`.
* Optionally, if the task is link prediction or edge classfication, remove edges
connecting training node pairs. If the graph is undirected, also remove the
reverse edges. This is controlled by the argument :attr:`exclude_eids` in
:meth:`sample` method.
* Convert the frontier into a MFG.
* Optionally assign the IDs of the edges in the original graph selected in the first step
to the MFG, controlled by the argument ``return_eids`` in
:meth:`sample` method.
* Prepend the MFG to the MFG list to be returned.
All subclasses should override :meth:`sample_frontier`
method while specifying the number of layers to sample in :attr:`num_layers` argument.
Parameters
----------
num_layers : int
The number of layers to sample.
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the MFG.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
output_ctx : DGLContext, default None
The context the sampled blocks will be stored on. This should only be
a CUDA context if multiprocessing is not used in the dataloader (e.g.,
num_workers is 0). If this is None, the sampled blocks will be stored
on the same device as the input graph.
exclude_edges_in_frontier : bool, default False
If True, the :func:`sample_frontier` method will receive an argument
:attr:`exclude_eids` containing the edge IDs from the original graph to exclude.
The :func:`sample_frontier` method must return a graph that does not contain
the edges corresponding to the excluded edges. No additional postprocessing
will be done.
Otherwise, the edges will be removed *after* :func:`sample_frontier` returns.
Notes
-----
For the concept of frontiers and MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
""" """
def __init__(self, num_layers, return_eids=False, output_ctx=None): if not isinstance(item, DGLHeteroGraph) or not isinstance(g, DGLHeteroGraph):
super().__init__(output_ctx) return item
self.num_layers = num_layers
self.return_eids = return_eids for subframe, frame in zip(
itertools.chain(item._node_frames, item._edge_frames),
# pylint: disable=unused-argument itertools.chain(g._node_frames, g._edge_frames)):
@staticmethod for key in subframe.keys():
def assign_block_eids(block, frontier): subcol = subframe._columns[key]
"""Assigns edge IDs from the original graph to the message flow graph (MFG). if isinstance(subcol, LazyFeature):
continue
See also col = frame._columns.get(key, None)
-------- if col is None:
BlockSampler continue
""" if subcol.storage is None:
for etype in block.canonical_etypes: subcol.storage = col.storage
block.edges[etype].data[EID] = frontier.edges[etype].data[EID][ return item
block.edges[etype].data[EID]]
return block
class _PrefetchingIter(object):
# This is really a hack working around the lack of GPU-based neighbor sampling def __init__(self, dataloader, dataloader_it, use_thread=False, use_alternate_streams=True,
# with edge exclusion. num_threads=None):
@classmethod self.queue = Queue(1)
def exclude_edges_in_frontier(cls, g): self.dataloader_it = dataloader_it
"""Returns whether the sampler will exclude edges in :func:`sample_frontier`. self.dataloader = dataloader
self.graph_sampler = self.dataloader.graph_sampler
If this method returns True, the method :func:`sample_frontier` will receive an self.pin_memory = self.dataloader.pin_memory
argument :attr:`exclude_eids` from :func:`sample`. :func:`sample_frontier` self.num_threads = num_threads
is then responsible for removing those edges.
self.use_thread = use_thread
If this method returns False, :func:`sample` will be responsible for self.use_alternate_streams = use_alternate_streams
removing the edges. if use_thread:
thread = threading.Thread(
When subclassing :class:`BlockSampler`, this method should return True when you target=_prefetcher_entry,
would like to remove the excluded edges in your :func:`sample_frontier` method. args=(dataloader_it, dataloader, self.queue, num_threads, use_alternate_streams),
daemon=True)
By default this method returns False. thread.start()
self.thread = thread
Parameters
---------- def __iter__(self):
g : DGLGraph return self
The original graph
def _next_non_threaded(self):
Returns batch = next(self.dataloader_it)
------- batch = recursive_apply(batch, restore_parent_storage_columns, self.dataloader.graph)
bool device = self.dataloader.device
Whether :func:`sample_frontier` will receive an argument :attr:`exclude_eids`. if self.use_alternate_streams:
""" stream = torch.cuda.Stream(device=device) if device.type == 'cuda' else None
return False
def sample_frontier(self, block_id, g, seed_nodes, exclude_eids=None):
"""Generate the frontier given the destination nodes.
The subclasses should override this function.
Parameters
----------
block_id : int
Represents which GNN layer the frontier is generated for.
g : DGLGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The destination nodes by node type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
exclude_eids: Tensor or dict
Edge IDs to exclude during sampling neighbors for the seed nodes.
This argument can take a single ID tensor or a dictionary of edge types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
Returns
-------
DGLGraph
The frontier generated for the current layer.
Notes
-----
For the concept of frontiers and MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
raise NotImplementedError
def sample(self, g, seed_nodes, exclude_eids=None):
"""Generate the a list of MFGs given the destination nodes.
Parameters
----------
g : DGLGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The destination nodes by node type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
exclude_eids : Tensor or dict[etype, Tensor]
The edges to exclude from computation dependency.
Returns
-------
list[DGLGraph]
The MFGs generated for computing the multi-layer GNN output.
Notes
-----
For the concept of frontiers and MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
blocks = []
if isinstance(g, DistGraph):
# TODO:(nv-dlasalle) dist graphs may not have an associated graph,
# causing an error when trying to fetch the device, so for now,
# always assume the distributed graph's device is CPU.
graph_device = F.cpu()
else: else:
graph_device = g.device stream = None
feats = _prefetch(batch, self.dataloader, stream)
for block_id in reversed(range(self.num_layers)): batch = recursive_apply(batch, lambda x: x.to(device, non_blocking=True))
seed_nodes_in = to_device(seed_nodes, graph_device) stream_event = stream.record_event() if stream is not None else None
return batch, feats, stream_event
if self.exclude_edges_in_frontier(g):
frontier = self.sample_frontier( def _next_threaded(self):
block_id, g, seed_nodes_in, exclude_eids=exclude_eids) batch, feats, stream_event, exception = self.queue.get()
else: if batch is None:
frontier = self.sample_frontier(block_id, g, seed_nodes_in) self.thread.join()
if exception is None:
if self.output_device is not None: raise StopIteration
frontier = frontier.to(self.output_device) exception.reraise()
seed_nodes_out = to_device(seed_nodes, self.output_device) return batch, feats, stream_event
else:
seed_nodes_out = seed_nodes def __next__(self):
batch, feats, stream_event = \
# Removing edges from the frontier for link prediction training falls self._next_non_threaded() if not self.use_thread else self._next_threaded()
# into the category of frontier postprocessing batch = recursive_apply_pair(batch, feats, _assign_for)
if not self.exclude_edges_in_frontier(g): if stream_event is not None:
frontier = exclude_edges(frontier, exclude_eids, self.output_device) stream_event.wait()
return batch
block = transform.to_block(frontier, seed_nodes_out)
if self.return_eids:
self.assign_block_eids(block, frontier) # Make them classes to work with pickling in mp.spawn
class CollateWrapper(object):
seed_nodes = {ntype: block.srcnodes[ntype].data[NID] for ntype in block.srctypes} """Wraps a collate function with :func:`remove_parent_storage_columns` for serializing
blocks.insert(0, block) from PyTorch DataLoader workers.
return blocks[0].srcdata[NID], blocks[-1].dstdata[NID], blocks
def sample_blocks(self, g, seed_nodes, exclude_eids=None):
"""Deprecated and identical to :meth:`sample`.
"""
return self.sample(g, seed_nodes, exclude_eids)
class Collator(ABC):
"""Abstract DGL collator for training GNNs on downstream tasks stochastically.
Provides a :attr:`dataset` object containing the collection of all nodes or edges,
as well as a :attr:`collate` method that combines a set of items from
:attr:`dataset` and obtains the message flow graphs (MFGs).
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
@abstractproperty
def dataset(self):
"""Returns the dataset object of the collator."""
raise NotImplementedError
@abstractmethod
def collate(self, items):
"""Combines the items from the dataset object and obtains the list of MFGs.
Parameters
----------
items : list[str, int]
The list of node or edge IDs or type-ID pairs.
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
raise NotImplementedError
class NodeCollator(Collator):
"""DGL collator to combine nodes and their computation dependencies within a minibatch for
training node classification or regression on a single graph with neighborhood sampling.
Parameters
----------
g : DGLGraph
The graph.
nids : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
graph_sampler : dgl.dataloading.BlockSampler
The neighborhood sampler.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors (assume
the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(input_nodes, output_nodes, blocks)
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
""" """
def __init__(self, g, nids, graph_sampler): def __init__(self, sample_func, g):
self.sample_func = sample_func
self.g = g self.g = g
if not isinstance(nids, Mapping):
assert len(g.ntypes) == 1, \
"nids should be a dict of node type and ids for graph with multiple node types"
self.graph_sampler = graph_sampler
self.nids = utils.prepare_tensor_or_dict(g, nids, 'nids') def __call__(self, items):
self._dataset = utils.maybe_flatten_dict(self.nids) batch = self.sample_func(self.g, items)
return recursive_apply(batch, remove_parent_storage_columns, self.g)
@property
def dataset(self):
return self._dataset
def collate(self, items): class WorkerInitWrapper(object):
"""Find the list of MFGs necessary for computing the representation of given """Wraps the :attr:`worker_init_fn` argument of the DataLoader to set the number of DGL
nodes for a node classification/regression task. OMP threads to 1 for PyTorch DataLoader workers.
"""
Parameters def __init__(self, func):
---------- self.func = func
items : list[int] or list[tuple[str, int]]
Either a list of node IDs (for homogeneous graphs), or a list of node type-ID
pairs (for heterogeneous graphs).
Returns
-------
input_nodes : Tensor or dict[ntype, Tensor]
The input nodes necessary for computation in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
output_nodes : Tensor or dict[ntype, Tensor]
The nodes whose representations are to be computed in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
MFGs : list[DGLGraph]
The list of MFGs necessary for computing the representation.
"""
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = utils.prepare_tensor_or_dict(self.g, items, 'items')
input_nodes, output_nodes, blocks = self.graph_sampler.sample(self.g, items)
return input_nodes, output_nodes, blocks
class EdgeCollator(Collator):
"""DGL collator to combine edges and their computation dependencies within a minibatch for
training edge classification, edge regression, or link prediction on a single graph
with neighborhood sampling.
Given a set of edges, the collate function will yield
* A tensor of input nodes necessary for computing the representation on edges, or
a dictionary of node type names and such tensors.
* A subgraph that contains only the edges in the minibatch and their incident nodes.
Note that the graph has an identical metagraph with the original graph.
* If a negative sampler is given, another graph that contains the "negative edges",
connecting the source and destination nodes yielded from the given negative sampler.
* A list of MFGs necessary for computing the representation of the incident nodes
of the edges in the minibatch.
Parameters
----------
g : DGLGraph
The graph from which the edges are iterated in minibatches and the subgraphs
are generated.
eids : Tensor or dict[etype, Tensor]
The edge set in graph :attr:`g` to compute outputs.
graph_sampler : dgl.dataloading.BlockSampler
The neighborhood sampler.
g_sampling : DGLGraph, optional
The graph where neighborhood sampling and message passing is performed.
Note that this is not necessarily the same as :attr:`g`.
If None, assume to be the same as :attr:`g`.
exclude : str, optional
Whether and how to exclude dependencies related to the sampled edges in the
minibatch. Possible values are
* None, which excludes nothing.
* ``'self'``, which excludes the sampled edges themselves but nothing else.
* ``'reverse_id'``, which excludes the reverse edges of the sampled edges. The said
reverse edges have the same edge type as the sampled edges. Only works
on edge types whose source node type is the same as its destination node type.
* ``'reverse_types'``, which excludes the reverse edges of the sampled edges. The
said reverse edges have different edge types from the sampled edges.
If ``g_sampling`` is given, ``exclude`` is ignored and will be always ``None``.
reverse_eids : Tensor or dict[etype, Tensor], optional
A tensor of reverse edge ID mapping. The i-th element indicates the ID of
the i-th edge's reverse edge.
If the graph is heterogeneous, this argument requires a dictionary of edge
types and the reverse edge ID mapping tensors.
Required and only used when ``exclude`` is set to ``reverse_id``.
For heterogeneous graph this will be a dict of edge type and edge IDs. Note that
only the edge types whose source node type is the same as destination node type
are needed.
reverse_etypes : dict[etype, etype], optional
The mapping from the edge type to its reverse edge type.
Required and only used when ``exclude`` is set to ``reverse_types``.
negative_sampler : callable, optional
The negative sampler. Can be omitted if no negative sampling is needed.
The negative sampler must be a callable that takes in the following arguments: def __call__(self, worker_id):
set_num_threads(1)
if self.func is not None:
self.func(worker_id)
* The original (heterogeneous) graph.
* The ID array of sampled edges in the minibatch, or the dictionary of edge def create_tensorized_dataset(indices, batch_size, drop_last, use_ddp, ddp_seed):
types and ID array of sampled edges in the minibatch if the graph is """Converts a given indices tensor to a TensorizedDataset, an IterableDataset
heterogeneous. that returns views of the original tensor, to reduce overhead from having
a list of scalar tensors in default PyTorch DataLoader implementation.
"""
if use_ddp:
return DDPTensorizedDataset(indices, batch_size, drop_last, ddp_seed)
else:
return TensorizedDataset(indices, batch_size, drop_last)
It should return
* A pair of source and destination node ID arrays as negative samples, class DataLoader(torch.utils.data.DataLoader):
or a dictionary of edge types and such pairs if the graph is heterogenenous. """DataLoader class."""
def __init__(self, graph, indices, graph_sampler, device='cpu', use_ddp=False,
ddp_seed=0, batch_size=1, drop_last=False, shuffle=False,
use_prefetch_thread=False, use_alternate_streams=True, **kwargs):
self.graph = graph
A set of builtin negative samplers are provided in try:
:ref:`the negative sampling module <api-dataloading-negative-sampling>`. if isinstance(indices, Mapping):
indices = {k: (torch.tensor(v) if not torch.is_tensor(v) else v)
for k, v in indices.items()}
else:
indices = torch.tensor(indices) if not torch.is_tensor(indices) else indices
except: # pylint: disable=bare-except
# ignore when it fails to convert to torch Tensors.
pass
if (torch.is_tensor(indices) or (
isinstance(indices, Mapping) and
all(torch.is_tensor(v) for v in indices.values()))):
self.dataset = create_tensorized_dataset(
indices, batch_size, drop_last, use_ddp, ddp_seed)
else:
self.dataset = indices
Examples self.ddp_seed = ddp_seed
-------- self._shuffle_dataset = shuffle
The following example shows how to train a 3-layer GNN for edge classification on a
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
messages from all neighbors.
Say that you have an array of source node IDs ``src`` and another array of destination
node IDs ``dst``. One can make it bidirectional by adding another set of edges
that connects from ``dst`` to ``src``:
>>> g = dgl.graph((torch.cat([src, dst]), torch.cat([dst, src])))
One can then know that the ID difference of an edge and its reverse edge is ``|E|``,
where ``|E|`` is the length of your source/destination array. The reverse edge
mapping can be obtained by
>>> E = len(src)
>>> reverse_eids = torch.cat([torch.arange(E, 2 * E), torch.arange(0, E)])
Note that the sampled edges as well as their reverse edges are removed from
computation dependencies of the incident nodes. This is a common trick to avoid
information leakage.
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.EdgeCollator(
... g, train_eid, sampler, exclude='reverse_id',
... reverse_eids=reverse_eids)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
To train a 3-layer GNN for link prediction on a set of edges ``train_eid`` on a
homogeneous graph where each node takes messages from all neighbors (assume the
backend is PyTorch), with 5 uniformly chosen negative samples per edge:
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> collator = dgl.dataloading.EdgeCollator(
... g, train_eid, sampler, exclude='reverse_id',
... reverse_eids=reverse_eids, negative_sampler=neg_sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodse, pair_graph, neg_pair_graph, blocks)
For heterogeneous graphs, the reverse of an edge may have a different edge type
from the original edge. For instance, consider that you have an array of
user-item clicks, representated by a user array ``user`` and an item array ``item``.
You may want to build a heterogeneous graph with a user-click-item relation and an
item-clicked-by-user relation.
>>> g = dgl.heterograph({
... ('user', 'click', 'item'): (user, item),
... ('item', 'clicked-by', 'user'): (item, user)})
To train a 3-layer GNN for edge classification on a set of edges ``train_eid`` with
type ``click``, you can write
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> collator = dgl.dataloading.EdgeCollator(
... g, {'click': train_eid}, sampler, exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'})
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
To train a 3-layer GNN for link prediction on a set of edges ``train_eid`` with type
``click``, you can write
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> collator = dgl.dataloading.EdgeCollator(
... g, train_eid, sampler, exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'},
... negative_sampler=neg_sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, neg_pair_graph, blocks)
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, g, eids, graph_sampler, g_sampling=None, exclude=None,
reverse_eids=None, reverse_etypes=None, negative_sampler=None):
self.g = g
if not isinstance(eids, Mapping):
assert len(g.etypes) == 1, \
"eids should be a dict of etype and ids for graph with multiple etypes"
self.graph_sampler = graph_sampler self.graph_sampler = graph_sampler
self.device = torch.device(device)
# One may wish to iterate over the edges in one graph while perform sampling in self.use_alternate_streams = use_alternate_streams
# another graph. This may be the case for iterating over validation and test if self.device.type == 'cuda' and self.device.index is None:
# edge set while perform neighborhood sampling on the graph formed by only self.device = torch.device('cuda', torch.cuda.current_device())
# the training edge set. self.use_prefetch_thread = use_prefetch_thread
# See GCMC for an example usage. worker_init_fn = WorkerInitWrapper(kwargs.get('worker_init_fn', None))
# Instantiate all the formats if the number of workers is greater than 0.
if kwargs.get('num_workers', 0) > 0 and hasattr(self.graph, 'create_formats_'):
self.graph.create_formats_()
self.other_storages = {}
super().__init__(
self.dataset,
collate_fn=CollateWrapper(self.graph_sampler.sample, graph),
batch_size=None,
worker_init_fn=worker_init_fn,
**kwargs)
def __iter__(self):
if self._shuffle_dataset:
self.dataset.shuffle()
# When using multiprocessing PyTorch sometimes set the number of PyTorch threads to 1
# when spawning new Python threads. This drastically slows down pinning features.
num_threads = torch.get_num_threads() if self.num_workers > 0 else None
return _PrefetchingIter(
self, super().__iter__(), use_thread=self.use_prefetch_thread,
use_alternate_streams=self.use_alternate_streams, num_threads=num_threads)
# To allow data other than node/edge data to be prefetched.
def attach_data(self, name, data):
"""Add a data other than node and edge features for prefetching."""
self.other_storages[name] = wrap_storage(data)
# Alias
class NodeDataLoader(DataLoader):
"""NodeDataLoader class."""
class EdgeDataLoader(DataLoader):
"""EdgeDataLoader class."""
def __init__(self, graph, indices, graph_sampler, device='cpu', use_ddp=False,
ddp_seed=0, batch_size=1, drop_last=False, shuffle=False,
use_prefetch_thread=False, use_alternate_streams=True,
exclude=None, reverse_eids=None, reverse_etypes=None, negative_sampler=None,
g_sampling=None, **kwargs):
if g_sampling is not None: if g_sampling is not None:
self.g_sampling = g_sampling dgl_warning(
self.exclude = None "g_sampling is deprecated. "
else: "Please merge g_sampling and the original graph into one graph and use "
self.g_sampling = self.g "the exclude argument to specify which edges you don't want to sample.")
self.exclude = exclude if isinstance(graph_sampler, BlockSampler):
graph_sampler = EdgeBlockSampler(
self.reverse_eids = reverse_eids graph_sampler, exclude=exclude, reverse_eids=reverse_eids,
self.reverse_etypes = reverse_etypes reverse_etypes=reverse_etypes, negative_sampler=negative_sampler)
self.negative_sampler = negative_sampler
super().__init__(
self.eids = utils.prepare_tensor_or_dict(g, eids, 'eids') graph, indices, graph_sampler, device=device, use_ddp=use_ddp, ddp_seed=ddp_seed,
self._dataset = utils.maybe_flatten_dict(self.eids) batch_size=batch_size, drop_last=drop_last, shuffle=shuffle,
use_prefetch_thread=use_prefetch_thread, use_alternate_streams=use_alternate_streams,
@property **kwargs)
def dataset(self):
return self._dataset
######## Graph DataLoaders ########
def _collate(self, items): # GraphDataLoader loads a set of graphs so it's not relevant to the above. They are currently
if isinstance(items[0], tuple): # copied from the old DataLoader implementation.
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items) PYTORCH_VER = LooseVersion(torch.__version__)
items = utils.prepare_tensor_or_dict(self.g_sampling, items, 'items') PYTORCH_16 = PYTORCH_VER >= LooseVersion("1.6.0")
PYTORCH_17 = PYTORCH_VER >= LooseVersion("1.7.0")
pair_graph = self.g.edge_subgraph(items)
seed_nodes = pair_graph.ndata[NID] def _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed):
# Note: will change the content of dataloader_kwargs
exclude_eids = _find_exclude_eids( dist_sampler_kwargs = {'shuffle': dataloader_kwargs['shuffle']}
self.g_sampling, dataloader_kwargs['shuffle'] = False
self.exclude, if PYTORCH_16:
items, dist_sampler_kwargs['seed'] = ddp_seed
reverse_eid_map=self.reverse_eids, if PYTORCH_17:
reverse_etype_map=self.reverse_etypes) dist_sampler_kwargs['drop_last'] = dataloader_kwargs['drop_last']
dataloader_kwargs['drop_last'] = False
input_nodes, _, blocks = self.graph_sampler.sample(
self.g_sampling, seed_nodes, exclude_eids=exclude_eids) return DistributedSampler(dataset, **dist_sampler_kwargs)
return input_nodes, pair_graph, blocks
def _collate_with_negative_sampling(self, items):
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = utils.prepare_tensor_or_dict(self.g_sampling, items, 'items')
pair_graph = self.g.edge_subgraph(items, relabel_nodes=False)
induced_edges = pair_graph.edata[EID]
neg_srcdst = self.negative_sampler(self.g, items)
if not isinstance(neg_srcdst, Mapping):
assert len(self.g.etypes) == 1, \
'graph has multiple or no edge types; '\
'please return a dict in negative sampler.'
neg_srcdst = {self.g.canonical_etypes[0]: neg_srcdst}
# Get dtype from a tuple of tensors
dtype = F.dtype(list(neg_srcdst.values())[0][0])
ctx = F.context(pair_graph)
neg_edges = {
etype: neg_srcdst.get(etype, (F.copy_to(F.tensor([], dtype), ctx),
F.copy_to(F.tensor([], dtype), ctx)))
for etype in self.g.canonical_etypes}
neg_pair_graph = heterograph(
neg_edges, {ntype: self.g.number_of_nodes(ntype) for ntype in self.g.ntypes})
pair_graph, neg_pair_graph = transform.compact_graphs([pair_graph, neg_pair_graph])
pair_graph.edata[EID] = induced_edges
seed_nodes = pair_graph.ndata[NID]
exclude_eids = _find_exclude_eids(
self.g_sampling,
self.exclude,
items,
reverse_eid_map=self.reverse_eids,
reverse_etype_map=self.reverse_etypes)
input_nodes, _, blocks = self.graph_sampler.sample(
self.g_sampling, seed_nodes, exclude_eids=exclude_eids)
return input_nodes, pair_graph, neg_pair_graph, blocks
def collate(self, items):
"""Combines the sampled edges into a minibatch for edge classification, edge
regression, and link prediction tasks.
Parameters
----------
items : list[int] or list[tuple[str, int]]
Either a list of edge IDs (for homogeneous graphs), or a list of edge type-ID
pairs (for heterogeneous graphs).
Returns
-------
Either ``(input_nodes, pair_graph, blocks)``, or
``(input_nodes, pair_graph, negative_pair_graph, blocks)`` if negative sampling is
enabled.
input_nodes : Tensor or dict[ntype, Tensor]
The input nodes necessary for computation in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
pair_graph : DGLGraph
The graph that contains only the edges in the minibatch as well as their incident
nodes.
Note that the metagraph of this graph will be identical to that of the original
graph.
negative_pair_graph : DGLGraph
The graph that contains only the edges connecting the source and destination nodes
yielded from the given negative sampler, if negative sampling is enabled.
Note that the metagraph of this graph will be identical to that of the original
graph.
blocks : list[DGLGraph]
The list of MFGs necessary for computing the representation of the edges.
"""
if self.negative_sampler is None:
return self._collate(items)
else:
return self._collate_with_negative_sampling(items)
class GraphCollator(object): class GraphCollator(object):
"""Given a set of graphs as well as their graph-level data, the collate function will batch the """Given a set of graphs as well as their graph-level data, the collate function will batch the
...@@ -939,8 +641,8 @@ class GraphCollator(object): ...@@ -939,8 +641,8 @@ class GraphCollator(object):
""" """
elem = items[0] elem = items[0]
elem_type = type(elem) elem_type = type(elem)
if isinstance(elem, DGLGraph): if isinstance(elem, DGLHeteroGraph):
batched_graphs = batch(items) batched_graphs = batch_graphs(items)
return batched_graphs return batched_graphs
elif F.is_tensor(elem): elif F.is_tensor(elem):
return F.stack(items, 0) return F.stack(items, 0)
...@@ -975,8 +677,89 @@ class GraphCollator(object): ...@@ -975,8 +677,89 @@ class GraphCollator(object):
raise TypeError(self.graph_collate_err_msg_format.format(elem_type)) raise TypeError(self.graph_collate_err_msg_format.format(elem_type))
class SubgraphIterator(object): class GraphDataLoader(torch.utils.data.DataLoader):
"""Abstract class representing an iterator that yields a subgraph given a graph. """PyTorch dataloader for batch-iterating over a set of graphs, generating the batched
graph and corresponding label tensor (if provided) of the said minibatch.
Parameters
----------
collate_fn : Function, default is None
The customized collate function. Will use the default collate
function if not given.
use_ddp : boolean, optional
If True, tells the DataLoader to split the training set for each
participating process appropriately using
:class:`torch.utils.data.distributed.DistributedSampler`.
Overrides the :attr:`sampler` argument of :class:`torch.utils.data.DataLoader`.
ddp_seed : int, optional
The seed for shuffling the dataset in
:class:`torch.utils.data.distributed.DistributedSampler`.
Only effective when :attr:`use_ddp` is True.
kwargs : dict
Arguments being passed to :py:class:`torch.utils.data.DataLoader`.
Examples
--------
To train a GNN for graph classification on a set of graphs in ``dataset`` (assume
the backend is PyTorch):
>>> dataloader = dgl.dataloading.GraphDataLoader(
... dataset, batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for batched_graph, labels in dataloader:
... train_on(batched_graph, labels)
**Using with Distributed Data Parallel**
If you are using PyTorch's distributed training (e.g. when using
:mod:`torch.nn.parallel.DistributedDataParallel`), you can train the model by
turning on the :attr:`use_ddp` option:
>>> dataloader = dgl.dataloading.GraphDataLoader(
... dataset, use_ddp=True, batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for epoch in range(start_epoch, n_epochs):
... dataloader.set_epoch(epoch)
... for batched_graph, labels in dataloader:
... train_on(batched_graph, labels)
""" """
def __init__(self, g): collator_arglist = inspect.getfullargspec(GraphCollator).args
self.g = g
def __init__(self, dataset, collate_fn=None, use_ddp=False, ddp_seed=0, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
if k in self.collator_arglist:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
if collate_fn is None:
self.collate = GraphCollator(**collator_kwargs).collate
else:
self.collate = collate_fn
self.use_ddp = use_ddp
if use_ddp:
self.dist_sampler = _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed)
dataloader_kwargs['sampler'] = self.dist_sampler
super().__init__(dataset=dataset, collate_fn=self.collate, **dataloader_kwargs)
def set_epoch(self, epoch):
"""Sets the epoch number for the underlying sampler which ensures all replicas
to use a different ordering for each epoch.
Only available when :attr:`use_ddp` is True.
Calls :meth:`torch.utils.data.distributed.DistributedSampler.set_epoch`.
Parameters
----------
epoch : int
The epoch number.
"""
if self.use_ddp:
self.dist_sampler.set_epoch(epoch)
else:
raise DGLError('set_epoch is only available when use_ddp is True.')
"""Distributed dataloaders.
"""
import inspect
from ..distributed import DistDataLoader
# Still depends on the legacy NodeCollator...
from .._dataloading.dataloader import NodeCollator, EdgeCollator
def _remove_kwargs_dist(kwargs):
if 'num_workers' in kwargs:
del kwargs['num_workers']
if 'pin_memory' in kwargs:
del kwargs['pin_memory']
print('Distributed DataLoaders do not support pin_memory.')
return kwargs
class DistNodeDataLoader(DistDataLoader):
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch, on
a distributed graph.
All the arguments have the same meaning as the single-machine counterpart
:class:`dgl.dataloading.pytorch.NodeDataLoader` except the first argument
:attr:`g` which must be a :class:`dgl.distributed.DistGraph`.
Parameters
----------
g : DistGraph
The distributed graph.
nids, graph_sampler, device, kwargs :
See :class:`dgl.dataloading.pytorch.NodeDataLoader`.
See also
--------
dgl.dataloading.pytorch.NodeDataLoader
"""
def __init__(self, g, nids, graph_sampler, device=None, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
_collator_arglist = inspect.getfullargspec(NodeCollator).args
for k, v in kwargs.items():
if k in _collator_arglist:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = NodeCollator(g, nids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
super().__init__(self.collator.dataset,
collate_fn=self.collator.collate,
**dataloader_kwargs)
self.device = device
class DistEdgeDataLoader(DistDataLoader):
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch for
edge classification, edge regression, and link prediction, on a distributed
graph.
All the arguments have the same meaning as the single-machine counterpart
:class:`dgl.dataloading.pytorch.EdgeDataLoader` except the first argument
:attr:`g` which must be a :class:`dgl.distributed.DistGraph`.
Parameters
----------
g : DistGraph
The distributed graph.
eids, graph_sampler, device, kwargs :
See :class:`dgl.dataloading.pytorch.EdgeDataLoader`.
See also
--------
dgl.dataloading.pytorch.EdgeDataLoader
"""
def __init__(self, g, eids, graph_sampler, device=None, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
_collator_arglist = inspect.getfullargspec(EdgeCollator).args
for k, v in kwargs.items():
if k in _collator_arglist:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
super().__init__(self.collator.dataset,
collate_fn=self.collator.collate,
**dataloader_kwargs)
self.device = device
"""Negative samplers""" """Negative samplers"""
from collections.abc import Mapping from collections.abc import Mapping
from .. import backend as F from .. import backend as F
from ..sampling import global_uniform_negative_sampling
class _BaseNegativeSampler(object): class _BaseNegativeSampler(object):
def _generate(self, g, eids, canonical_etype): def _generate(self, g, eids, canonical_etype):
...@@ -26,7 +25,7 @@ class _BaseNegativeSampler(object): ...@@ -26,7 +25,7 @@ class _BaseNegativeSampler(object):
eids = {g.to_canonical_etype(k): v for k, v in eids.items()} eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
neg_pair = {k: self._generate(g, v, k) for k, v in eids.items()} neg_pair = {k: self._generate(g, v, k) for k, v in eids.items()}
else: else:
assert len(g.etypes) == 1, \ assert len(g.canonical_etypes) == 1, \
'please specify a dict of etypes and ids for graphs with multiple edge types' 'please specify a dict of etypes and ids for graphs with multiple edge types'
neg_pair = self._generate(g, eids, g.canonical_etypes[0]) neg_pair = self._generate(g, eids, g.canonical_etypes[0])
...@@ -64,7 +63,7 @@ class PerSourceUniform(_BaseNegativeSampler): ...@@ -64,7 +63,7 @@ class PerSourceUniform(_BaseNegativeSampler):
shape = (shape[0] * self.k,) shape = (shape[0] * self.k,)
src, _ = g.find_edges(eids, etype=canonical_etype) src, _ = g.find_edges(eids, etype=canonical_etype)
src = F.repeat(src, self.k, 0) src = F.repeat(src, self.k, 0)
dst = F.randint(shape, dtype, ctx, 0, g.number_of_nodes(vtype)) dst = F.randint(shape, dtype, ctx, 0, g.num_nodes(vtype))
return src, dst return src, dst
# Alias # Alias
...@@ -90,14 +89,6 @@ class GlobalUniform(_BaseNegativeSampler): ...@@ -90,14 +89,6 @@ class GlobalUniform(_BaseNegativeSampler):
replace : bool, optional replace : bool, optional
Whether to sample with replacement. Setting it to True will make things Whether to sample with replacement. Setting it to True will make things
faster. (Default: True) faster. (Default: True)
redundancy : float, optional
Indicates how much more negative samples to actually generate during rejection sampling
before finding the unique pairs.
Increasing it will increase the likelihood of getting :attr:`k` negative samples
per edge, but will also take more time and memory.
(Default: automatically determined by the density of graph)
Notes Notes
----- -----
...@@ -113,13 +104,11 @@ class GlobalUniform(_BaseNegativeSampler): ...@@ -113,13 +104,11 @@ class GlobalUniform(_BaseNegativeSampler):
>>> neg_sampler(g, torch.LongTensor([0, 1])) >>> neg_sampler(g, torch.LongTensor([0, 1]))
(tensor([0, 1, 3, 2]), tensor([2, 0, 2, 1])) (tensor([0, 1, 3, 2]), tensor([2, 0, 2, 1]))
""" """
def __init__(self, k, exclude_self_loops=True, replace=False, redundancy=None): def __init__(self, k, exclude_self_loops=True, replace=False):
self.k = k self.k = k
self.exclude_self_loops = exclude_self_loops self.exclude_self_loops = exclude_self_loops
self.replace = replace self.replace = replace
self.redundancy = redundancy
def _generate(self, g, eids, canonical_etype): def _generate(self, g, eids, canonical_etype):
return global_uniform_negative_sampling( return g.global_uniform_negative_sampling(
g, len(eids) * self.k, self.exclude_self_loops, self.replace, len(eids) * self.k, self.exclude_self_loops, self.replace, canonical_etype)
canonical_etype, self.redundancy)
"""Data loading components for neighbor sampling"""
from ..base import NID, EID
from ..transform import to_block
from .base import BlockSampler
class NeighborSampler(BlockSampler):
"""Sampler that builds computational dependency of node representations via
neighbor sampling for multilayer GNN.
This sampler will make every node gather messages from a fixed number of neighbors
per edge type. The neighbors are picked uniformly.
Parameters
----------
fanouts : list[int] or list[dict[etype, int]]
List of neighbors to sample per edge type for each GNN layer, with the i-th
element being the fanout for the i-th GNN layer.
If only a single integer is provided, DGL assumes that every edge type
will have the same fanout.
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
replace : bool, default False
Whether to sample with replacement
prob : str, optional
If given, the probability of each neighbor being sampled is proportional
to the edge feature value with the given name in ``g.edata``. The feature must be
a scalar on each edge.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from 5, 10, 15 neighbors for
the first, second, and third layer respectively (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.NeighborSampler([5, 10, 15])
>>> dataloader = dgl.dataloading.NodeDataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(blocks)
If training on a heterogeneous graph and you want different number of neighbors for each
edge type, one should instead provide a list of dicts. Each dict would specify the
number of neighbors to pick per edge type.
>>> sampler = dgl.dataloading.NeighborSampler([
... {('user', 'follows', 'user'): 5,
... ('user', 'plays', 'game'): 4,
... ('game', 'played-by', 'user'): 3}] * 3)
If you would like non-uniform neighbor sampling:
>>> g.edata['p'] = torch.rand(g.num_edges()) # any non-negative 1D vector works
>>> sampler = dgl.dataloading.NeighborSampler([5, 10, 15], prob='p')
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, fanouts, edge_dir='in', prob=None, replace=False, **kwargs):
super().__init__(**kwargs)
self.fanouts = fanouts
self.edge_dir = edge_dir
self.prob = prob
self.replace = replace
def sample_blocks(self, g, seed_nodes, exclude_eids=None):
output_nodes = seed_nodes
blocks = []
for fanout in reversed(self.fanouts):
frontier = g.sample_neighbors(
seed_nodes, fanout, edge_dir=self.edge_dir, prob=self.prob,
replace=self.replace, output_device=self.output_device,
exclude_edges=exclude_eids)
eid = frontier.edata[EID]
block = to_block(frontier, seed_nodes)
block.edata[EID] = eid
seed_nodes = block.srcdata[NID]
blocks.insert(0, block)
return seed_nodes, output_nodes, blocks
MultiLayerNeighborSampler = NeighborSampler
class MultiLayerFullNeighborSampler(NeighborSampler):
"""Sampler that builds computational dependency of node representations by taking messages
from all neighbors for multilayer GNN.
This sampler will make every node gather messages from every single neighbor per edge type.
Parameters
----------
n_layers : int
The number of GNN layers to sample.
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the MFG.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors for the first,
second, and third layer respectively (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> dataloader = dgl.dataloading.NodeDataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(blocks)
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, num_layers, edge_dir='in', prob=None, replace=False, **kwargs):
super().__init__([-1] * num_layers, edge_dir=edge_dir, prob=prob, replace=replace,
**kwargs)
"""ShaDow-GNN subgraph samplers.""" """ShaDow-GNN subgraph samplers."""
from ..utils import prepare_tensor_or_dict from ..sampling.utils import EidExcluder
from ..base import NID
from .. import transform from .. import transform
from ..sampling import sample_neighbors from ..base import NID
from .neighbor import NeighborSamplingMixin from .base import set_node_lazy_features, set_edge_lazy_features
from .dataloader import exclude_edges, Sampler
class ShaDowKHopSampler(NeighborSamplingMixin, Sampler): class ShaDowKHopSampler(object):
"""K-hop subgraph sampler used by """K-hop subgraph sampler used by
`ShaDow-GNN <https://arxiv.org/abs/2012.01380>`__. `ShaDow-GNN <https://arxiv.org/abs/2012.01380>`__.
...@@ -70,29 +68,32 @@ class ShaDowKHopSampler(NeighborSamplingMixin, Sampler): ...@@ -70,29 +68,32 @@ class ShaDowKHopSampler(NeighborSamplingMixin, Sampler):
If you would like non-uniform neighbor sampling: If you would like non-uniform neighbor sampling:
>>> g.edata['p'] = torch.rand(g.num_edges()) # any non-negative 1D vector works >>> g.edata['p'] = torch.rand(g.num_edges()) # any non-negative 1D vector works
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10, 15], prob='p') >>> sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15], prob='p')
""" """
def __init__(self, fanouts, replace=False, prob=None, output_ctx=None): def __init__(self, fanouts, replace=False, prob=None, prefetch_node_feats=None,
super().__init__(output_ctx) prefetch_edge_feats=None, output_device=None):
self.fanouts = fanouts self.fanouts = fanouts
self.replace = replace self.replace = replace
self.prob = prob self.prob = prob
self.set_output_context(output_ctx) self.prefetch_node_feats = prefetch_node_feats
self.prefetch_edge_feats = prefetch_edge_feats
self.output_device = output_device
def sample(self, g, seed_nodes, exclude_eids=None): def sample(self, g, seed_nodes, exclude_edges=None):
self._build_fanout(len(self.fanouts), g) """Sample a subgraph given a tensor of seed nodes."""
self._build_prob_arrays(g)
seed_nodes = prepare_tensor_or_dict(g, seed_nodes, 'seed nodes')
output_nodes = seed_nodes output_nodes = seed_nodes
for fanout in reversed(self.fanouts):
for i in range(len(self.fanouts)): frontier = g.sample_neighbors(
fanout = self.fanouts[i] seed_nodes, fanout, output_device=self.output_device,
frontier = sample_neighbors( replace=self.replace, prob=self.prob, exclude_edges=exclude_edges)
g, seed_nodes, fanout, replace=self.replace, prob=self.prob_arrays)
block = transform.to_block(frontier, seed_nodes) block = transform.to_block(frontier, seed_nodes)
seed_nodes = block.srcdata[NID] seed_nodes = block.srcdata[NID]
subg = g.subgraph(seed_nodes, relabel_nodes=True) subg = g.subgraph(seed_nodes, relabel_nodes=True, output_device=self.output_device)
subg = exclude_edges(subg, exclude_eids, self.output_device) if exclude_edges is not None:
subg = EidExcluder(exclude_edges)(subg)
set_node_lazy_features(subg, self.prefetch_node_feats)
set_edge_lazy_features(subg, self.prefetch_edge_feats)
return seed_nodes, output_nodes, [subg] return seed_nodes, output_nodes, subg
...@@ -26,6 +26,7 @@ from . import rpc ...@@ -26,6 +26,7 @@ from . import rpc
from . import role from . import role
from .server_state import ServerState from .server_state import ServerState
from .rpc_server import start_server from .rpc_server import start_server
from . import graph_services
from .graph_services import find_edges as dist_find_edges from .graph_services import find_edges as dist_find_edges
from .graph_services import out_degrees as dist_out_degrees from .graph_services import out_degrees as dist_out_degrees
from .graph_services import in_degrees as dist_in_degrees from .graph_services import in_degrees as dist_in_degrees
...@@ -1223,6 +1224,20 @@ class DistGraph: ...@@ -1223,6 +1224,20 @@ class DistGraph:
''' '''
self._client.barrier() self._client.barrier()
def sample_neighbors(self, seed_nodes, fanout, edge_dir='in', prob=None,
exclude_edges=None, replace=False,
output_device=None):
# pylint: disable=unused-argument
"""Sample neighbors from a distributed graph."""
# Currently prob, exclude_edges, output_device, and edge_dir are ignored.
if len(self.etypes) > 1:
frontier = graph_services.sample_etype_neighbors(
self, seed_nodes, ETYPE, fanout, replace=replace)
else:
frontier = graph_services.sample_neighbors(
self, seed_nodes, fanout, replace=replace)
return frontier
def _get_ndata_names(self, ntype=None): def _get_ndata_names(self, ntype=None):
''' Get the names of all node data. ''' Get the names of all node data.
''' '''
......
...@@ -7,6 +7,7 @@ from collections.abc import MutableMapping ...@@ -7,6 +7,7 @@ from collections.abc import MutableMapping
from . import backend as F from . import backend as F
from .base import DGLError, dgl_warning from .base import DGLError, dgl_warning
from .init import zero_initializer from .init import zero_initializer
from .storages import TensorStorage
class _LazyIndex(object): class _LazyIndex(object):
def __init__(self, index): def __init__(self, index):
...@@ -38,6 +39,23 @@ class _LazyIndex(object): ...@@ -38,6 +39,23 @@ class _LazyIndex(object):
flat_index = F.gather_row(flat_index, index) flat_index = F.gather_row(flat_index, index)
return flat_index return flat_index
class LazyFeature(object):
"""Placeholder for prefetching from DataLoader.
"""
__slots__ = ['name', 'id_']
def __init__(self, name=None, id_=None):
self.name = name
self.id_ = id_
def to(self, *args, **kwargs): # pylint: disable=invalid-name, unused-argument
"""No-op. For compatibility of :meth:`Frame.to` method."""
return self
@property
def data(self):
"""No-op. For compatibility of :meth:`Frame.__repr__` method."""
return self
class Scheme(namedtuple('Scheme', ['shape', 'dtype'])): class Scheme(namedtuple('Scheme', ['shape', 'dtype'])):
"""The column scheme. """The column scheme.
...@@ -77,7 +95,7 @@ def infer_scheme(tensor): ...@@ -77,7 +95,7 @@ def infer_scheme(tensor):
""" """
return Scheme(tuple(F.shape(tensor)[1:]), F.dtype(tensor)) return Scheme(tuple(F.shape(tensor)[1:]), F.dtype(tensor))
class Column(object): class Column(TensorStorage):
"""A column is a compact store of features of multiple nodes/edges. """A column is a compact store of features of multiple nodes/edges.
It batches all the feature tensors together along the first dimension It batches all the feature tensors together along the first dimension
...@@ -120,7 +138,7 @@ class Column(object): ...@@ -120,7 +138,7 @@ class Column(object):
Index tensor Index tensor
""" """
def __init__(self, storage, scheme=None, index=None, device=None): def __init__(self, storage, scheme=None, index=None, device=None):
self.storage = storage super().__init__(storage)
self.scheme = scheme if scheme else infer_scheme(storage) self.scheme = scheme if scheme else infer_scheme(storage)
self.index = index self.index = index
self.device = device self.device = device
...@@ -336,10 +354,13 @@ class Frame(MutableMapping): ...@@ -336,10 +354,13 @@ class Frame(MutableMapping):
assert not isinstance(data, Frame) # sanity check for code refactor assert not isinstance(data, Frame) # sanity check for code refactor
# Note that we always create a new column for the given data. # Note that we always create a new column for the given data.
# This avoids two frames accidentally sharing the same column. # This avoids two frames accidentally sharing the same column.
self._columns = {k : Column.create(v) for k, v in data.items()} self._columns = {k : v if isinstance(v, LazyFeature) else Column.create(v)
for k, v in data.items()}
self._num_rows = num_rows self._num_rows = num_rows
# infer num_rows & sanity check # infer num_rows & sanity check
for name, col in self._columns.items(): for name, col in self._columns.items():
if isinstance(col, LazyFeature):
continue
if self._num_rows is None: if self._num_rows is None:
self._num_rows = len(col) self._num_rows = len(col)
elif len(col) != self._num_rows: elif len(col) != self._num_rows:
...@@ -504,6 +525,10 @@ class Frame(MutableMapping): ...@@ -504,6 +525,10 @@ class Frame(MutableMapping):
data : Column or data convertible to Column data : Column or data convertible to Column
The column data. The column data.
""" """
if isinstance(data, LazyFeature):
self._columns[name] = data
return
col = Column.create(data) col = Column.create(data)
if len(col) != self.num_rows: if len(col) != self.num_rows:
raise DGLError('Expected data to have %d rows, got %d.' % raise DGLError('Expected data to have %d rows, got %d.' %
......
"""Python interfaces to DGL farthest point sampler.""" """Python interfaces to DGL farthest point sampler."""
from dgl._ffi.base import DGLError
import numpy as np import numpy as np
from .._ffi.base import DGLError
from .._ffi.function import _init_api from .._ffi.function import _init_api
from .. import backend as F from .. import backend as F
from .. import ndarray as nd from .. import ndarray as nd
......
...@@ -1600,6 +1600,14 @@ class DGLHeteroGraph(object): ...@@ -1600,6 +1600,14 @@ class DGLHeteroGraph(object):
# View # View
################################################################# #################################################################
def get_node_storage(self, key, ntype=None):
"""Get storage object of node feature of type :attr:`ntype` and name :attr:`key`."""
return self._node_frames[self.get_ntype_id(ntype)]._columns[key]
def get_edge_storage(self, key, etype=None):
"""Get storage object of edge feature of type :attr:`etype` and name :attr:`key`."""
return self._edge_frames[self.get_etype_id(etype)]._columns[key]
@property @property
def nodes(self): def nodes(self):
"""Return a node view """Return a node view
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment