Unverified commit f5eb80d2 authored by Quan (Andy) Gan, committed by GitHub

[Feature] Edge DataLoader for edge classification & link prediction (#1828)

* clean commit

* oops forgot the most important files

* use einsum

* copy feature from frontier to block

* Revert "copy feature from frontier to block"

This reverts commit 5224ec963eb6a3ef1b6ab74d8ecbd44e4e42f285.

* temp fix

* unit test

* fix

* revert jtnn

* lint

* fix win64

* docstring fixes and doc indexing

* revert einsum in sparse bidecoder

* fix some examples

* lint

* fix due to some tediousness in remove_edges

* addresses comments

* fix

* more jtnn fixes

* fix
Parent commit: d340ea3a
@@ -70,8 +70,8 @@ class SAGE(nn.Module):
        for l, layer in enumerate(self.layers):
            y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
-           sampler = dgl.sampling.MultiLayerNeighborSampler([None])
-           dataloader = dgl.sampling.NodeDataLoader(
+           sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
+           dataloader = dgl.dataloading.NodeDataLoader(
                g,
                th.arange(g.number_of_nodes()),
                sampler,
@@ -132,9 +132,9 @@ def run(args, device, data):
    train_nid, val_nid, test_nid, in_feats, labels, n_classes, g = data
    # Create PyTorch DataLoader for constructing blocks
-   sampler = dgl.sampling.MultiLayerNeighborSampler(
+   sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [int(fanout) for fanout in args.fan_out.split(',')])
-   dataloader = dgl.sampling.NodeDataLoader(
+   dataloader = dgl.dataloading.NodeDataLoader(
        g,
        train_nid,
        sampler,
...
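For orientation, the hunks above migrate samplers and loaders from the ``dgl.sampling`` namespace to the new ``dgl.dataloading`` namespace. The first hunk belongs to the layer-wise offline inference loop; a minimal sketch of that pattern with the new API (``g``, ``x``, ``self.layers`` come from the surrounding example, everything else is an assumption):

import torch as th
import dgl

def inference(self, g, x, batch_size):
    # Compute representations one GNN layer at a time, over all nodes.
    for l, layer in enumerate(self.layers):
        y = th.zeros(g.number_of_nodes(),
                     self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
        # A single full-neighbor layer: every node aggregates all its neighbors.
        sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
        dataloader = dgl.dataloading.NodeDataLoader(
            g, th.arange(g.number_of_nodes()), sampler,
            batch_size=batch_size, shuffle=False, drop_last=False)
        for input_nodes, output_nodes, blocks in dataloader:
            h = x[input_nodes]
            h = layer(blocks[0], h)
            y[output_nodes] = h
        x = y  # the output of this layer feeds the next one
    return y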
@@ -6,42 +6,25 @@ import scipy.sparse as ssp
# This is the train-test split method most of the recommender system papers running on MovieLens
# take. It essentially follows the intuition of "training on the past and predicting the future".
# One can also change the threshold to make the validation and test sets take larger proportions.
-def train_test_split_by_time(g, column, etype, itype):
-    n_edges = g.number_of_edges(etype)
-    with g.local_scope():
-        def splits(edges):
-            num_edges, count = edges.data['train_mask'].shape
-            # sort by timestamp
-            _, sorted_idx = edges.data[column].sort(1)
-            train_mask = edges.data['train_mask']
-            val_mask = edges.data['val_mask']
-            test_mask = edges.data['test_mask']
-            x = torch.arange(num_edges)
-            # If one user has more than one interaction, select the latest one for test.
-            if count > 1:
-                train_mask[x, sorted_idx[:, -1]] = False
-                test_mask[x, sorted_idx[:, -1]] = True
-            # If one user has more than two interactions, select the second latest one for validation.
-            if count > 2:
-                train_mask[x, sorted_idx[:, -2]] = False
-                val_mask[x, sorted_idx[:, -2]] = True
-            return {'train_mask': train_mask, 'val_mask': val_mask, 'test_mask': test_mask}
-        g.edges[etype].data['train_mask'] = torch.ones(n_edges, dtype=torch.bool)
-        g.edges[etype].data['val_mask'] = torch.zeros(n_edges, dtype=torch.bool)
-        g.edges[etype].data['test_mask'] = torch.zeros(n_edges, dtype=torch.bool)
-        g.nodes[itype].data['count'] = g.in_degrees(etype=etype)
-        g.group_apply_edges('src', splits, etype=etype)
-        train_indices = g.filter_edges(lambda edges: edges.data['train_mask'], etype=etype)
-        val_indices = g.filter_edges(lambda edges: edges.data['val_mask'], etype=etype)
-        test_indices = g.filter_edges(lambda edges: edges.data['test_mask'], etype=etype)
-    return train_indices, val_indices, test_indices
+def train_test_split_by_time(df, timestamp, item):
+    df = df.copy()
+    df['train_mask'] = np.ones((len(df),), dtype=np.bool)
+    df['val_mask'] = np.zeros((len(df),), dtype=np.bool)
+    df['test_mask'] = np.zeros((len(df),), dtype=np.bool)
+    df = df.sort_values([item, timestamp])
+    for track_id in df[item].unique():
+        idx = (df[item] == track_id).to_numpy().nonzero()[0]
+        idx = df.index[idx]
+        if len(idx) > 1:
+            df.loc[idx[-1], 'train_mask'] = False
+            df.loc[idx[-1], 'test_mask'] = True
+        if len(idx) > 2:
+            df.loc[idx[-2], 'train_mask'] = False
+            df.loc[idx[-2], 'val_mask'] = True
+    df = df.sort_index()
+    return df['train_mask'].to_numpy().nonzero()[0], \
+           df['val_mask'].to_numpy().nonzero()[0], \
+           df['test_mask'].to_numpy().nonzero()[0]
def build_train_graph(g, train_indices, utype, itype, etype, etype_rev):
    train_g = g.edge_subgraph(
...
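To see what the new pandas-based split returns, here is a small hedged example on a toy interaction table (the column names are invented for illustration):

import pandas as pd

df = pd.DataFrame({
    'movie_id': ['i0', 'i0', 'i0', 'i1'],
    'timestamp': [1, 2, 3, 1],
})
train_idx, val_idx, test_idx = train_test_split_by_time(df, 'timestamp', 'movie_id')
# For 'i0', the latest interaction goes to test and the second latest to
# validation; 'i1' has a single interaction, so it stays in train.
print(train_idx, val_idx, test_idx)  # [0 3] [1] [2]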
@@ -115,7 +115,7 @@ g.edges['watched-by'].data['timestamp'] = torch.LongTensor(ratings['timestamp'].
# Train-validation-test split
# This is a little bit tricky as we want to select the last interaction for test, and the
# second-to-last interaction for validation.
-train_indices, val_indices, test_indices = train_test_split_by_time(g, 'timestamp', 'watched', 'movie')
+train_indices, val_indices, test_indices = train_test_split_by_time(ratings, 'timestamp', 'movie_id')
# Build the graph with training interactions only.
train_g = build_train_graph(g, train_indices, 'user', 'movie', 'watched', 'watched-by')
...
@@ -52,7 +52,7 @@ g.edges['listened'].data['created_at'] = torch.LongTensor(events['created_at'].v
g.edges['listened-by'].data['created_at'] = torch.LongTensor(events['created_at'].values)
n_edges = g.number_of_edges('listened')
-train_indices, val_indices, test_indices = train_test_split_by_time(g, 'created_at', 'listened', 'track')
+train_indices, val_indices, test_indices = train_test_split_by_time(events, 'created_at', 'track_id')
train_g = build_train_graph(g, train_indices, 'user', 'track', 'listened', 'listened-by')
val_matrix, test_matrix = build_val_test_matrix(
    g, val_indices, test_indices, 'user', 'track', 'listened')
...
@@ -98,15 +98,15 @@ def main(args):
        model.cuda()
    # train sampler
-   sampler = dgl.sampling.MultiLayerNeighborSampler([args.fanout] * args.n_layers)
-   loader = dgl.sampling.NodeDataLoader(
+   sampler = dgl.dataloading.MultiLayerNeighborSampler([args.fanout] * args.n_layers)
+   loader = dgl.dataloading.NodeDataLoader(
        g, {category: train_idx}, sampler,
        batch_size=args.batch_size, shuffle=True, num_workers=0)
    # validation sampler
    # we do not use full neighbor sampling to save computation resources
-   val_sampler = dgl.sampling.MultiLayerNeighborSampler([args.fanout] * args.n_layers)
-   val_loader = dgl.sampling.NodeDataLoader(
+   val_sampler = dgl.dataloading.MultiLayerNeighborSampler([args.fanout] * args.n_layers)
+   val_loader = dgl.dataloading.NodeDataLoader(
        g, {category: val_idx}, val_sampler,
        batch_size=args.batch_size, shuffle=True, num_workers=0)
...
@@ -106,7 +106,7 @@ class RelGraphConvLayer(nn.Module):
            inputs_src = inputs
            inputs_dst = {k: v[:g.number_of_dst_nodes(k)] for k, v in inputs.items()}
        else:
-           inputs_src, inputs_dst = inputs
+           inputs_src = inputs_dst = inputs
        hs = self.conv(g, inputs, mod_kwargs=wdict)
@@ -232,8 +232,8 @@ class EntityClassify(nn.Module):
                self.h_dim if l != len(self.layers) - 1 else self.out_dim)
            for k in g.ntypes}
-       sampler = dgl.sampling.MultiLayerNeighborSampler([None])
-       dataloader = dgl.sampling.NodeDataLoader(
+       sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
+       dataloader = dgl.dataloading.NodeDataLoader(
            g,
            {k: th.arange(g.number_of_nodes(k)) for k in g.ntypes},
            sampler,
...
@@ -13,6 +13,7 @@ from . import container
from . import distributed
from . import random
from . import sampling
+from . import dataloading
from . import ops
from ._ffi.runtime_ctypes import TypeCode
...
@@ -908,7 +908,7 @@ def ones(shape, dtype, ctx):
    pass

def uniform(shape, dtype, ctx, low, high):
-   """Crear a tensor with random value in an uniform
+   """Create a tensor with random value in a uniform
    distribution between low (inclusive) and high (exclusive).

    Parameters
@@ -927,6 +927,26 @@ def uniform(shape, dtype, ctx, low, high):
    """
    pass
+def randint(shape, dtype, ctx, low, high):
+    """Create a tensor with random integer values drawn from a uniform
+    distribution between low (inclusive) and high (exclusive).
+
+    Parameters
+    ----------
+    shape : tuple of int
+        The tensor shape.
+    dtype : data type
+        It should be one of the values in the data type dict.
+    ctx : context
+        The device of the result tensor.
+    low : int
+        The lower bound (inclusive).
+    high : int
+        The upper bound (exclusive).
+
+    Returns
+    -------
+    Tensor
+        The random tensor.
+    """
+    pass
def pad_packed_tensor(input, lengths, value, l_min=None):
    r"""Pads a packed batch of variable length tensors with given value.
...
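The new ``randint`` hook becomes reachable through the backend dispatcher; a quick sanity check of the intended semantics, assuming the PyTorch backend is active (``dgl.backend`` is an internal module, used here only for illustration):

import torch as th
from dgl import backend as F

t = F.randint((5,), th.int64, th.device('cpu'), 0, 10)
# Five int64 values drawn uniformly from [0, 10).
assert t.shape == (5,) and int(t.min()) >= 0 and int(t.max()) < 10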
@@ -276,6 +276,9 @@ def ones(shape, dtype, ctx):
def uniform(shape, dtype, ctx, low, high):
    return nd.random.uniform(low, high, ctx=ctx, dtype=dtype, shape=shape)

+def randint(shape, dtype, ctx, low, high):
+    return nd.random.randint(low, high, ctx=ctx, dtype=dtype, shape=shape)

def pad_packed_tensor(input, lengths, value, l_min=None):
    old_shape = input.shape
    if isinstance(lengths, nd.NDArray):
...
@@ -216,6 +216,9 @@ def ones(shape, dtype, ctx):
def uniform(shape, dtype, ctx, low, high):
    return th.empty(shape, dtype=dtype, device=ctx).uniform_(low, high)

+def randint(shape, dtype, ctx, low, high):
+    return th.randint(low, high, shape, dtype=dtype, device=ctx)

def pad_packed_tensor(input, lengths, value, l_min=None):
    old_shape = input.shape
    if isinstance(lengths, th.Tensor):
...
@@ -336,6 +336,12 @@ def uniform(shape, dtype, ctx, low, high):
    return t

+def randint(shape, dtype, ctx, low, high):
+    with tf.device(ctx):
+        t = tf.random.uniform(shape, dtype=dtype, minval=low, maxval=high)
+    return t

def pad_packed_tensor(input, lengths, value, l_min=None):
    old_shape = input.shape
    if isinstance(lengths, tf.Tensor):
...
"""Classes that involves iterating over nodes or edges in a graph and generates
computation dependency of necessary nodes with neighborhood sampling methods.
This includes
* :py:class:`~dgl.dataloading.pytorch.NodeDataLoader`` for iterating over the nodes in
a graph in minibatches.
* :py:class:`~dgl.dataloading.pytorch.EdgeDataLoader`` for iterating over the edges in
a graph in minibatches.
* Various sampler classes that perform neighborhood sampling for multi-layer GNNs.
* Negative samplers for link prediction.
NOTE: this module is experimental and the interfaces may be subject to changes in
future releases.
"""
from .neighbor import *
from .dataloader import *
from . import negative_sampler
from .. import backend as F
if F.get_preferred_backend() == 'pytorch':
from .pytorch import *
This diff is collapsed.
"""Negative samplers"""
from collections.abc import Mapping
from .. import backend as F
class _BaseNegativeSampler(object):
def _generate(self, g, eids, canonical_etype):
raise NotImplementedError
def __call__(self, g, eids):
"""Returns negative examples.
Parameters
----------
g : DGLHeteroGraph
The graph.
eids : Tensor or dict[etype, Tensor]
The sampled edges in the minibatch.
Returns
-------
tuple[Tensor, Tensor] or dict[etype, tuple[Tensor, Tensor]]
The returned source-destination pairs as negative examples.
"""
if isinstance(eids, Mapping):
eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
neg_pair = {k: self._generate(g, v, k) for k, v in eids.items()}
else:
assert len(g.etypes) == 1, \
'please specify a dict of etypes and ids for graphs with multiple edge types'
neg_pair = self._generate(g, eids, g.canonical_etypes[0])
return neg_pair
class Uniform(_BaseNegativeSampler):
"""Negative sampler that randomly chooses negative destination nodes
for each source node according to a uniform distribution.
For each edge ``(u, v)`` of type ``(srctype, etype, dsttype)``, DGL generates
:attr:`k` pairs of negative edges ``(u, v')``, where ``v'`` is chosen
uniformly from all the nodes of type ``dsttype``. The resulting edges will
also have type ``(srctype, etype, dsttype)``.
Parameters
----------
k : int
The number of negative examples per edge.
Examples
--------
>>> g = dgl.graph(([0, 1, 2], [1, 2, 3]))
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(2)
>>> neg_sampler(g, [0, 1])
(tensor([0, 0, 1, 1]), tensor([1, 0, 2, 3]))
"""
def __init__(self, k):
self.k = k
def _generate(self, g, eids, canonical_etype):
_, _, vtype = canonical_etype
shape = F.shape(eids)
dtype = F.dtype(eids)
ctx = F.context(eids)
shape = (shape[0] * self.k,)
src, _ = g.find_edges(eids, etype=canonical_etype)
src = F.repeat(src, self.k, 0)
dst = F.randint(shape, dtype, ctx, 0, g.number_of_nodes(vtype))
return src, dst
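The ``EdgeCollator`` below only requires a negative sampler to be a callable taking ``(g, eids)`` and returning source-destination pairs, so custom strategies need not subclass anything. A hedged sketch that corrupts destinations with nodes drawn from a fixed candidate set (the class name and ``candidates`` argument are invented; homogeneous graphs only):

import torch

class FixedCandidateNegatives(object):
    """Replace each edge's destination with k nodes sampled uniformly
    from a caller-supplied candidate tensor."""
    def __init__(self, k, candidates):
        self.k = k
        self.candidates = candidates

    def __call__(self, g, eids):
        src, _ = g.find_edges(eids)
        src = src.repeat_interleave(self.k)
        choice = torch.randint(0, len(self.candidates), (len(src),))
        return src, self.candidates[choice]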
"""Data loading components for neighbor sampling"""
from .dataloader import BlockSampler
from .. import sampling, subgraph
class MultiLayerNeighborSampler(BlockSampler):
"""Sampler that builds computational dependency of node representations via
neighbor sampling for multilayer GNN.
This sampler will make every node gather messages from a fixed number of neighbors
per edge type. The neighbors are picked uniformly.
Parameters
----------
fanouts : list[int] or list[dict[etype, int] or None]
List of neighbors to sample per edge type for each GNN layer, starting from the
first layer.
If the graph is homogeneous, only an integer is needed for each layer.
If None is provided for one layer, all neighbors will be included regardless of
edge types.
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
replace : bool, default False
Whether to sample with replacement.
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the block.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from 5, 10, 15 neighbors for
the first, second, and third layer respectively (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10, 15])
>>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for blocks in dataloader:
... train_on(blocks)
If you are training on a heterogeneous graph and want a different number of neighbors
per edge type, provide a list of dicts instead, one per layer. Each dict specifies the
number of neighbors to pick per edge type.
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([
... {('user', 'follows', 'user'): 5,
... ('user', 'plays', 'game'): 4,
... ('game', 'played-by', 'user'): 3}] * 3)
"""
def __init__(self, fanouts, replace=False, return_eids=False):
super().__init__(len(fanouts), return_eids)
self.fanouts = fanouts
self.replace = replace
def sample_frontier(self, block_id, g, seed_nodes):
fanout = self.fanouts[block_id]
if fanout is None:
frontier = subgraph.in_subgraph(g, seed_nodes)
else:
frontier = sampling.sample_neighbors(g, seed_nodes, fanout, replace=self.replace)
return frontier
class MultiLayerFullNeighborSampler(MultiLayerNeighborSampler):
"""Sampler that builds computational dependency of node representations by taking messages
from all neighbors for multilayer GNN.
This sampler will make every node gather messages from every single neighbor per edge type.
Parameters
----------
n_layers : int
The number of GNN layers to sample.
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the block.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors in each of the
three layers (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for blocks in dataloader:
... train_on(blocks)
"""
def __init__(self, n_layers, return_eids=False):
super().__init__([None] * n_layers, return_eids=return_eids)
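Both samplers are thin subclasses of ``BlockSampler`` that only override ``sample_frontier``. As a hedged illustration of the same pattern, a sampler that biases neighbor selection by an edge weight could look like this (the edge feature name ``'w'`` and the class itself are assumptions, relying on the ``prob`` argument of ``dgl.sampling.sample_neighbors``):

import dgl
from dgl.dataloading import BlockSampler

class WeightedNeighborSampler(BlockSampler):
    """Sample `fanout` in-neighbors per layer with probability
    proportional to the edge feature 'w'."""
    def __init__(self, fanouts, return_eids=False):
        super().__init__(len(fanouts), return_eids)
        self.fanouts = fanouts

    def sample_frontier(self, block_id, g, seed_nodes):
        return dgl.sampling.sample_neighbors(
            g, seed_nodes, self.fanouts[block_id], prob='w')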
"""DGL PyTorch DataLoaders"""
import inspect
from torch.utils.data import DataLoader
from ..dataloader import NodeCollator, EdgeCollator
class NodeDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
of blocks as computation dependency of the said minibatch.
Parameters
----------
g : DGLHeteroGraph
The graph.
nids : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
block_sampler : :py:class:`~dgl.dataloading.BlockSampler`
The neighborhood sampler.
kwargs : dict
Arguments being passed to ``torch.utils.data.DataLoader``.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors (assume
the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> dataloader = dgl.dataloading.NodeDataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(input_nodes, output_nodes, blocks)
"""
collator_arglist = inspect.getfullargspec(NodeCollator).args
def __init__(self, g, nids, block_sampler, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
if k in self.collator_arglist:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
self.collator = NodeCollator(g, nids, block_sampler, **collator_kwargs)
super().__init__(
self.collator.dataset, collate_fn=self.collator.collate, **dataloader_kwargs)
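A hedged end-to-end sketch of the loop this class enables, with feature and label fetching filled in (the ``'feat'``/``'label'`` field names, ``model``, and ``opt`` are assumptions):

import torch as th
import dgl

sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.dataloading.NodeDataLoader(
    g, train_nid, sampler,
    batch_size=1024, shuffle=True, drop_last=False, num_workers=4)

for input_nodes, output_nodes, blocks in dataloader:
    x = g.ndata['feat'][input_nodes]     # inputs for the whole frontier
    y = g.ndata['label'][output_nodes]   # labels for the seed nodes only
    y_hat = model(blocks, x)
    loss = th.nn.functional.cross_entropy(y_hat, y)
    opt.zero_grad()
    loss.backward()
    opt.step()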
class EdgeDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list
of blocks as computation dependency of the said minibatch for edge classification,
edge regression, and link prediction.
Parameters
----------
g : DGLHeteroGraph
The graph.
eids : Tensor or dict[etype, Tensor]
The edge set to iterate over.
block_sampler : :py:class:`~dgl.dataloading.BlockSampler`
The neighborhood sampler.
g_sampling : DGLHeteroGraph, optional
The graph where neighborhood sampling is performed.
One may wish to iterate over the edges in one graph while performing sampling in
another graph. This is common when iterating over the validation and test
edge sets while performing neighborhood sampling on the graph formed by only
the training edge set.
If None, assume to be the same as ``g``.
exclude : str, optional
Whether and how to exclude dependencies related to the sampled edges in the
minibatch. Possible values are
* None,
* ``reverse``,
* ``reverse_types``
See the docstring in :py:class:`~dgl.dataloading.EdgeCollator`.
reverse_eids : Tensor or dict[etype, Tensor], optional
See the docstring in :py:class:`~dgl.dataloading.EdgeCollator`.
reverse_etypes : dict[etype, etype], optional
See the docstring in :py:class:`~dgl.dataloading.EdgeCollator`.
negative_sampler : callable, optional
The negative sampler.
See the docstring in :py:class:`~dgl.dataloading.EdgeCollator`.
kwargs : dict
Arguments being passed to ``torch.utils.data.DataLoader``.
Examples
--------
The following example shows how to train a 3-layer GNN for edge classification on a
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
messages from all neighbors.
Say that you have an array of source node IDs ``src`` and another array of destination
node IDs ``dst``. One can make it bidirectional by adding another set of edges
that connects from ``dst`` to ``src``:
>>> g = dgl.graph((torch.cat([src, dst]), torch.cat([dst, src])))
One can then know that the ID difference of an edge and its reverse edge is ``|E|``,
where ``|E|`` is the length of your source/destination array. The reverse edge
mapping can be obtained by
>>> E = len(src)
>>> reverse_eids = torch.cat([torch.arange(E, 2 * E), torch.arange(0, E)])
Note that the sampled edges as well as their reverse edges are removed from
computation dependencies of the incident nodes. This is a common trick to avoid
information leakage.
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, train_eid, sampler, exclude='reverse',
... reverse_eids=reverse_eids,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
To train a 3-layer GNN for link prediction on a set of edges ``train_eid`` on a
homogeneous graph where each node takes messages from all neighbors (assume the
backend is PyTorch), with 5 uniformly chosen negative samples per edge:
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, train_eid, sampler, exclude='reverse',
... reverse_eids=reverse_eids, negative_sampler=neg_sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodes, pos_pair_graph, neg_pair_graph, blocks)
For heterogeneous graphs, the reverse of an edge may have a different edge type
from the original edge. For instance, consider that you have an array of
user-item clicks, represented by a user array ``user`` and an item array ``item``.
You may want to build a heterogeneous graph with a user-click-item relation and an
item-clicked-by-user relation.
>>> g = dgl.heterograph({
... ('user', 'click', 'item'): (user, item),
... ('item', 'clicked-by', 'user'): (item, user)})
To train a 3-layer GNN for edge classification on a set of edges ``train_eid`` with
type ``click``, you can write
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, {'click': train_eid}, sampler, exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'},
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
To train a 3-layer GNN for link prediction on a set of edges ``train_eid`` with type
``click``, you can write
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, {'click': train_eid}, sampler, exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'},
... negative_sampler=neg_sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodes, pos_pair_graph, neg_pair_graph, blocks)
See also
--------
:py:class:`~dgl.dataloading.EdgeCollator`
For end-to-end usages, please refer to the following tutorial/examples:
* Edge classification on heterogeneous graph: GCMC
* Link prediction on homogeneous graph: GraphSAGE for unsupervised learning
* Link prediction on heterogeneous graph: RGCN for link prediction.
"""
collator_arglist = inspect.getfullargspec(EdgeCollator).args
def __init__(self, g, eids, block_sampler, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
if k in self.collator_arglist:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
self.collator = EdgeCollator(g, eids, block_sampler, **collator_kwargs)
super().__init__(
self.collator.dataset, collate_fn=self.collator.collate, **dataloader_kwargs)
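To make the keyword split above concrete, here is a hedged sketch of one edge classification step driven by ``EdgeDataLoader`` (``predictor``, ``model``, ``opt``, and the feature/label names are assumptions; it also assumes the minibatch pair graph keeps the original edge IDs under ``dgl.EID``):

import torch as th
import dgl

sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.EdgeDataLoader(
    g, train_eid, sampler, batch_size=256, shuffle=True)

for input_nodes, pair_graph, blocks in dataloader:
    x = g.ndata['feat'][input_nodes]
    h = model(blocks, x)                 # representations of minibatch nodes
    src, dst = pair_graph.edges()
    score = predictor(h[src], h[dst])    # one prediction per minibatch edge
    label = g.edata['label'][pair_graph.edata[dgl.EID]]
    loss = th.nn.functional.cross_entropy(score, label)
    opt.zero_grad()
    loss.backward()
    opt.step()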
@@ -5,9 +5,3 @@ This module contains the implementations of various sampling operators.
from .randomwalks import *
from .pinsage import *
from .neighbor import *
-from .dataloader import *
-from .. import backend as F
-
-if F.get_preferred_backend() == 'pytorch':
-    from .pytorch import *
"""Data loaders"""
from collections.abc import Mapping
from abc import ABC, abstractproperty, abstractmethod
from .. import transform
from ..base import NID, EID
from .. import utils
# pylint: disable=unused-argument
def assign_block_eids(block, frontier, block_id, g, seed_nodes, *args, **kwargs):
"""Assigns edge IDs from the original graph to the block.
This is the default block postprocessor for samplers created with
``return_eids`` as True.
See also
--------
BlockSampler
MultiLayerNeighborSampler
"""
for etype in block.canonical_etypes:
block.edges[etype].data[EID] = frontier.edges[etype].data[EID][
block.edges[etype].data[EID]]
return block
def _default_frontier_postprocessor(frontier, block_id, g, seed_nodes, *args, **kwargs):
return frontier
def _default_block_postprocessor(block, frontier, block_id, g, seed_nodes, *args, **kwargs):
return block
class BlockSampler(object):
"""Abstract class specifying the neighborhood sampling strategy for DGL data loaders.
The main method for BlockSampler is :func:`~dgl.sampling.BlockSampler.sample_blocks`,
which generates a list of blocks for a multi-layer GNN given a set of seed nodes to
have their outputs computed.
The default implementation of :py:meth:`~dgl.sampling.BlockSampler.sample_blocks`
repeats the following ``num_hops`` times:
* Obtain a frontier with the same nodes as the original graph but only the edges
involved in message passing on the last layer.
Customizable via :py:meth:`~dgl.sampling.BlockSampler.sample_frontier`.
* Optionally, post-process the obtained frontier (e.g. by removing edges connecting training
node pairs). One can add such postprocessors via
:py:meth:`~dgl.sampling.BlockSampler.add_frontier_postprocessor`.
* Convert the frontier into a block.
* Optionally, post-process the block (e.g. by assigning edge IDs). One can add such
postprocessors via
:py:meth:`~dgl.sampling.BlockSampler.add_block_postprocessor`.
* Prepend the block to the block list to be returned.
All subclasses should either
* Override :py:meth:`~dgl.sampling.BlockSampler.sample_blocks` method, or
* Override
:py:meth:`~dgl.sampling.BlockSampler.sample_frontier` method while specifying
the number of layers to sample in the ``num_hops`` argument.
See also
--------
For the concept of frontiers and blocks, please refer to User Guide Section 6.
"""
def __init__(self, num_hops):
self.num_hops = num_hops
self._frontier_postprocessor = _default_frontier_postprocessor
self._block_postprocessor = _default_block_postprocessor
@property
def frontier_postprocessor(self):
"""Frontier postprocessor."""
return self._frontier_postprocessor
@property
def block_postprocessor(self):
"""B;pcl postprocessor."""
return self._block_postprocessor
def set_frontier_postprocessor(self, postprocessor):
"""Set a frontier postprocessor.
The postprocessor must have the following signature:
.. code::
postprocessor(frontier, block_id, g, seed_nodes, *args, **kwargs)
where
* ``frontier`` represents the frontier obtained by
:py:meth:`~dgl.sampling.BlockSampler.sample_frontier` method.
* ``block_id`` represents which GNN layer the block is currently generated for.
* ``g`` represents the original graph.
* ``seed_nodes`` represents the output nodes on the current layer.
* Other arguments are the same ones passed into
:py:meth:`~dgl.sampling.BlockSampler.sample_blocks` method.
Parameters
----------
postprocessor : callable
The postprocessor.
"""
self._frontier_postprocessor = postprocessor
def set_block_postprocessor(self, postprocessor):
"""Set a block postprocessor.
The postprocessor must have the following signature:
.. code::
postprocessor(block, frontier, block_id, g, seed_nodes, *args, **kwargs)
where
* ``block`` represents the block converted from the frontier.
* ``frontier`` represents the frontier the block is generated from.
* ``block_id`` represents which GNN layer the block is currently generated for.
* ``g`` represents the original graph.
* ``seed_nodes`` represents the output nodes on the current layer.
* Other arguments are the same ones passed into
:py:meth:`~dgl.sampling.BlockSampler.sample_blocks` method.
Parameters
----------
postprocessor : callable
The postprocessor.
"""
self._block_postprocessor = postprocessor
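As a hedged example matching the signature just described, a block postprocessor that copies a (hypothetical) node feature ``'feat'`` from the original graph onto each block's source nodes could be:

import dgl

def copy_src_features(block, frontier, block_id, g, seed_nodes, *args, **kwargs):
    # Fetch the original node IDs stored on the block, then slice features.
    for ntype in block.srctypes:
        nids = block.srcnodes[ntype].data[dgl.NID]
        block.srcnodes[ntype].data['feat'] = g.nodes[ntype].data['feat'][nids]
    return block

sampler.set_block_postprocessor(copy_src_features)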
def _postprocess_frontier(self, frontier, block_id, g, seed_nodes, *args, **kwargs):
"""Post-processes the generated frontier."""
return self._frontier_postprocessor(
frontier, block_id, g, seed_nodes, *args, **kwargs)
def _postprocess_block(self, block, frontier, block_id, g, seed_nodes, *args, **kwargs):
"""Post-processes the generated block."""
return self._block_postprocessor(
block, frontier, block_id, g, seed_nodes, *args, **kwargs)
def sample_frontier(self, block_id, g, seed_nodes, *args, **kwargs):
"""
Generate the frontier given the output nodes.
Parameters
----------
block_id : int
Represents which GNN layer the frontier is generated for.
g : DGLHeteroGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The output nodes by node type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
args, kwargs :
Other arguments being passed by
:py:meth:`~dgl.sampling.BlockSampler.sample_blocks`.
Returns
-------
DGLHeteroGraph
The frontier generated for the current layer.
See also
--------
For the concept of frontiers and blocks, please refer to User Guide Section 6.
"""
raise NotImplementedError
def sample_blocks(self, g, seed_nodes, *args, **kwargs):
"""
Generate a list of blocks given the output nodes.
Parameters
----------
g : DGLHeteroGraph
The original graph.
seed_nodes : Tensor or dict[ntype, Tensor]
The output nodes by node type.
If the graph only has one node type, one can just specify a single tensor
of node IDs.
args, kwargs :
Other arguments being passed by
:py:meth:`~dgl.sampling.BlockSampler.sample_blocks`.
Returns
-------
list[DGLHeteroGraph]
The blocks generated for computing the multi-layer GNN output.
See also
--------
For the concept of frontiers and blocks, please refer to User Guide Section 6.
"""
blocks = []
for block_id in reversed(range(self.num_hops)):
frontier = self.sample_frontier(block_id, g, seed_nodes, *args, **kwargs)
# Removing edges from the frontier for link prediction training falls
# into the category of frontier postprocessing
frontier = self._postprocess_frontier(
frontier, block_id, g, seed_nodes, *args, **kwargs)
block = transform.to_block(frontier, seed_nodes)
# Assigning edge IDs and/or node/edge features falls into the category of block
# postprocessing
block = self._postprocess_block(
block, frontier, block_id, g, seed_nodes, *args, **kwargs)
seed_nodes = {ntype: block.srcnodes[ntype].data[NID] for ntype in block.srctypes}
blocks.insert(0, block)
return blocks
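Because ``sample_blocks`` is an ordinary method, a sampler can also be invoked directly to inspect what a data loader would feed the model; a small sketch (homogeneous graph, PyTorch backend assumed):

import torch

seeds = torch.tensor([0, 1, 2])
blocks = sampler.sample_blocks(g, seeds)
for i, block in enumerate(blocks):
    print(i, block.number_of_src_nodes(), block.number_of_dst_nodes())
# blocks[-1] outputs exactly the seed nodes; blocks[0].srcdata holds the IDs
# of every input node whose features must be fetched.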
class Collator(ABC):
"""
Abstract DGL collator for training GNNs on downstream tasks stochastically.
Provides a ``dataset`` object containing the collection of all nodes or edges,
as well as a ``collate`` method that combines a set of items from ``dataset`` and
obtains the blocks.
See also
--------
For the concept of blocks, please refer to User Guide Section 6.
"""
@abstractproperty
def dataset(self):
"""Returns the dataset object of the collator."""
raise NotImplementedError
@abstractmethod
def collate(self, items):
"""Combines the items from the dataset object and obtains the list of blocks.
Parameters
----------
items : list[int] or list[tuple[str, int]]
The list of node or edge IDs (homogeneous case) or type-ID pairs (heterogeneous case).
See also
--------
For the concept of blocks, please refer to User Guide Section 6.
"""
raise NotImplementedError
class NodeCollator(Collator):
"""
DGL collator that combines nodes and their computation dependencies for training
node classification or regression on a single graph.
Parameters
----------
g : DGLHeteroGraph
The graph.
nids : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
block_sampler : :py:class:`~dgl.sampling.BlockSampler`
The neighborhood sampler.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors (assume
the backend is PyTorch):
>>> sampler = dgl.sampling.MultiLayerNeighborSampler([None, None, None])
>>> collator = dgl.sampling.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(input_nodes, output_nodes, blocks)
"""
def __init__(self, g, nids, block_sampler):
self.g = g
if not isinstance(nids, Mapping):
assert len(g.ntypes) == 1, \
"nids should be a dict of node type and ids for graph with multiple node types"
self.nids = nids
self.block_sampler = block_sampler
if isinstance(nids, Mapping):
self._dataset = utils.FlattenedDict(nids)
else:
self._dataset = nids
@property
def dataset(self):
return self._dataset
def collate(self, items):
"""Find the list of blocks necessary for computing the representation of given
nodes for a node classification/regression task.
Returns
-------
input_nodes : Tensor or dict[ntype, Tensor]
The input nodes necessary for computation in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
output_nodes : Tensor or dict[ntype, Tensor]
The nodes whose representations are to be computed in this minibatch.
If the original graph has multiple node types, return a dictionary of
node type names and node ID tensors. Otherwise, return a single tensor.
blocks : list[DGLHeteroGraph]
The list of blocks necessary for computing the representation.
"""
if isinstance(items[0], tuple):
# returns a list of pairs: group them by node types into a dict
items = utils.group_as_dict(items)
blocks = self.block_sampler.sample_blocks(self.g, items)
if len(self.g.ntypes) == 1:
output_nodes = blocks[-1].dstdata[NID]
input_nodes = blocks[0].srcdata[NID]
else:
output_nodes = {
ntype: blocks[-1].dstnodes[ntype].data[NID]
for ntype in blocks[-1].dsttypes}
input_nodes = {
ntype: blocks[0].srcnodes[ntype].data[NID]
for ntype in blocks[0].srctypes}
return input_nodes, output_nodes, blocks
@@ -6,13 +6,10 @@ from ..base import DGLError, EID
from ..heterograph import DGLHeteroGraph
from .. import ndarray as nd
from .. import utils
-from .. import subgraph as subg
-from .dataloader import BlockSampler, assign_block_eids

__all__ = [
    'sample_neighbors',
-    'select_topk',
-    'MultiLayerNeighborSampler']
+    'select_topk']

def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False):
    """Sample neighboring edges of the given nodes and return the induced subgraph.
@@ -235,74 +232,4 @@ def select_topk(g, k, weight, nodes=None, edge_dir='in', ascending=False):
            ret.edges[etype].data[EID] = induced_edges[i]
    return ret
class MultiLayerNeighborSampler(BlockSampler):
"""Sampler that builds computational dependency of node representations via
neighbor sampling for multilayer GNN.
This sampler will make every node gather messages from a fixed number of neighbors
per edge type. The neighbors are picked uniformly.
Parameters
----------
fanouts : list[int] or list[dict[etype, int] or None]
List of neighbors to sample per edge type for each GNN layer, starting from the
first layer.
If the graph is homogeneous, only an integer is needed for each layer.
If None is provided for one layer, all neighbors will be included regardless of
edge types.
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
replace : bool, default False
Whether to sample with replacement.
return_eids : bool, default False
Whether to return edge IDs of the original graph in the sampled blocks.
If True, the edge IDs will be stored as ``dgl.EID`` feature for each edge type.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors (assume
the backend is PyTorch):
>>> sampler = dgl.sampling.MultiLayerNeighborSampler([None, None, None])
>>> collator = dgl.sampling.NodeCollator(g, train_nid, sampler)
>>> dataloader = torch.utils.data.DataLoader(
... collator.dataset, collate_fn=collator.collate,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for blocks in dataloader:
... train_on(blocks)
If we wish to gather from 5 neighbors on the first layer, 10 neighbors on the second,
and 15 neighbors on the third:
>>> sampler = dgl.sampling.MultiLayerNeighborSampler([5, 10, 15])
If training on a heterogeneous graph and you want different number of neighbors for each
edge type, one should instead provide a list of dicts. Each dict would specify the
number of neighbors to pick per edge type.
>>> sampler = dgl.sampling.MultiLayerNeighborSampler([
... {('user', 'follows', 'user'): 5,
... ('user', 'plays', 'game'): 4,
... ('game', 'played-by', 'user'): 3}] * 3)
"""
def __init__(self, fanouts, replace=False, return_eids=False):
super().__init__(len(fanouts))
self.fanouts = fanouts
self.replace = replace
self.return_eids = return_eids
if return_eids:
self.set_block_postprocessor(assign_block_eids)
def sample_frontier(self, block_id, g, seed_nodes, *args, **kwargs):
fanout = self.fanouts[block_id]
if fanout is None:
frontier = subg.in_subgraph(g, seed_nodes)
else:
frontier = sample_neighbors(g, seed_nodes, fanout, replace=self.replace)
return frontier
_init_api('dgl.sampling.neighbor', __name__)
"""DGL PyTorch DataLoaders"""
from torch.utils.data import DataLoader
from ..dataloader import NodeCollator
class NodeDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
of blocks as computation dependency of the said minibatch.
Parameters
----------
g : DGLHeteroGraph
The graph.
nids : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
block_sampler : :py:class:`~dgl.sampling.BlockSampler`
The neighborhood sampler.
kwargs : dict
Arguments being passed to `torch.utils.data.DataLoader`.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors (assume
the backend is PyTorch):
>>> sampler = dgl.sampling.MultiLayerNeighborSampler([None, None, None])
>>> dataloader = dgl.sampling.NodeDataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(input_nodes, output_nodes, blocks)
"""
def __init__(self, g, nids, block_sampler, **kwargs):
self.collator = NodeCollator(g, nids, block_sampler)
super().__init__(self.collator.dataset, collate_fn=self.collator.collate, **kwargs)