Unverified Commit b8ce0f41 authored by Quan (Andy) Gan, committed by GitHub

[Sampling] Cluster-GCN and ShaDow-GNN DataLoader (#3487)

* first commit

* next commit

* third commit

* add ShaDow-GNN sampler and unit tests

* fixes

* lint

* cr*p

* lint

* fix lint

* fixes and more unit tests

* more tests

* fix docs

* lint

* fix

* fix

* fix

* fixes

* fix doc
parent bba88cd6
......@@ -21,7 +21,7 @@ and an ``EdgeDataLoader`` for edge/link prediction task.
.. _api-dataloading-neighbor-sampling:
Neighbor Sampler
-----------------------------
----------------
.. currentmodule:: dgl.dataloading.neighbor
Neighbor samplers are classes that control the behavior of ``DataLoader`` s
......@@ -30,7 +30,7 @@ different neighbor sampling strategies by overriding the ``sample_frontier`` or
the ``sample_blocks`` methods.
.. autoclass:: BlockSampler
:members: sample_frontier, sample_blocks
:members: sample_frontier, sample_blocks, sample
.. autoclass:: MultiLayerNeighborSampler
:members: sample_frontier
......@@ -39,6 +39,29 @@ the ``sample_blocks`` methods.
.. autoclass:: MultiLayerFullNeighborSampler
:show-inheritance:
Subgraph Iterators
------------------
Subgraph iterators iterate over the original graph one subgraph at a time. One should use subgraph
iterators with ``GraphDataLoader`` as follows:
.. code:: python
sgiter = dgl.dataloading.ClusterGCNSubgraphIterator(
g, num_partitions=100, cache_directory='.', refresh=True)
dataloader = dgl.dataloading.GraphDataLoader(sgiter, batch_size=4, num_workers=0)
for subgraph_batch in dataloader:
train_on(subgraph_batch)
.. autoclass:: dgl.dataloading.dataloader.SubgraphIterator
.. autoclass:: dgl.dataloading.cluster_gcn.ClusterGCNSubgraphIterator
ShaDow-GNN Subgraph Sampler
---------------------------
.. currentmodule:: dgl.dataloading.shadow
.. autoclass:: ShaDowKHopSampler
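The sampler plugs into ``NodeDataLoader``; a minimal usage sketch (assuming a homogeneous
graph ``g`` and the PyTorch backend), following the pattern in the class docstring:

.. code:: python

    sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
    dataloader = dgl.dataloading.NodeDataLoader(
        g, torch.arange(g.num_nodes()), sampler, batch_size=1024, shuffle=True)
    for input_nodes, output_nodes, (subgraph,) in dataloader:
        train_on(subgraph)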
.. _api-dataloading-collators:
Collators
......
......@@ -10,6 +10,7 @@ import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
import dgl.function as fn
from dgl.data import register_data_args
from modules import GraphSAGE
......@@ -72,8 +73,17 @@ def main(args):
# metis only support int64 graph
g = g.long()
cluster_iterator = ClusterIter(
args.dataset, g, args.psize, args.batch_size, train_nid, use_pp=args.use_pp)
if args.use_pp:
g.update_all(fn.copy_u('feat', 'm'), fn.sum('m', 'feat_agg'))
g.ndata['feat'] = torch.cat([g.ndata['feat'], g.ndata['feat_agg']], 1)
del g.ndata['feat_agg']
cluster_iterator = dgl.dataloading.GraphDataLoader(
dgl.dataloading.ClusterGCNSubgraphIterator(
dgl.node_subgraph(g, train_nid), args.psize, './cache'),
batch_size=args.batch_size, num_workers=4)
#cluster_iterator = ClusterIter(
# args.dataset, g, args.psize, args.batch_size, train_nid, use_pp=args.use_pp)
# set device for dataset tensors
if args.gpu < 0:
......@@ -132,9 +142,11 @@ def main(args):
cluster = cluster.to(torch.cuda.current_device())
model.train()
# forward
pred = model(cluster)
batch_labels = cluster.ndata['label']
batch_train_mask = cluster.ndata['train_mask']
if batch_train_mask.sum().item() == 0:
continue
pred = model(cluster)
loss = loss_f(pred[batch_train_mask],
batch_labels[batch_train_mask])
......
......@@ -37,7 +37,7 @@ class GraphSAGELayer(nn.Module):
def forward(self, g, h):
g = g.local_var()
if not self.use_pp or not self.training:
if not self.use_pp:
norm = self.get_norm(g)
g.ndata['h'] = h
g.update_all(fn.copy_src(src='h', out='m'),
......
......@@ -426,7 +426,7 @@ class LatentDirichletAllocation:
def doc_subgraph(G, doc_ids):
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
block, *_ = sampler.sample_blocks(G.reverse(), {'doc': torch.as_tensor(doc_ids)})
_, _, (block,) = sampler.sample(G.reverse(), {'doc': torch.as_tensor(doc_ids)})
B = dgl.DGLHeteroGraph(
block._graph, ['_', 'word', 'doc', '_'], block.etypes
).reverse()
......
import torch
import dgl
from dgl.dataloading.dataloader import EdgeCollator, assign_block_eids
from dgl.dataloading.dataloader import EdgeCollator
from dgl.dataloading import BlockSampler
from dgl.dataloading.pytorch import _pop_subgraph_storage, _pop_blocks_storage
from dgl.base import DGLError
......@@ -91,7 +91,7 @@ class TemporalSampler(BlockSampler):
#block = transform.to_block(frontier,seed_nodes)
block = frontier
if self.return_eids:
assign_block_eids(block, frontier)
self.assign_block_eids(block, frontier)
blocks.append(block)
return blocks
......
......@@ -16,6 +16,8 @@ Read the user guide :ref:`guide-minibatch`.
"""
from .neighbor import *
from .dataloader import *
from .cluster_gcn import *
from .shadow import *
from . import negative_sampler
from .async_transferer import AsyncTransferer
......
"""Cluster-GCN subgraph iterators."""
import os
import pickle
import numpy as np
from ..transform import metis_partition_assignment
from .. import backend as F
from .dataloader import SubgraphIterator
class ClusterGCNSubgraphIterator(SubgraphIterator):
"""Subgraph sampler following that of ClusterGCN.
This sampler first partitions the graph with METIS partitioning, then it caches the nodes of
each partition to a file within the given cache directory.
This is used in conjunction with :class:`dgl.dataloading.pytorch.GraphDataLoader`.
Notes
-----
The graph must be homogeneous and on CPU.
Parameters
----------
g : DGLGraph
The original graph.
num_partitions : int
The number of partitions.
cache_directory : str
The path to the cache directory for storing the partition result.
refresh : bool
If True, recompute the partition.
Examples
--------
Assuming that you have a graph ``g``:
>>> sgiter = dgl.dataloading.ClusterGCNSubgraphIterator(
... g, num_partitions=100, cache_directory='.', refresh=True)
>>> dataloader = dgl.dataloading.GraphDataLoader(sgiter, batch_size=4, num_workers=0)
>>> for subgraph_batch in dataloader:
... train_on(subgraph_batch)
"""
def __init__(self, g, num_partitions, cache_directory, refresh=False):
if os.name == 'nt':
raise NotImplementedError("METIS partitioning is not supported on Windows yet.")
super().__init__(g)
# First see if the cache is already there. If so, directly read from cache.
if not refresh and self._load_parts(cache_directory):
return
# Otherwise, build the cache.
assignment = F.asnumpy(metis_partition_assignment(g, num_partitions))
self._save_parts(assignment, cache_directory)
def _cache_file_path(self, cache_directory):
return os.path.join(cache_directory, 'cluster_gcn_cache')
def _load_parts(self, cache_directory):
path = self._cache_file_path(cache_directory)
if not os.path.exists(path):
return False
with open(path, 'rb') as file_:
self.part_indptr, self.part_indices = pickle.load(file_)
return True
def _save_parts(self, assignment, cache_directory):
os.makedirs(cache_directory, exist_ok=True)
self.part_indices = np.argsort(assignment)
num_nodes_per_part = np.bincount(assignment)
self.part_indptr = np.insert(np.cumsum(num_nodes_per_part), 0, 0)
with open(self._cache_file_path(cache_directory), 'wb') as file_:
pickle.dump((self.part_indptr, self.part_indices), file_)
def __len__(self):
return self.part_indptr.shape[0] - 1
def __getitem__(self, i):
nodes = self.part_indices[self.part_indptr[i]:self.part_indptr[i+1]]
return self.g.subgraph(nodes)
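As an aside, a self-contained sketch (NumPy only; the ``assignment`` array below is made up) of how the cached ``part_indptr``/``part_indices`` arrays map a partition ID to its node IDs, mirroring ``_save_parts`` and ``__getitem__`` above:

import numpy as np

# Hypothetical METIS result: node i belongs to partition assignment[i].
assignment = np.array([2, 0, 1, 0, 2, 1, 0])

# Same construction as _save_parts: group node IDs by partition, then build a CSR-style indptr.
part_indices = np.argsort(assignment)
num_nodes_per_part = np.bincount(assignment)
part_indptr = np.insert(np.cumsum(num_nodes_per_part), 0, 0)

# Same lookup as __getitem__: the nodes of partition i are a contiguous slice.
i = 0
print(part_indices[part_indptr[i]:part_indptr[i + 1]])  # nodes assigned to partition 0, e.g. [1 3 6]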
......@@ -12,19 +12,7 @@ from ..batch import batch
from ..convert import heterograph
from ..heterograph import DGLHeteroGraph as DGLGraph
from ..distributed.dist_graph import DistGraph
# pylint: disable=unused-argument
def assign_block_eids(block, frontier):
"""Assigns edge IDs from the original graph to the message flow graph (MFG).
See also
--------
BlockSampler
"""
for etype in block.canonical_etypes:
block.edges[etype].data[EID] = frontier.edges[etype].data[EID][
block.edges[etype].data[EID]]
return block
from ..utils import to_device
def _tensor_or_dict_to_numpy(ids):
if isinstance(ids, Mapping):
......@@ -111,9 +99,28 @@ class _EidExcluder():
return frontier
def _create_eid_excluder(exclude_eids, device):
def exclude_edges(subg, exclude_eids, device):
"""Find and remove from the subgraph the edges whose IDs in the parent
graph are given.
Parameters
----------
subg : DGLGraph
The subgraph. Must have ``dgl.EID`` field containing the original
edge IDs in the parent graph.
exclude_eids : Tensor or dict
The edge IDs to exclude.
device : device
The output device of the graph.
Returns
-------
DGLGraph
The new subgraph with the given edges removed. The ``dgl.EID`` field contains
the original edge IDs in the parent graph.
"""
if exclude_eids is None:
return None
return subg
if device is not None:
if isinstance(exclude_eids, Mapping):
......@@ -122,7 +129,9 @@ def _create_eid_excluder(exclude_eids, device):
else:
exclude_eids = F.copy_to(exclude_eids, device)
return _EidExcluder(exclude_eids)
excluder = _EidExcluder(exclude_eids)
return subg if excluder is None else excluder(subg)
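A hedged illustration (the toy graph and edge IDs below are made up) of what ``exclude_edges`` does to a frontier produced by neighbor sampling, which carries the parent edge IDs in ``dgl.EID``:

import dgl
import torch
from dgl.dataloading.dataloader import exclude_edges

g = dgl.graph(([0, 1, 2, 3], [1, 2, 3, 0]))
# The frontier keeps a dgl.EID mapping back to g.
frontier = dgl.sampling.sample_neighbors(g, torch.tensor([1, 2]), -1)
# Drop the edge whose ID in the parent graph is 0 (the edge 0 -> 1).
pruned = exclude_edges(frontier, torch.tensor([0]), None)
print(pruned.num_edges(), pruned.edata[dgl.EID])  # expected: one fewer edge, with EID 0 gone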
def _find_exclude_eids_with_reverse_id(g, eids, reverse_eid_map):
if isinstance(eids, Mapping):
......@@ -195,15 +204,67 @@ def _find_exclude_eids(g, exclude_mode, eids, **kwargs):
else:
raise ValueError('unsupported mode {}'.format(exclude_mode))
class Sampler(object):
"""An abstract class that takes in a graph and a set of seed nodes and returns a
structure representing a smaller portion of the graph for computation. It can
be either a list of bipartite graphs (i.e. :class:`BlockSampler`), or a single
subgraph.
"""
def __init__(self, output_ctx=None):
self.set_output_context(output_ctx)
def sample(self, g, seed_nodes, exclude_eids=None):
"""Sample a structure from the graph.
Parameters
----------
g : DGLGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The destination nodes by type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
exclude_eids : Tensor or dict[etype, Tensor]
The edges to exclude from computation dependency.
Returns
-------
Tensor or dict[ntype, Tensor]
The input nodes, that is, the nodes whose input features are required for computing
the output representation of :attr:`seed_nodes`.
Tensor or dict[ntype, Tensor]
The output (seed) nodes.
any
Any data representing the sampled structure, e.g. a list of MFGs or a single subgraph.
"""
raise NotImplementedError
def set_output_context(self, ctx):
"""Set the device the generated block or subgraph will be output to.
This should only be set to a CUDA device when multiprocessing is not
used in the dataloader (i.e., ``num_workers`` is 0).
Parameters
----------
ctx : DGLContext, default None
The device context the sampled blocks will be stored on. This
should only be a CUDA context if multiprocessing is not used in
the dataloader (e.g., num_workers is 0). If this is None, the
sampled blocks will be stored on the same device as the input
graph.
"""
if ctx is not None:
self.output_device = F.to_backend_ctx(ctx)
else:
self.output_device = None
class BlockSampler(object):
class BlockSampler(Sampler):
"""Abstract class specifying the neighborhood sampling strategy for DGL data loaders.
The main method for BlockSampler is :meth:`sample_blocks`,
The main method for BlockSampler is :meth:`sample`,
which generates a list of message flow graphs (MFGs) for a multi-layer GNN given a set of
seed nodes to have their outputs computed.
The default implementation of :meth:`sample_blocks` is
The default implementation of :meth:`sample` is
to repeat :attr:`num_layers` times the following procedure from the last layer to the first
layer:
......@@ -214,13 +275,13 @@ class BlockSampler(object):
* Optionally, if the task is link prediction or edge classification, remove edges
connecting training node pairs. If the graph is undirected, also remove the
reverse edges. This is controlled by the argument :attr:`exclude_eids` in
:meth:`sample_blocks` method.
:meth:`sample` method.
* Convert the frontier into a MFG.
* Optionally assign the IDs of the edges in the original graph selected in the first step
to the MFG, controlled by the argument ``return_eids`` in
:meth:`sample_blocks` method.
:meth:`sample` method.
* Prepend the MFG to the MFG list to be returned.
......@@ -255,9 +316,23 @@ class BlockSampler(object):
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, num_layers, return_eids=False, output_ctx=None):
super().__init__(output_ctx)
self.num_layers = num_layers
self.return_eids = return_eids
self.set_output_context(output_ctx)
# pylint: disable=unused-argument
@staticmethod
def assign_block_eids(block, frontier):
"""Assigns edge IDs from the original graph to the message flow graph (MFG).
See also
--------
BlockSampler
"""
for etype in block.canonical_etypes:
block.edges[etype].data[EID] = frontier.edges[etype].data[EID][
block.edges[etype].data[EID]]
return block
# This is really a hack working around the lack of GPU-based neighbor sampling
# with edge exclusion.
......@@ -266,10 +341,10 @@ class BlockSampler(object):
"""Returns whether the sampler will exclude edges in :func:`sample_frontier`.
If this method returns True, the method :func:`sample_frontier` will receive an
argument :attr:`exclude_eids` from :func:`sample_blocks`. :func:`sample_frontier`
argument :attr:`exclude_eids` from :func:`sample`. :func:`sample_frontier`
is then responsible for removing those edges.
If this method returns False, :func:`sample_blocks` will be responsible for
If this method returns False, :func:`sample` will be responsible for
removing the edges.
When subclassing :class:`BlockSampler`, this method should return True when you
......@@ -324,7 +399,7 @@ class BlockSampler(object):
"""
raise NotImplementedError
def sample_blocks(self, g, seed_nodes, exclude_eids=None):
def sample(self, g, seed_nodes, exclude_eids=None):
"""Generate the a list of MFGs given the destination nodes.
Parameters
......@@ -361,12 +436,7 @@ class BlockSampler(object):
graph_device = g.device
for block_id in reversed(range(self.num_layers)):
seed_nodes_in = seed_nodes
if isinstance(seed_nodes_in, dict):
seed_nodes_in = {ntype: nodes.to(graph_device) \
for ntype, nodes in seed_nodes_in.items()}
else:
seed_nodes_in = seed_nodes_in.to(graph_device)
seed_nodes_in = to_device(seed_nodes, graph_device)
if self.exclude_edges_in_frontier(g):
frontier = self.sample_frontier(
......@@ -376,48 +446,27 @@ class BlockSampler(object):
if self.output_device is not None:
frontier = frontier.to(self.output_device)
if isinstance(seed_nodes, dict):
seed_nodes_out = {ntype: nodes.to(self.output_device) \
for ntype, nodes in seed_nodes.items()}
else:
seed_nodes_out = seed_nodes.to(self.output_device)
seed_nodes_out = to_device(seed_nodes, self.output_device)
else:
seed_nodes_out = seed_nodes
# Removing edges from the frontier for link prediction training falls
# into the category of frontier postprocessing
if not self.exclude_edges_in_frontier(g):
eid_excluder = _create_eid_excluder(exclude_eids, self.output_device)
if eid_excluder is not None:
frontier = eid_excluder(frontier)
frontier = exclude_edges(frontier, exclude_eids, self.output_device)
block = transform.to_block(frontier, seed_nodes_out)
if self.return_eids:
assign_block_eids(block, frontier)
self.assign_block_eids(block, frontier)
seed_nodes = {ntype: block.srcnodes[ntype].data[NID] for ntype in block.srctypes}
blocks.insert(0, block)
return blocks
return blocks[0].srcdata[NID], blocks[-1].dstdata[NID], blocks
def set_output_context(self, ctx):
"""Set the device the generated block will be output to. This should
only be set to a cuda device, when multi-processing is not used in
the dataloader (e.g., num_workers is 0).
Parameters
----------
output_ctx : DGLContext, default None
The device context the sampled blocks will be stored on. This
should only be a CUDA context if multiprocessing is not used in
the dataloader (e.g., num_workers is 0). If this is None, the
sampled blocks will be stored on the same device as the input
graph.
def sample_blocks(self, g, seed_nodes, exclude_eids=None):
"""Deprecated and identical to :meth:`sample`.
"""
if ctx is not None:
self.output_device = F.to_backend_ctx(ctx)
else:
self.output_device = None
return self.sample(g, seed_nodes, exclude_eids)
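Not part of this patch, but as a sketch of the extension point described above: a custom ``BlockSampler`` only needs to override ``sample_frontier``; ``sample`` then handles edge exclusion, MFG conversion and the ``(input_nodes, output_nodes, blocks)`` return. The class name below is illustrative only.

import dgl

class FullFrontierSampler(dgl.dataloading.BlockSampler):
    """Illustrative only: every layer takes the full in-bound frontier of the seed nodes."""
    def __init__(self, num_layers):
        super().__init__(num_layers, return_eids=False)

    def sample_frontier(self, block_id, g, seed_nodes, exclude_eids=None):
        # Frontier = all edges whose destination is a current seed node.
        return dgl.in_subgraph(g, seed_nodes)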
class Collator(ABC):
"""Abstract DGL collator for training GNNs on downstream tasks stochastically.
......@@ -454,26 +503,6 @@ class Collator(ABC):
"""
raise NotImplementedError
# TODO(BarclayII): DistGraph.idtype and DistGraph.device are in the code, however
# the underlying DGLGraph object could be None. I was unable to figure out how
# to properly implement those two properties so I'm working around that. If the
# graph is a DistGraph, I assume that the dtype and device of the data should
# be the same as the graph already.
#
# After idtype and device get properly implemented, we should remove these two
# _prepare_* functions.
def _prepare_tensor_dict(g, data, name, is_distributed):
if is_distributed:
x = F.tensor(next(iter(data.values())))
return {k: F.copy_to(F.astype(F.tensor(v), F.dtype(x)), F.context(x)) \
for k, v in data.items()}
else:
return utils.prepare_tensor_dict(g, data, name)
def _prepare_tensor(g, data, name, is_distributed):
return F.tensor(data) if is_distributed else utils.prepare_tensor(g, data, name)
class NodeCollator(Collator):
"""DGL collator to combine nodes and their computation dependencies within a minibatch for
training node classification or regression on a single graph with neighborhood sampling.
......@@ -484,7 +513,7 @@ class NodeCollator(Collator):
The graph.
nids : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
block_sampler : dgl.dataloading.BlockSampler
graph_sampler : dgl.dataloading.BlockSampler
The neighborhood sampler.
Examples
......@@ -507,20 +536,15 @@ class NodeCollator(Collator):
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, g, nids, block_sampler):
def __init__(self, g, nids, graph_sampler):
self.g = g
self._is_distributed = isinstance(g, DistGraph)
if not isinstance(nids, Mapping):
assert len(g.ntypes) == 1, \
"nids should be a dict of node type and ids for graph with multiple node types"
self.block_sampler = block_sampler
self.graph_sampler = graph_sampler
if isinstance(nids, Mapping):
self.nids = _prepare_tensor_dict(g, nids, 'nids', self._is_distributed)
self._dataset = utils.FlattenedDict(self.nids)
else:
self.nids = _prepare_tensor(g, nids, 'nids', self._is_distributed)
self._dataset = self.nids
self.nids = utils.prepare_tensor_or_dict(g, nids, 'nids')
self._dataset = utils.maybe_flatten_dict(self.nids)
@property
def dataset(self):
......@@ -554,13 +578,9 @@ class NodeCollator(Collator):
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = _prepare_tensor_dict(self.g, items, 'items', self._is_distributed)
else:
items = _prepare_tensor(self.g, items, 'items', self._is_distributed)
items = utils.prepare_tensor_or_dict(self.g, items, 'items')
blocks = self.block_sampler.sample_blocks(self.g, items)
output_nodes = blocks[-1].dstdata[NID]
input_nodes = blocks[0].srcdata[NID]
input_nodes, output_nodes, blocks = self.graph_sampler.sample(self.g, items)
return input_nodes, output_nodes, blocks
......@@ -590,7 +610,7 @@ class EdgeCollator(Collator):
are generated.
eids : Tensor or dict[etype, Tensor]
The edge set in graph :attr:`g` to compute outputs.
block_sampler : dgl.dataloading.BlockSampler
graph_sampler : dgl.dataloading.BlockSampler
The neighborhood sampler.
g_sampling : DGLGraph, optional
The graph where neighborhood sampling and message passing is performed.
......@@ -652,7 +672,7 @@ class EdgeCollator(Collator):
Examples
--------
The following example shows how to train a 3-layer GNN for edge classification on a
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
messages from all neighbors.
Say that you have an array of source node IDs ``src`` and another array of destination
......@@ -741,14 +761,13 @@ class EdgeCollator(Collator):
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, g, eids, block_sampler, g_sampling=None, exclude=None,
def __init__(self, g, eids, graph_sampler, g_sampling=None, exclude=None,
reverse_eids=None, reverse_etypes=None, negative_sampler=None):
self.g = g
self._is_distributed = isinstance(g, DistGraph)
if not isinstance(eids, Mapping):
assert len(g.etypes) == 1, \
"eids should be a dict of etype and ids for graph with multiple etypes"
self.block_sampler = block_sampler
self.graph_sampler = graph_sampler
# One may wish to iterate over the edges in one graph while perform sampling in
# another graph. This may be the case for iterating over validation and test
......@@ -766,12 +785,8 @@ class EdgeCollator(Collator):
self.reverse_etypes = reverse_etypes
self.negative_sampler = negative_sampler
if isinstance(eids, Mapping):
self.eids = _prepare_tensor_dict(g, eids, 'eids', self._is_distributed)
self._dataset = utils.FlattenedDict(self.eids)
else:
self.eids = _prepare_tensor(g, eids, 'eids', self._is_distributed)
self._dataset = self.eids
self.eids = utils.prepare_tensor_or_dict(g, eids, 'eids')
self._dataset = utils.maybe_flatten_dict(self.eids)
@property
def dataset(self):
......@@ -781,9 +796,7 @@ class EdgeCollator(Collator):
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = _prepare_tensor_dict(self.g_sampling, items, 'items', self._is_distributed)
else:
items = _prepare_tensor(self.g_sampling, items, 'items', self._is_distributed)
items = utils.prepare_tensor_or_dict(self.g_sampling, items, 'items')
pair_graph = self.g.edge_subgraph(items)
seed_nodes = pair_graph.ndata[NID]
......@@ -795,9 +808,8 @@ class EdgeCollator(Collator):
reverse_eid_map=self.reverse_eids,
reverse_etype_map=self.reverse_etypes)
blocks = self.block_sampler.sample_blocks(
input_nodes, _, blocks = self.graph_sampler.sample(
self.g_sampling, seed_nodes, exclude_eids=exclude_eids)
input_nodes = blocks[0].srcdata[NID]
return input_nodes, pair_graph, blocks
......@@ -805,9 +817,7 @@ class EdgeCollator(Collator):
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
items = _prepare_tensor_dict(self.g_sampling, items, 'items', self._is_distributed)
else:
items = _prepare_tensor(self.g_sampling, items, 'items', self._is_distributed)
items = utils.prepare_tensor_or_dict(self.g_sampling, items, 'items')
pair_graph = self.g.edge_subgraph(items, relabel_nodes=False)
induced_edges = pair_graph.edata[EID]
......@@ -840,9 +850,8 @@ class EdgeCollator(Collator):
reverse_eid_map=self.reverse_eids,
reverse_etype_map=self.reverse_etypes)
blocks = self.block_sampler.sample_blocks(
input_nodes, _, blocks = self.graph_sampler.sample(
self.g_sampling, seed_nodes, exclude_eids=exclude_eids)
input_nodes = blocks[0].srcdata[NID]
return input_nodes, pair_graph, neg_pair_graph, blocks
......@@ -965,3 +974,9 @@ class GraphCollator(object):
return [self.collate(samples) for samples in transposed]
raise TypeError(self.graph_collate_err_msg_format.format(elem_type))
class SubgraphIterator(object):
"""Abstract class representing an iterator that yields a subgraph given a graph.
"""
def __init__(self, g):
self.g = g
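A hedged sketch of a custom ``SubgraphIterator`` (the random chunking below is purely illustrative): a subclass only needs ``__len__`` and ``__getitem__`` returning node-induced subgraphs, after which it plugs into ``GraphDataLoader`` just like the Cluster-GCN iterator.

import torch
from dgl.dataloading.dataloader import SubgraphIterator  # path as referenced in the docs above

class RandomChunkSubgraphIterator(SubgraphIterator):
    """Illustrative only: split the nodes into equally sized random chunks."""
    def __init__(self, g, num_parts):
        super().__init__(g)
        self.chunks = torch.chunk(torch.randperm(g.num_nodes()), num_parts)

    def __len__(self):
        return len(self.chunks)

    def __getitem__(self, i):
        return self.g.subgraph(self.chunks[i])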
"""Data loading components for neighbor sampling"""
from .dataloader import BlockSampler
from .. import sampling, subgraph, distributed
from .. import sampling, distributed
from .. import ndarray as nd
from .. import backend as F
from ..base import ETYPE
class MultiLayerNeighborSampler(BlockSampler):
class NeighborSamplingMixin(object):
"""Mixin object containing common optimizing routines that caches fanout and probability
arrays.
The mixin requires the object to have the following attributes:
- :attr:`prob`: The edge feature name that stores the (unnormalized) probability.
- :attr:`fanouts`: The list of fanouts (either an integer or a dictionary of edge
types and integers).
The mixin will generate the following attributes:
- :attr:`prob_arrays`: List of DGL NDArrays containing the unnormalized probabilities
for every edge type.
- :attr:`fanout_arrays`: List of DGL NDArrays containing the fanouts for every edge
type at every layer.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) # forward to base classes
self.fanout_arrays = []
self.prob_arrays = None
def _build_prob_arrays(self, g):
if self.prob is not None:
self.prob_arrays = [F.to_dgl_nd(g.edges[etype].data[self.prob]) for etype in g.etypes]
elif self.prob_arrays is None:
# build prob_arrays only once
self.prob_arrays = [nd.array([], ctx=nd.cpu())] * len(g.etypes)
def _build_fanout(self, block_id, g):
assert not self.fanouts is None, \
"_build_fanout() should only be called when fanouts is not None"
# build fanout_arrays only once for each layer
while block_id >= len(self.fanout_arrays):
for i in range(len(self.fanouts)):
fanout = self.fanouts[i]
if not isinstance(fanout, dict):
fanout_array = [int(fanout)] * len(g.etypes)
else:
if len(fanout) != len(g.etypes):
raise DGLError('Fan-out must be specified for each edge type '
'if a dict is provided.')
fanout_array = [None] * len(g.etypes)
for etype, value in fanout.items():
fanout_array[g.get_etype_id(etype)] = value
self.fanout_arrays.append(
F.to_dgl_nd(F.tensor(fanout_array, dtype=F.int64)))
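For illustration (a hedged sketch; the toy heterograph is made up), this is the mapping ``_build_fanout`` performs when the fanout is a dict: one slot per edge type, ordered by the graph's edge type IDs, before conversion to a DGL NDArray.

import dgl

g = dgl.heterograph({
    ('user', 'follows', 'user'): ([0], [1]),
    ('user', 'plays', 'game'): ([0], [0]),
})
fanout = {'follows': 5, 'plays': 4}

# Same indexing logic as _build_fanout above.
fanout_array = [None] * len(g.etypes)
for etype, value in fanout.items():
    fanout_array[g.get_etype_id(etype)] = value
print(fanout_array)  # [5, 4], following the order of g.etypes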
class MultiLayerNeighborSampler(NeighborSamplingMixin, BlockSampler):
"""Sampler that builds computational dependency of node representations via
neighbor sampling for multilayer GNN.
......@@ -14,14 +62,12 @@ class MultiLayerNeighborSampler(BlockSampler):
Parameters
----------
fanouts : list[int] or list[dict[etype, int] or None]
List of neighbors to sample per edge type for each GNN layer, starting from the
first layer.
fanouts : list[int] or list[dict[etype, int]]
List of neighbors to sample per edge type for each GNN layer, with the i-th
element being the fanout for the i-th GNN layer.
If the graph is homogeneous, only an integer is needed for each layer.
If None is provided for one layer, all neighbors will be included regardless of
edge types.
If only a single integer is provided, DGL assumes that every edge type
will have the same fanout.
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
......@@ -30,6 +76,10 @@ class MultiLayerNeighborSampler(BlockSampler):
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the MFG.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
prob : str, optional
If given, the probability of each neighbor being sampled is proportional
to the edge feature value with the given name in ``g.edata``. The feature must be
a scalar on each edge.
Examples
--------
......@@ -38,11 +88,10 @@ class MultiLayerNeighborSampler(BlockSampler):
the first, second, and third layer respectively (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10, 15])
>>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
>>> dataloader = dgl.dataloading.NodeDataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for blocks in dataloader:
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(blocks)
If training on a heterogeneous graph and you want different number of neighbors for each
......@@ -54,13 +103,18 @@ class MultiLayerNeighborSampler(BlockSampler):
... ('user', 'plays', 'game'): 4,
... ('game', 'played-by', 'user'): 3}] * 3)
If you would like non-uniform neighbor sampling:
>>> g.edata['p'] = torch.rand(g.num_edges()) # any non-negative 1D vector works
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10, 15], prob='p')
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, fanouts, replace=False, return_eids=False):
def __init__(self, fanouts, replace=False, return_eids=False, prob=None):
super().__init__(len(fanouts), return_eids)
self.fanouts = fanouts
......@@ -68,8 +122,7 @@ class MultiLayerNeighborSampler(BlockSampler):
# used to cache computations and memory allocations
# list[dgl.nd.NDArray]; each array stores the fan-outs of all edge types
self.fanout_arrays = []
self.prob_arrays = None
self.prob = prob
@classmethod
def exclude_edges_in_frontier(cls, g):
......@@ -78,56 +131,24 @@ class MultiLayerNeighborSampler(BlockSampler):
def sample_frontier(self, block_id, g, seed_nodes, exclude_eids=None):
fanout = self.fanouts[block_id]
if isinstance(g, distributed.DistGraph):
if fanout is None:
# TODO(zhengda) There is a bug in the distributed version of in_subgraph.
# let's use sample_neighbors to replace in_subgraph for now.
frontier = distributed.sample_neighbors(g, seed_nodes, -1, replace=False)
if len(g.etypes) > 1: # heterogeneous distributed graph
# The edge type is stored in g.edata[dgl.ETYPE]
assert isinstance(fanout, int), "For distributed training, " \
"we can only sample same number of neighbors for each edge type"
frontier = distributed.sample_etype_neighbors(
g, seed_nodes, ETYPE, fanout, replace=self.replace)
else:
if len(g.etypes) > 1: # heterogeneous distributed graph
# The edge type is stored in g.edata[dgl.ETYPE]
assert isinstance(fanout, int), "For distributed training, " \
"we can only sample same number of neighbors for each edge type"
frontier = distributed.sample_etype_neighbors(
g, seed_nodes, ETYPE, fanout, replace=self.replace)
else:
frontier = distributed.sample_neighbors(
g, seed_nodes, fanout, replace=self.replace)
frontier = distributed.sample_neighbors(
g, seed_nodes, fanout, replace=self.replace)
else:
if fanout is None:
frontier = subgraph.in_subgraph(g, seed_nodes)
else:
self._build_fanout(block_id, g)
self._build_prob_arrays(g)
self._build_fanout(block_id, g)
self._build_prob_arrays(g)
frontier = sampling.sample_neighbors(
g, seed_nodes, self.fanout_arrays[block_id],
replace=self.replace, prob=self.prob_arrays, exclude_edges=exclude_eids)
frontier = sampling.sample_neighbors(
g, seed_nodes, self.fanout_arrays[block_id],
replace=self.replace, prob=self.prob_arrays, exclude_edges=exclude_eids)
return frontier
def _build_prob_arrays(self, g):
# build prob_arrays only once
if self.prob_arrays is None:
self.prob_arrays = [nd.array([], ctx=nd.cpu())] * len(g.etypes)
def _build_fanout(self, block_id, g):
assert not self.fanouts is None, \
"_build_fanout() should only be called when fanouts is not None"
# build fanout_arrays only once for each layer
while block_id >= len(self.fanout_arrays):
for i in range(len(self.fanouts)):
fanout = self.fanouts[i]
if not isinstance(fanout, dict):
fanout_array = [int(fanout)] * len(g.etypes)
else:
if len(fanout) != len(g.etypes):
raise DGLError('Fan-out must be specified for each edge type '
'if a dict is provided.')
fanout_array = [None] * len(g.etypes)
for etype, value in fanout.items():
fanout_array[g.get_etype_id(etype)] = value
self.fanout_arrays.append(
F.to_dgl_nd(F.tensor(fanout_array, dtype=F.int64)))
class MultiLayerFullNeighborSampler(MultiLayerNeighborSampler):
"""Sampler that builds computational dependency of node representations by taking messages
......@@ -150,11 +171,10 @@ class MultiLayerFullNeighborSampler(MultiLayerNeighborSampler):
second, and third layer respectively (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
>>> dataloader = dgl.dataloading.NodeDataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for blocks in dataloader:
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(blocks)
Notes
......@@ -164,7 +184,7 @@ class MultiLayerFullNeighborSampler(MultiLayerNeighborSampler):
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, n_layers, return_eids=False):
super().__init__([None] * n_layers, return_eids=return_eids)
super().__init__([-1] * n_layers, return_eids=return_eids)
@classmethod
def exclude_edges_in_frontier(cls, g):
......
......@@ -3,10 +3,10 @@ import inspect
import math
from distutils.version import LooseVersion
import torch as th
from torch.utils.data import DataLoader
from torch.utils.data import DataLoader, IterableDataset
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist
from ..dataloader import NodeCollator, EdgeCollator, GraphCollator
from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIterator
from ...distributed import DistGraph
from ...distributed import DistDataLoader
from ...ndarray import NDArray as DGLNDArray
......@@ -16,8 +16,8 @@ from ...utils import to_dgl_context
__all__ = ['NodeDataLoader', 'EdgeDataLoader', 'GraphDataLoader',
# Temporary exposure.
'_pop_subgraph_storage', '_pop_blocks_storage',
'_restore_subgraph_storage', '_restore_blocks_storage']
'_pop_subgraph_storage', '_pop_storages',
'_restore_subgraph_storage', '_restore_storages']
PYTORCH_VER = LooseVersion(th.__version__)
PYTORCH_16 = PYTORCH_VER >= LooseVersion("1.6.0")
......@@ -213,26 +213,32 @@ def _pop_subgraph_storage(subg, g):
frame = g._edge_frames[g.get_etype_id(etype)]
_pop_subframe_storage(subframe, frame)
def _pop_blocks_storage(blocks, g):
for block in blocks:
for ntype in block.srctypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_src(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_pop_subframe_storage(subframe, frame)
for ntype in block.dsttypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_dst(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_pop_subframe_storage(subframe, frame)
for etype in block.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = block._edge_frames[block.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_pop_subframe_storage(subframe, frame)
def _pop_block_storage(block, g):
for ntype in block.srctypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_src(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_pop_subframe_storage(subframe, frame)
for ntype in block.dsttypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_dst(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_pop_subframe_storage(subframe, frame)
for etype in block.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = block._edge_frames[block.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_pop_subframe_storage(subframe, frame)
def _pop_storages(subgs, g):
for subg in subgs:
if subg.is_block:
_pop_block_storage(subg, g)
else:
_pop_subgraph_storage(subg, g)
def _restore_subframe_storage(subframe, frame):
for key, col in subframe._columns.items():
......@@ -253,32 +259,38 @@ def _restore_subgraph_storage(subg, g):
frame = g._edge_frames[g.get_etype_id(etype)]
_restore_subframe_storage(subframe, frame)
def _restore_blocks_storage(blocks, g):
for block in blocks:
for ntype in block.srctypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_src(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_restore_subframe_storage(subframe, frame)
for ntype in block.dsttypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_dst(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_restore_subframe_storage(subframe, frame)
for etype in block.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = block._edge_frames[block.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_restore_subframe_storage(subframe, frame)
def _restore_block_storage(block, g):
for ntype in block.srctypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_src(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_restore_subframe_storage(subframe, frame)
for ntype in block.dsttypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_dst(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_restore_subframe_storage(subframe, frame)
for etype in block.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = block._edge_frames[block.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_restore_subframe_storage(subframe, frame)
def _restore_storages(subgs, g):
for subg in subgs:
if subg.is_block:
_restore_block_storage(subg, g)
else:
_restore_subgraph_storage(subg, g)
class _NodeCollator(NodeCollator):
def collate(self, items):
# input_nodes, output_nodes, blocks
result = super().collate(items)
_pop_blocks_storage(result[-1], self.g)
_pop_storages(result[-1], self.g)
return result
class _EdgeCollator(EdgeCollator):
......@@ -287,16 +299,27 @@ class _EdgeCollator(EdgeCollator):
# input_nodes, pair_graph, blocks
result = super().collate(items)
_pop_subgraph_storage(result[1], self.g)
_pop_blocks_storage(result[-1], self.g_sampling)
_pop_storages(result[-1], self.g_sampling)
return result
else:
# input_nodes, pair_graph, neg_pair_graph, blocks
result = super().collate(items)
_pop_subgraph_storage(result[1], self.g)
_pop_subgraph_storage(result[2], self.g)
_pop_blocks_storage(result[-1], self.g_sampling)
_pop_storages(result[-1], self.g_sampling)
return result
class _GraphCollator(GraphCollator):
def __init__(self, subgraph_iterator, **kwargs):
super().__init__(**kwargs)
self.subgraph_iterator = subgraph_iterator
def collate(self, items):
result = super().collate(items)
if self.subgraph_iterator is not None:
_pop_storages([result], self.subgraph_iterator.g)
return result
def _to_device(data, device):
if isinstance(data, dict):
for k, v in data.items():
......@@ -320,7 +343,7 @@ class _NodeDataLoaderIter:
def __next__(self):
# input_nodes, output_nodes, blocks
result_ = next(self.iter_)
_restore_blocks_storage(result_[-1], self.node_dataloader.collator.g)
_restore_storages(result_[-1], self.node_dataloader.collator.g)
result = [_to_device(data, self.device) for data in result_]
return result
......@@ -343,11 +366,25 @@ class _EdgeDataLoaderIter:
# Otherwise, input_nodes, pair_graph, blocks
_restore_subgraph_storage(result_[2], self.edge_dataloader.collator.g)
_restore_subgraph_storage(result_[1], self.edge_dataloader.collator.g)
_restore_blocks_storage(result_[-1], self.edge_dataloader.collator.g_sampling)
_restore_storages(result_[-1], self.edge_dataloader.collator.g_sampling)
result = [_to_device(data, self.device) for data in result_]
return result
class _GraphDataLoaderIter:
def __init__(self, graph_dataloader):
self.dataloader = graph_dataloader
self.iter_ = iter(graph_dataloader.dataloader)
def __iter__(self):
return self
def __next__(self):
result = next(self.iter_)
if self.dataloader.is_subgraph_loader:
_restore_storages([result], self.dataloader.subgraph_iterator.g)
return result
def _init_dataloader(collator, device, dataloader_kwargs, use_ddp, ddp_seed):
dataset = collator.dataset
use_scalar_batcher = False
......@@ -401,7 +438,7 @@ class NodeDataLoader:
The graph.
nids : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
block_sampler : dgl.dataloading.BlockSampler
graph_sampler : dgl.dataloading.Sampler
The neighborhood sampler.
device : device context, optional
The device of the generated MFGs in each iteration, which should be a
......@@ -479,7 +516,7 @@ class NodeDataLoader:
"""
collator_arglist = inspect.getfullargspec(NodeCollator).args
def __init__(self, g, nids, block_sampler, device=None, use_ddp=False, ddp_seed=0, **kwargs):
def __init__(self, g, nids, graph_sampler, device=None, use_ddp=False, ddp_seed=0, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
......@@ -495,7 +532,7 @@ class NodeDataLoader:
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = NodeCollator(g, nids, block_sampler, **collator_kwargs)
self.collator = NodeCollator(g, nids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
self.dataloader = DistDataLoader(self.collator.dataset,
collate_fn=self.collator.collate,
......@@ -509,10 +546,10 @@ class NodeDataLoader:
# if the sampler supports it, tell it to output to the
# specified device
num_workers = dataloader_kwargs.get('num_workers', 0)
if callable(getattr(block_sampler, "set_output_context", None)) and num_workers == 0:
block_sampler.set_output_context(to_dgl_context(device))
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
graph_sampler.set_output_context(to_dgl_context(device))
self.collator = _NodeCollator(g, nids, block_sampler, **collator_kwargs)
self.collator = _NodeCollator(g, nids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
......@@ -586,7 +623,7 @@ class EdgeDataLoader:
The graph. Currently must be on CPU; GPU is not supported.
eids : Tensor or dict[etype, Tensor]
The edge set in graph :attr:`g` to compute outputs.
block_sampler : dgl.dataloading.BlockSampler
graph_sampler : dgl.dataloading.Sampler
The neighborhood sampler.
device : device context, optional
The device of the generated MFGs and graphs in each iteration, which should be a
......@@ -655,7 +692,7 @@ class EdgeDataLoader:
Examples
--------
The following example shows how to train a 3-layer GNN for edge classification on a
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
messages from all neighbors.
Say that you have an array of source node IDs ``src`` and another array of destination
......@@ -767,7 +804,7 @@ class EdgeDataLoader:
"""
collator_arglist = inspect.getfullargspec(EdgeCollator).args
def __init__(self, g, eids, block_sampler, device='cpu', use_ddp=False, ddp_seed=0, **kwargs):
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
......@@ -783,7 +820,7 @@ class EdgeDataLoader:
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = EdgeCollator(g, eids, block_sampler, **collator_kwargs)
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
self.dataloader = DistDataLoader(self.collator.dataset,
collate_fn=self.collator.collate,
......@@ -797,10 +834,10 @@ class EdgeDataLoader:
# if the sampler supports it, tell it to output to the
# specified device
num_workers = dataloader_kwargs.get('num_workers', 0)
if callable(getattr(block_sampler, "set_output_context", None)) and num_workers == 0:
block_sampler.set_output_context(to_dgl_context(device))
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
graph_sampler.set_output_context(to_dgl_context(device))
self.collator = _EdgeCollator(g, eids, block_sampler, **collator_kwargs)
self.collator = _EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
self.use_ddp = use_ddp
......@@ -902,8 +939,24 @@ class GraphDataLoader:
else:
dataloader_kwargs[k] = v
# If the dataset is an infinite SubgraphIterator (i.e. without __len__) over a
# larger graph, convert it to an IterableDataset.
if isinstance(dataset, SubgraphIterator) and not hasattr(dataset, '__len__'):
class _Dataset(IterableDataset):
def __init__(self, iter_):
self._it = iter_
def __iter__(self):
return iter(self._it)
self.subgraph_iterator = dataset
dataset = _Dataset(dataset)
self.is_subgraph_loader = True
else:
self.is_subgraph_loader = False
self.subgraph_iterator = None
if collate_fn is None:
self.collate = GraphCollator(**collator_kwargs).collate
self.collate = _GraphCollator(self.subgraph_iterator, **collator_kwargs).collate
else:
self.collate = collate_fn
......@@ -918,7 +971,7 @@ class GraphDataLoader:
def __iter__(self):
"""Return the iterator of the data loader."""
return iter(self.dataloader)
return _GraphDataLoaderIter(self)
def __len__(self):
"""Return the number of batches of the data loader."""
......
"""ShaDow-GNN subgraph samplers."""
from ..utils import prepare_tensor_or_dict
from ..base import NID
from .. import transform
from ..sampling import sample_neighbors
from .neighbor import NeighborSamplingMixin
from .dataloader import exclude_edges, Sampler
class ShaDowKHopSampler(NeighborSamplingMixin, Sampler):
"""K-hop subgraph sampler used by
`ShaDow-GNN <https://arxiv.org/abs/2012.01380>`__.
It performs node-wise neighbor sampling but instead of returning a list of
MFGs, it returns a single subgraph induced by all the sampled nodes. The
seed nodes from which the neighbors are sampled will appear first among the
induced nodes of the subgraph.
This is used in conjunction with :class:`dgl.dataloading.pytorch.NodeDataLoader`
and :class:`dgl.dataloading.pytorch.EdgeDataLoader`.
Parameters
----------
fanouts : list[int] or list[dict[etype, int]]
List of neighbors to sample per edge type for each GNN layer, with the i-th
element being the fanout for the i-th GNN layer.
If only a single integer is provided, DGL assumes that every edge type
will have the same fanout.
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
replace : bool, default False
Whether to sample with replacement.
prob : str, optional
If given, the probability of each neighbor being sampled is proportional
to the edge feature value with the given name in ``g.edata``. The feature must be
a scalar on each edge.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from 5, 10, 15 neighbors for
the first, second, and third layer respectively (assuming the backend is PyTorch):
>>> g = dgl.data.CoraFullDataset()[0]
>>> sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
>>> dataloader = dgl.dataloading.NodeDataLoader(
... g, torch.arange(g.num_nodes()), sampler,
... batch_size=5, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, (subgraph,) in dataloader:
... print(subgraph)
... assert torch.equal(input_nodes, subgraph.ndata[dgl.NID])
... assert torch.equal(input_nodes[:output_nodes.shape[0]], output_nodes)
... break
Graph(num_nodes=529, num_edges=3796,
ndata_schemes={'label': Scheme(shape=(), dtype=torch.int64),
'feat': Scheme(shape=(8710,), dtype=torch.float32),
'_ID': Scheme(shape=(), dtype=torch.int64)}
edata_schemes={'_ID': Scheme(shape=(), dtype=torch.int64)})
If training on a heterogeneous graph and you want different number of neighbors for each
edge type, one should instead provide a list of dicts. Each dict would specify the
number of neighbors to pick per edge type.
>>> sampler = dgl.dataloading.ShaDowKHopSampler([
... {('user', 'follows', 'user'): 5,
... ('user', 'plays', 'game'): 4,
... ('game', 'played-by', 'user'): 3}] * 3)
If you would like non-uniform neighbor sampling:
>>> g.edata['p'] = torch.rand(g.num_edges()) # any non-negative 1D vector works
>>> sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15], prob='p')
"""
def __init__(self, fanouts, replace=False, prob=None, output_ctx=None):
super().__init__(output_ctx)
self.fanouts = fanouts
self.replace = replace
self.prob = prob
self.set_output_context(output_ctx)
def sample(self, g, seed_nodes, exclude_eids=None):
self._build_fanout(len(self.fanouts), g)
self._build_prob_arrays(g)
seed_nodes = prepare_tensor_or_dict(g, seed_nodes, 'seed nodes')
output_nodes = seed_nodes
for i in range(len(self.fanouts)):
fanout = self.fanouts[i]
frontier = sample_neighbors(
g, seed_nodes, fanout, replace=self.replace, prob=self.prob_arrays)
block = transform.to_block(frontier, seed_nodes)
seed_nodes = block.srcdata[NID]
subg = g.subgraph(seed_nodes, relabel_nodes=True)
subg = exclude_edges(subg, exclude_eids, self.output_device)
return seed_nodes, output_nodes, [subg]
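A hedged consumption sketch (the GNN call is a placeholder): because the seed nodes come first among the induced nodes, the minibatch outputs can be sliced off the front of the subgraph's representations.

import dgl
import torch

g = dgl.data.CoraFullDataset()[0]
sampler = dgl.dataloading.ShaDowKHopSampler([5, 10])
dataloader = dgl.dataloading.NodeDataLoader(
    g, torch.arange(g.num_nodes()), sampler, batch_size=64, shuffle=True)

for input_nodes, output_nodes, (subgraph,) in dataloader:
    x = subgraph.ndata['feat']              # features of all induced nodes
    # h = gnn(subgraph, x)                  # placeholder: any GNN run over the subgraph
    # h_seeds = h[:output_nodes.shape[0]]   # seed nodes come first; slice their rows
    break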
......@@ -146,7 +146,10 @@ def node_subgraph(graph, nodes, *, relabel_nodes=True, store_ids=True):
induced_nodes.append(_process_nodes(ntype, nids))
sgi = graph._graph.node_subgraph(induced_nodes, relabel_nodes)
induced_edges = sgi.induced_edges
induced_nodes = sgi.induced_nodes if relabel_nodes else None
# (BarclayII) should not write induced_nodes = sgi.induced_nodes due to the same
# bug in #1453.
if not relabel_nodes:
induced_nodes = None
return _create_hetero_subgraph(graph, sgi, induced_nodes, induced_edges, store_ids=store_ids)
DGLHeteroGraph.subgraph = utils.alias_func(node_subgraph)
......
......@@ -18,7 +18,8 @@
from collections.abc import Iterable, Mapping
from collections import defaultdict
import numpy as np
from scipy import sparse
import scipy.sparse as sparse
import scipy.sparse.linalg
from ._ffi.function import _init_api
from .base import dgl_warning, DGLError
......@@ -1303,8 +1304,8 @@ def laplacian_lambda_max(g):
adj = g_i.adj(transpose=True, scipy_fmt=g_i.formats()['created'][0]).astype(float)
norm = sparse.diags(F.asnumpy(g_i.in_degrees()).clip(1) ** -0.5, dtype=float)
laplacian = sparse.eye(n) - norm * adj * norm
rst.append(sparse.linalg.eigs(laplacian, 1, which='LM',
return_eigenvectors=False)[0].real)
rst.append(scipy.sparse.linalg.eigs(
laplacian, 1, which='LM', return_eigenvectors=False)[0].real)
return rst
def metapath_reachable_graph(g, metapath):
......
"""Checking and logging utilities."""
# pylint: disable=invalid-name
from __future__ import absolute_import, division
from collections.abc import Mapping
from ..base import DGLError
from .._ffi.function import _init_api
......@@ -52,7 +53,7 @@ def prepare_tensor(g, data, name):
def prepare_tensor_dict(g, data, name):
"""Convert a dictionary of data to a dictionary of ID tensors.
If calls ``prepare_tensor`` on each key-value pair.
Calls ``prepare_tensor`` on each key-value pair.
Parameters
----------
......@@ -70,6 +71,25 @@ def prepare_tensor_dict(g, data, name):
return {key : prepare_tensor(g, val, '{}["{}"]'.format(name, key))
for key, val in data.items()}
def prepare_tensor_or_dict(g, data, name):
"""Convert data to either a tensor or a dictionary depending on input type.
Parameters
----------
g : DGLHeteroGraph
Graph.
data : tensor-like or dict[str, tensor-like]
The data, either a single int/iterable/tensor or a dict of them keyed by type.
name : str
Name of the data.
Returns
-------
tensor or dict[str, tensor]
"""
return prepare_tensor_dict(g, data, name) if isinstance(data, Mapping) \
else prepare_tensor(g, data, name)
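A hedged micro-example of the two branches this helper takes (the toy graph is made up; the import path assumes the helper is re-exported at ``dgl.utils``, as used by ``shadow.py`` above):

import dgl
from dgl.utils import prepare_tensor_or_dict

g = dgl.heterograph({('user', 'follows', 'user'): ([0, 1], [1, 2])})
print(prepare_tensor_or_dict(g, [0, 1], 'nids'))            # non-mapping -> a single ID tensor
print(prepare_tensor_or_dict(g, {'user': [0, 1]}, 'nids'))  # mapping -> dict of ID tensors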
def parse_edges_arg_to_eid(g, edges, etid, argname='edges'):
"""Parse the :attr:`edges` argument and return an edge ID tensor.
......
......@@ -326,3 +326,25 @@ def infer_num_nodes(data, bipartite=False):
if not bipartite:
nsrc = ndst = max(nsrc, ndst)
return nsrc, ndst
def to_device(data, device):
"""Transfer the tensor or dictionary of tensors to the given device.
Nothing will happen if the device of the original tensor is the same as the target device.
Parameters
----------
data : Tensor or dict[str, Tensor]
The data.
device : device
The target device.
Returns
-------
Tensor or dict[str, Tensor]
The output data.
"""
if isinstance(data, dict):
return {k: F.copy_to(v, device) for k, v in data.items()}
else:
return F.copy_to(data, device)
......@@ -691,6 +691,11 @@ class FlattenedDict(object):
g = self._groups[k]
return k, g[j]
def maybe_flatten_dict(data):
"""Return a FlattenedDict if the input is a Mapping, or the data itself otherwise.
"""
return FlattenedDict(data) if isinstance(data, Mapping) else data
def compensate(ids, origin_ids):
"""computing the compensate set of ids from origin_ids
......
import os
import dgl
import backend as F
import unittest
import torch
from torch.utils.data import DataLoader
from collections import defaultdict
from collections.abc import Iterator
from itertools import product
import pytest
def _check_neighbor_sampling_dataloader(g, nids, dl, mode, collator):
seeds = defaultdict(list)
......@@ -204,6 +207,76 @@ def test_graph_dataloader():
assert isinstance(graph, dgl.DGLGraph)
assert F.asnumpy(label).shape[0] == batch_size
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@pytest.mark.parametrize('num_workers', [0, 4])
def test_cluster_gcn(num_workers):
dataset = dgl.data.CoraFullDataset()
g = dataset[0]
sgiter = dgl.dataloading.ClusterGCNSubgraphIterator(g, 100, '.', refresh=True)
dataloader = dgl.dataloading.GraphDataLoader(sgiter, batch_size=4, num_workers=num_workers)
for sg in dataloader:
assert sg.batch_size == 4
sgiter = dgl.dataloading.ClusterGCNSubgraphIterator(g, 100, '.', refresh=False) # use cache
dataloader = dgl.dataloading.GraphDataLoader(sgiter, batch_size=4, num_workers=num_workers)
for sg in dataloader:
assert sg.batch_size == 4
@pytest.mark.parametrize('num_workers', [0, 4])
def test_shadow(num_workers):
g = dgl.data.CoraFullDataset()[0]
sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
dataloader = dgl.dataloading.NodeDataLoader(
g, torch.arange(g.num_nodes()), sampler,
batch_size=5, shuffle=True, drop_last=False, num_workers=num_workers)
for i, (input_nodes, output_nodes, (subgraph,)) in enumerate(dataloader):
assert torch.equal(input_nodes, subgraph.ndata[dgl.NID])
assert torch.equal(input_nodes[:output_nodes.shape[0]], output_nodes)
assert torch.equal(subgraph.ndata['label'], g.ndata['label'][input_nodes])
assert torch.equal(subgraph.ndata['feat'], g.ndata['feat'][input_nodes])
if i == 5:
break
@pytest.mark.parametrize('num_workers', [0, 4])
def test_neighbor_nonuniform(num_workers):
g = dgl.graph(([1, 2, 3, 4, 5, 6, 7, 8], [0, 0, 0, 0, 1, 1, 1, 1]))
g.edata['p'] = torch.FloatTensor([1, 1, 0, 0, 1, 1, 0, 0])
sampler = dgl.dataloading.MultiLayerNeighborSampler([2], prob='p')
dataloader = dgl.dataloading.NodeDataLoader(g, [0, 1], sampler, batch_size=1, device=F.ctx())
for input_nodes, output_nodes, blocks in dataloader:
seed = output_nodes.item()
neighbors = set(input_nodes[1:].cpu().numpy())
if seed == 1:
assert neighbors == {5, 6}
elif seed == 0:
assert neighbors == {1, 2}
g = dgl.heterograph({
('B', 'BA', 'A'): ([1, 2, 3, 4, 5, 6, 7, 8], [0, 0, 0, 0, 1, 1, 1, 1]),
('C', 'CA', 'A'): ([1, 2, 3, 4, 5, 6, 7, 8], [0, 0, 0, 0, 1, 1, 1, 1]),
})
g.edges['BA'].data['p'] = torch.FloatTensor([1, 1, 0, 0, 1, 1, 0, 0])
g.edges['CA'].data['p'] = torch.FloatTensor([0, 0, 1, 1, 0, 0, 1, 1])
sampler = dgl.dataloading.MultiLayerNeighborSampler([2], prob='p')
dataloader = dgl.dataloading.NodeDataLoader(
g, {'A': [0, 1]}, sampler, batch_size=1, device=F.ctx())
for input_nodes, output_nodes, blocks in dataloader:
seed = output_nodes['A'].item()
# Seed and neighbors are of different node types so slicing is not necessary here.
neighbors = set(input_nodes['B'].cpu().numpy())
if seed == 1:
assert neighbors == {5, 6}
elif seed == 0:
assert neighbors == {1, 2}
neighbors = set(input_nodes['C'].cpu().numpy())
if seed == 1:
assert neighbors == {7, 8}
elif seed == 0:
assert neighbors == {3, 4}
def _check_device(data):
if isinstance(data, dict):
for k, v in data.items():
......@@ -214,11 +287,15 @@ def _check_device(data):
else:
assert data.device == F.ctx()
def test_node_dataloader():
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
@pytest.mark.parametrize('sampler_name', ['full', 'neighbor', 'neighbor2', 'shadow'])
def test_node_dataloader(sampler_name):
g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4]))
g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu())
sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
'shadow': dgl.dataloading.ShaDowKHopSampler([3, 3])}[sampler_name]
dataloader = dgl.dataloading.NodeDataLoader(
g1, g1.nodes(), sampler, device=F.ctx(), batch_size=g1.num_nodes())
......@@ -236,6 +313,11 @@ def test_node_dataloader():
for ntype in g2.ntypes:
g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu())
batch_size = max(g2.num_nodes(nty) for nty in g2.ntypes)
sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
'neighbor': dgl.dataloading.MultiLayerNeighborSampler([{etype: 3 for etype in g2.etypes}] * 2),
'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
'shadow': dgl.dataloading.ShaDowKHopSampler([{etype: 3 for etype in g2.etypes}] * 2)}[sampler_name]
dataloader = dgl.dataloading.NodeDataLoader(
g2, {nty: g2.nodes(nty) for nty in g2.ntypes},
......@@ -246,13 +328,19 @@ def test_node_dataloader():
_check_device(output_nodes)
_check_device(blocks)
def test_edge_dataloader():
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
@pytest.mark.parametrize('sampler_name', ['full', 'neighbor', 'shadow'])
def test_edge_dataloader(sampler_name):
neg_sampler = dgl.dataloading.negative_sampler.Uniform(2)
g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4]))
g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu())
sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
'shadow': dgl.dataloading.ShaDowKHopSampler([3, 3])}[sampler_name]
# no negative sampler
dataloader = dgl.dataloading.EdgeDataLoader(
g1, g1.edges(form='eid'), sampler, device=F.ctx(), batch_size=g1.num_edges())
......@@ -280,6 +368,10 @@ def test_edge_dataloader():
for ntype in g2.ntypes:
g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu())
batch_size = max(g2.num_edges(ety) for ety in g2.canonical_etypes)
sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
'neighbor': dgl.dataloading.MultiLayerNeighborSampler([{etype: 3 for etype in g2.etypes}] * 2),
'shadow': dgl.dataloading.ShaDowKHopSampler([{etype: 3 for etype in g2.etypes}] * 2)}[sampler_name]
# no negative sampler
dataloader = dgl.dataloading.EdgeDataLoader(
......@@ -306,5 +398,8 @@ def test_edge_dataloader():
if __name__ == '__main__':
test_neighbor_sampler_dataloader()
test_graph_dataloader()
test_node_dataloader()
test_edge_dataloader()
test_cluster_gcn(0)
test_neighbor_nonuniform(0)
for sampler in ['full', 'neighbor', 'shadow']:
test_node_dataloader(sampler)
test_edge_dataloader(sampler)