Unverified Commit d41d07d0 authored by Quan (Andy) Gan's avatar Quan (Andy) Gan Committed by GitHub
Browse files

[Doc and bugfix] Add docs and user guide and update tutorial for sampling pipeline (#3774)



* huuuuge update

* remove

* lint

* lint

* fix

* what happened to nccl

* update multi-gpu unsupervised graphsage example

* replace most of the dgl.mp.process with torch.mp.spawn

* update if condition for use_uva case

* update user guide

* address comments

* incorporating suggestions from @jermainewang

* oops

* fix tutorial to pass CI

* oops

* fix again
Co-authored-by: default avatarXin Yao <xiny@nvidia.com>
parent 3bd5a9b6
......@@ -11,7 +11,6 @@ import tqdm
# (This is a long-standing issue)
from ogb.linkproppred import DglLinkPropPredDataset
USE_UVA = False
device = 'cuda'
def to_bidirected_with_reverse_mapping(g):
......@@ -119,26 +118,21 @@ def evaluate(model, edge_split, device, num_workers):
dataset = DglLinkPropPredDataset('ogbl-citation2')
graph = dataset[0]
graph, reverse_eids = to_bidirected_with_reverse_mapping(graph)
seed_edges = torch.arange(graph.num_edges())
reverse_eids = reverse_eids.to(device)
seed_edges = torch.arange(graph.num_edges()).to(device)
edge_split = dataset.get_edge_split()
model = SAGE(graph.ndata['feat'].shape[1], 256).to(device)
opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
if not USE_UVA:
graph = graph.to(device)
reverse_eids = reverse_eids.to(device)
seed_edges = torch.arange(graph.num_edges()).to(device)
sampler = dgl.dataloading.NeighborSampler([15, 10, 5], prefetch_node_feats=['feat'])
dataloader = dgl.dataloading.EdgeDataLoader(
sampler = dgl.dataloading.as_edge_prediction_sampler(
sampler, exclude='reverse_id', reverse_eids=reverse_eids,
negative_sampler=dgl.dataloading.negative_sampler.Uniform(1))
dataloader = dgl.dataloading.DataLoader(
graph, seed_edges, sampler,
device=device, batch_size=512, shuffle=True,
drop_last=False, num_workers=0,
exclude='reverse_id',
reverse_eids=reverse_eids,
negative_sampler=dgl.dataloading.negative_sampler.Uniform(1),
use_uva=USE_UVA)
drop_last=False, num_workers=0, use_uva=True)
durations = []
for epoch in range(10):
......
......@@ -11,8 +11,6 @@ import numpy as np
from ogb.nodeproppred import DglNodePropPredDataset
import tqdm
USE_UVA = True
class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes):
super().__init__()
......@@ -70,18 +68,18 @@ def train(rank, world_size, graph, num_classes, split_idx):
train_idx, valid_idx, test_idx = split_idx['train'], split_idx['valid'], split_idx['test']
if USE_UVA:
train_idx = train_idx.to('cuda')
train_idx = train_idx.to('cuda')
valid_idx = valid_idx.to('cuda')
sampler = dgl.dataloading.NeighborSampler(
[15, 10, 5], prefetch_node_feats=['feat'], prefetch_labels=['label'])
train_dataloader = dgl.dataloading.NodeDataLoader(
train_dataloader = dgl.dataloading.DataLoader(
graph, train_idx, sampler,
device='cuda', batch_size=1000, shuffle=True, drop_last=False,
num_workers=0, use_ddp=True, use_uva=USE_UVA)
num_workers=0, use_ddp=True, use_uva=True)
valid_dataloader = dgl.dataloading.NodeDataLoader(
graph, valid_idx, sampler, device='cuda', batch_size=1024, shuffle=True,
drop_last=False, num_workers=0, use_uva=USE_UVA)
drop_last=False, num_workers=0, use_uva=True)
durations = []
for _ in range(10):
......
......@@ -10,8 +10,6 @@ from ogb.nodeproppred import DglNodePropPredDataset
import tqdm
import argparse
USE_UVA = True # Set to True for UVA sampling
class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes):
super().__init__()
......@@ -64,24 +62,21 @@ graph.ndata['label'] = labels.squeeze()
split_idx = dataset.get_idx_split()
train_idx, valid_idx, test_idx = split_idx['train'], split_idx['valid'], split_idx['test']
if not USE_UVA:
graph = graph.to('cuda')
train_idx = train_idx.to('cuda')
valid_idx = valid_idx.to('cuda')
test_idx = test_idx.to('cuda')
device = 'cuda'
train_idx = train_idx.to(device)
valid_idx = valid_idx.to(device)
model = SAGE(graph.ndata['feat'].shape[1], 256, dataset.num_classes).to(device)
opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
sampler = dgl.dataloading.NeighborSampler(
[15, 10, 5], prefetch_node_feats=['feat'], prefetch_labels=['label'])
train_dataloader = dgl.dataloading.NodeDataLoader(
train_dataloader = dgl.dataloading.DataLoader(
graph, train_idx, sampler, device=device, batch_size=1024, shuffle=True,
drop_last=False, num_workers=0, use_uva=USE_UVA)
drop_last=False, num_workers=0, use_uva=True)
valid_dataloader = dgl.dataloading.NodeDataLoader(
graph, valid_idx, sampler, device=device, batch_size=1024, shuffle=True,
drop_last=False, num_workers=0, use_uva=USE_UVA)
drop_last=False, num_workers=0, use_uva=True)
durations = []
for _ in range(10):
......@@ -119,6 +114,8 @@ print(np.mean(durations[4:]), np.std(durations[4:]))
# Test accuracy and offline inference of all nodes
model.eval()
with torch.no_grad():
pred = model.inference(graph, device, 4096, 12 if USE_UVA else 0, graph.device)
acc = MF.accuracy(pred.to(graph.device), graph.ndata['label'])
pred = model.inference(graph, device, 4096, 12, graph.device)
pred = pred[test_idx]
label = graph.ndata['label'][test_idx]
acc = MF.accuracy(pred, label)
print('Test acc:', acc.item())
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.nn.pytorch as dglnn
import time
import argparse
import tqdm
from model import SAGE
from load_graph import load_reddit, inductive_split, load_ogb
def compute_acc(pred, labels):
"""
Compute the accuracy of prediction given the labels.
"""
labels = labels.long()
return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)
def evaluate(model, g, nfeat, labels, val_nid, device):
"""
Evaluate the model on the validation set specified by ``val_nid``.
g : The entire graph.
inputs : The features of all the nodes.
labels : The labels of all the nodes.
val_nid : the node Ids for validation.
device : The GPU device to evaluate on.
"""
model.eval()
with th.no_grad():
pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
model.train()
return compute_acc(pred[val_nid], labels[val_nid].to(pred.device))
def load_subtensor(nfeat, labels, seeds, input_nodes, device):
"""
Extracts features and labels for a subset of nodes
"""
batch_inputs = nfeat[input_nodes].to(device)
batch_labels = labels[seeds].to(device)
return batch_inputs, batch_labels
#### Entry point
def run(args, device, data):
# Unpack data
n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \
val_nfeat, val_labels, test_nfeat, test_labels = data
in_feats = train_nfeat.shape[1]
test_nid = test_g.ndata.pop('test_mask',
~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])).nonzero().squeeze()
train_nid = train_g.ndata.pop('train_mask').nonzero().squeeze()
val_nid = val_g.ndata.pop('val_mask').nonzero().squeeze()
if args.graph_device == 'gpu':
train_nid = train_nid.to(device)
# copy only the csc to the GPU
train_g = train_g.formats(['csc'])
train_g = train_g.to(device)
args.num_workers = 0
elif args.graph_device == 'uva':
train_nid = train_nid.to(device)
train_g = train_g.formats(['csc'])
train_g.pin_memory_()
args.num_workers = 0
# Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[int(fanout) for fanout in args.fan_out.split(',')])
dataloader = dgl.dataloading.NodeDataLoader(
train_g,
train_nid,
sampler,
device=device,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
num_workers=args.num_workers)
# Define model and optimizer
model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
model = model.to(device)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# Training loop
avg = 0
iter_tput = []
for epoch in range(args.num_epochs):
tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
tic_step = time.time()
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
seeds, input_nodes, device)
blocks = [block.int().to(device) for block in blocks]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
iter_tput.append(len(seeds) / (time.time() - tic_step))
if step % args.log_every == 0:
acc = compute_acc(batch_pred, batch_labels)
gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), gpu_mem_alloc))
tic_step = time.time()
toc = time.time()
print('Epoch Time(s): {:.4f}'.format(toc - tic))
if epoch >= 5:
avg += toc - tic
if epoch % args.eval_every == 0 and epoch != 0:
eval_acc = evaluate(model, val_g, val_nfeat, val_labels, val_nid, device)
print('Eval Acc {:.4f}'.format(eval_acc))
test_acc = evaluate(model, test_g, test_nfeat, test_labels, test_nid, device)
print('Test Acc: {:.4f}'.format(test_acc))
print('Avg epoch time: {}'.format(avg / (epoch - 4)))
if __name__ == '__main__':
argparser = argparse.ArgumentParser()
argparser.add_argument('--gpu', type=int, default=0,
help="GPU device ID. Use -1 for CPU training")
argparser.add_argument('--dataset', type=str, default='reddit')
argparser.add_argument('--num-epochs', type=int, default=20)
argparser.add_argument('--num-hidden', type=int, default=16)
argparser.add_argument('--num-layers', type=int, default=2)
argparser.add_argument('--fan-out', type=str, default='10,25')
argparser.add_argument('--batch-size', type=int, default=1000)
argparser.add_argument('--log-every', type=int, default=20)
argparser.add_argument('--eval-every', type=int, default=5)
argparser.add_argument('--lr', type=float, default=0.003)
argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--num-workers', type=int, default=4,
help="Number of sampling processes. Use 0 for no extra process.")
argparser.add_argument('--inductive', action='store_true',
help="Inductive learning setting")
argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
help="Device to perform the sampling. "
"Must have 0 workers for 'gpu' and 'uva'")
argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
help="By default the script puts all node features and labels "
"on GPU when using it to save time for data copy. This may "
"be undesired if they cannot fit in GPU memory at once. "
"Use 'cpu' to keep the features on host memory and "
"'uva' to enable UnifiedTensor (GPU zero-copy access on "
"pinned host memory).")
args = argparser.parse_args()
if args.gpu >= 0:
device = th.device('cuda:%d' % args.gpu)
else:
device = th.device('cpu')
assert args.graph_device == 'cpu', \
f"Must have GPUs to enable {args.graph_device} sampling."
assert args.data_device == 'cpu', \
f"Must have GPUs to enable {args.data_device} feature storage."
if args.dataset == 'reddit':
g, n_classes = load_reddit()
elif args.dataset == 'ogbn-products':
g, n_classes = load_ogb('ogbn-products')
else:
raise Exception('unknown dataset')
if args.inductive:
train_g, val_g, test_g = inductive_split(g)
train_nfeat = train_g.ndata.pop('features')
val_nfeat = val_g.ndata.pop('features')
test_nfeat = test_g.ndata.pop('features')
train_labels = train_g.ndata.pop('labels')
val_labels = val_g.ndata.pop('labels')
test_labels = test_g.ndata.pop('labels')
else:
train_g = val_g = test_g = g
train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features')
train_labels = val_labels = test_labels = g.ndata.pop('labels')
if args.data_device == 'gpu':
train_nfeat = train_nfeat.to(device)
train_labels = train_labels.to(device)
elif args.data_device == 'uva':
train_nfeat = dgl.contrib.UnifiedTensor(train_nfeat, device=device)
train_labels = dgl.contrib.UnifiedTensor(train_labels, device=device)
# Pack data
data = n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \
val_nfeat, val_labels, test_nfeat, test_labels
run(args, device, data)
import os
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.multiprocessing as mp
import dgl.nn.pytorch as dglnn
import time
import math
import argparse
from torch.nn.parallel import DistributedDataParallel
import tqdm
from model import SAGE
from load_graph import load_reddit, inductive_split, load_ogb
def compute_acc(pred, labels):
"""
Compute the accuracy of prediction given the labels.
"""
return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)
def evaluate(model, g, nfeat, labels, val_nid, device):
"""
Evaluate the model on the validation set specified by ``val_nid``.
g : The entire graph.
inputs : The features of all the nodes.
labels : The labels of all the nodes.
val_nid : A node ID tensor indicating which nodes do we actually compute the accuracy for.
device : The GPU device to evaluate on.
"""
model.eval()
with th.no_grad():
pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
model.train()
return compute_acc(pred[val_nid], labels[val_nid])
def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id):
"""
Extracts features and labels for a subset of nodes.
"""
batch_inputs = nfeat[input_nodes].to(dev_id)
batch_labels = labels[seeds].to(dev_id)
return batch_inputs, batch_labels
#### Entry point
def run(proc_id, n_gpus, args, devices, data):
# Start up distributed training, if enabled.
device = th.device(devices[proc_id])
if n_gpus > 0:
th.cuda.set_device(device)
if n_gpus > 1:
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
world_size = n_gpus
th.distributed.init_process_group(backend="nccl",
init_method=dist_init_method,
world_size=world_size,
rank=proc_id)
# Unpack data
n_classes, train_g, val_g, test_g, train_nfeat, val_nfeat, test_nfeat, \
train_labels, val_labels, test_labels, train_nid, val_nid, test_nid = data
if args.data_device == 'gpu':
train_nfeat = train_nfeat.to(device)
train_labels = train_labels.to(device)
elif args.data_device == 'uva':
train_nfeat = dgl.contrib.UnifiedTensor(train_nfeat, device=device)
train_labels = dgl.contrib.UnifiedTensor(train_labels, device=device)
in_feats = train_nfeat.shape[1]
if args.graph_device == 'gpu':
train_nid = train_nid.to(device)
train_g = train_g.formats(['csc'])
train_g = train_g.to(device)
args.num_workers = 0
elif args.graph_device == 'uva':
train_nid = train_nid.to(device)
train_g.pin_memory_()
args.num_workers = 0
# Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[int(fanout) for fanout in args.fan_out.split(',')])
dataloader = dgl.dataloading.NodeDataLoader(
train_g,
train_nid,
sampler,
use_ddp=n_gpus > 1,
device=device,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
num_workers=args.num_workers)
# Define model and optimizer
model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
model = model.to(device)
if n_gpus > 1:
model = DistributedDataParallel(model, device_ids=[device], output_device=device)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# Training loop
avg = 0
iter_tput = []
for epoch in range(args.num_epochs):
tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
if proc_id == 0:
tic_step = time.time()
# Load the input features as well as output labels
batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
seeds, input_nodes, device)
blocks = [block.int().to(device) for block in blocks]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if proc_id == 0:
iter_tput.append(len(seeds) * n_gpus / (time.time() - tic_step))
if step % args.log_every == 0 and proc_id == 0:
acc = compute_acc(batch_pred, batch_labels)
print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), th.cuda.max_memory_allocated() / 1000000))
if n_gpus > 1:
th.distributed.barrier()
toc = time.time()
if proc_id == 0:
print('Epoch Time(s): {:.4f}'.format(toc - tic))
if epoch >= 5:
avg += toc - tic
if epoch % args.eval_every == 0 and epoch != 0:
if n_gpus == 1:
eval_acc = evaluate(
model, val_g, val_nfeat, val_labels, val_nid, devices[0])
test_acc = evaluate(
model, test_g, test_nfeat, test_labels, test_nid, devices[0])
else:
eval_acc = evaluate(
model.module, val_g, val_nfeat, val_labels, val_nid, devices[0])
test_acc = evaluate(
model.module, test_g, test_nfeat, test_labels, test_nid, devices[0])
print('Eval Acc {:.4f}'.format(eval_acc))
print('Test Acc: {:.4f}'.format(test_acc))
if n_gpus > 1:
th.distributed.barrier()
if proc_id == 0:
print('Avg epoch time: {}'.format(avg / (epoch - 4)))
if __name__ == '__main__':
argparser = argparse.ArgumentParser("multi-gpu training")
argparser.add_argument('--gpu', type=str, default='0',
help="Comma separated list of GPU device IDs.")
argparser.add_argument('--dataset', type=str, default='reddit')
argparser.add_argument('--num-epochs', type=int, default=20)
argparser.add_argument('--num-hidden', type=int, default=16)
argparser.add_argument('--num-layers', type=int, default=2)
argparser.add_argument('--fan-out', type=str, default='10,25')
argparser.add_argument('--batch-size', type=int, default=1000)
argparser.add_argument('--log-every', type=int, default=20)
argparser.add_argument('--eval-every', type=int, default=5)
argparser.add_argument('--lr', type=float, default=0.003)
argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--num-workers', type=int, default=0,
help="Number of sampling processes. Use 0 for no extra process.")
argparser.add_argument('--inductive', action='store_true',
help="Inductive learning setting")
argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
help="Device to perform the sampling. "
"Must have 0 workers for 'gpu' and 'uva'")
argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
help="By default the script puts all node features and labels "
"on GPU when using it to save time for data copy. This may "
"be undesired if they cannot fit in GPU memory at once. "
"Use 'cpu' to keep the features on host memory and "
"'uva' to enable UnifiedTensor (GPU zero-copy access on "
"pinned host memory).")
args = argparser.parse_args()
devices = list(map(int, args.gpu.split(',')))
n_gpus = len(devices)
if args.dataset == 'reddit':
g, n_classes = load_reddit()
elif args.dataset == 'ogbn-products':
g, n_classes = load_ogb('ogbn-products')
elif args.dataset == 'ogbn-papers100M':
g, n_classes = load_ogb('ogbn-papers100M')
g = dgl.add_reverse_edges(g)
# convert labels to integer
g.ndata['labels'] = th.as_tensor(g.ndata['labels'], dtype=th.int64)
g.ndata.pop('year')
else:
raise Exception('unknown dataset')
if args.inductive:
train_g, val_g, test_g = inductive_split(g)
train_nfeat = train_g.ndata.pop('features')
val_nfeat = val_g.ndata.pop('features')
test_nfeat = test_g.ndata.pop('features')
train_labels = train_g.ndata.pop('labels')
val_labels = val_g.ndata.pop('labels')
test_labels = test_g.ndata.pop('labels')
else:
train_g = val_g = test_g = g
train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features')
train_labels = val_labels = test_labels = g.ndata.pop('labels')
test_nid = test_g.ndata.pop('test_mask',
~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])).nonzero().squeeze()
train_nid = train_g.ndata.pop('train_mask').nonzero().squeeze()
val_nid = val_g.ndata.pop('val_mask').nonzero().squeeze()
# Create csr/coo/csc formats before launching training processes with multi-gpu.
# This avoids creating certain formats in each sub-process, which saves momory and CPU.
train_g.create_formats_()
val_g.create_formats_()
test_g.create_formats_()
# this to avoid competition overhead on machines with many cores.
# Change it to a proper number on your machine, especially for multi-GPU training.
os.environ['OMP_NUM_THREADS'] = str(mp.cpu_count() // 2 // n_gpus)
if n_gpus > 1:
# Copy the graph to shared memory explicitly before pinning.
# In other cases, we can just rely on fork's copy-on-write.
# TODO: the original train_g is not freed.
if args.graph_device == 'uva':
train_g = train_g.shared_memory('train_g')
if args.data_device == 'uva':
train_nfeat = train_nfeat.share_memory_()
train_labels = train_labels.share_memory_()
# Pack data
data = n_classes, train_g, val_g, test_g, train_nfeat, val_nfeat, test_nfeat, \
train_labels, val_labels, test_labels, train_nid, val_nid, test_nid
if devices[0] == -1:
assert args.graph_device == 'cpu', \
f"Must have GPUs to enable {args.graph_device} sampling."
assert args.data_device == 'cpu', \
f"Must have GPUs to enable {args.data_device} feature storage."
run(0, 0, args, ['cpu'], data)
elif n_gpus == 1:
run(0, n_gpus, args, devices, data)
else:
procs = []
for proc_id in range(n_gpus):
p = mp.Process(target=run, args=(proc_id, n_gpus, args, devices, data))
p.start()
procs.append(p)
for p in procs:
p.join()
......@@ -11,7 +11,7 @@ import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F
import argparse
import dgl.multiprocessing as mp
import torch.multiprocessing as mp
import sys
from torch.nn.parallel import DistributedDataParallel
from collections import OrderedDict
......@@ -282,13 +282,6 @@ if __name__ == '__main__':
num_features = dataset.num_paper_features
feats = np.memmap(args.full_feature_path, mode='r', dtype='float16', shape=(num_nodes, num_features))
procs = []
for proc_id in range(n_gpus):
p = mp.Process(target=train, args=(proc_id, n_gpus, args, dataset, g, feats, paper_offset))
p.start()
procs.append(p)
for p in procs:
p.join()
mp.spawn(train, args=(n_gpus, args, dataset, g, feats, paper_offset), nprocs=n_gpus)
test(args, dataset, g, feats, paper_offset)
Relational GAT
==============
This is an adaptation of RGCN where the graph convolution is replaced with graph attention.
Run
```bash
python rgat.py
```
to see the results.
......@@ -5,14 +5,12 @@ import torchmetrics.functional as MF
import dgl
import dgl.function as fn
import dgl.nn as dglnn
from dgl.utils import recursive_apply
from dgl import apply_each
import time
import numpy as np
from ogb.nodeproppred import DglNodePropPredDataset
import tqdm
USE_UVA = False
class HeteroGAT(nn.Module):
def __init__(self, etypes, in_feats, n_hidden, n_classes, n_heads=4):
super().__init__()
......@@ -35,10 +33,10 @@ class HeteroGAT(nn.Module):
h = layer(block, h)
# One thing is that h might return tensors with zero rows if the number of dst nodes
# of one node type is 0. x.view(x.shape[0], -1) wouldn't work in this case.
h = recursive_apply(h, lambda x: x.view(x.shape[0], x.shape[1] * x.shape[2]))
h = apply_each(h, lambda x: x.view(x.shape[0], x.shape[1] * x.shape[2]))
if l != len(self.layers) - 1:
h = recursive_apply(h, F.relu)
h = recursive_apply(h, self.dropout)
h = apply_each(h, F.relu)
h = apply_each(h, self.dropout)
return self.linear(h['paper'])
dataset = DglNodePropPredDataset('ogbn-mag')
......@@ -58,12 +56,9 @@ opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
split_idx = dataset.get_idx_split()
train_idx, valid_idx, test_idx = split_idx['train'], split_idx['valid'], split_idx['test']
if not USE_UVA:
graph = graph.to('cuda')
train_idx = recursive_apply(train_idx, lambda x: x.to('cuda'))
valid_idx = recursive_apply(valid_idx, lambda x: x.to('cuda'))
test_idx = recursive_apply(test_idx, lambda x: x.to('cuda'))
train_idx = apply_each(train_idx, lambda x: x.to('cuda'))
valid_idx = apply_each(valid_idx, lambda x: x.to('cuda'))
test_idx = apply_each(test_idx, lambda x: x.to('cuda'))
train_sampler = dgl.dataloading.NeighborSampler(
[5, 5, 5],
......@@ -73,18 +68,18 @@ valid_sampler = dgl.dataloading.NeighborSampler(
[10, 10, 10], # Slightly more
prefetch_node_feats={k: ['feat'] for k in graph.ntypes},
prefetch_labels={'paper': ['label']})
train_dataloader = dgl.dataloading.NodeDataLoader(
train_dataloader = dgl.dataloading.DataLoader(
graph, train_idx, train_sampler,
device='cuda', batch_size=1000, shuffle=True,
drop_last=False, num_workers=0, use_uva=USE_UVA)
valid_dataloader = dgl.dataloading.NodeDataLoader(
drop_last=False, num_workers=0, use_uva=True)
valid_dataloader = dgl.dataloading.DataLoader(
graph, valid_idx, valid_sampler,
device='cuda', batch_size=1000, shuffle=False,
drop_last=False, num_workers=0, use_uva=USE_UVA)
test_dataloader = dgl.dataloading.NodeDataLoader(
drop_last=False, num_workers=0, use_uva=True)
test_dataloader = dgl.dataloading.DataLoader(
graph, test_idx, valid_sampler,
device='cuda', batch_size=1000, shuffle=False,
drop_last=False, num_workers=0, use_uva=USE_UVA)
drop_last=False, num_workers=0, use_uva=True)
def evaluate(model, dataloader):
preds = []
......@@ -94,8 +89,8 @@ def evaluate(model, dataloader):
x = blocks[0].srcdata['feat']
y = blocks[-1].dstdata['label']['paper'][:, 0]
y_hat = model(blocks, x)
preds.append(y_hat)
labels.append(y)
preds.append(y_hat.cpu())
labels.append(y.cpu())
preds = torch.cat(preds, 0)
labels = torch.cat(labels, 0)
acc = MF.accuracy(preds, labels)
......
......@@ -6,7 +6,7 @@ import argparse
import gc
import torch as th
import torch.nn.functional as F
import dgl.multiprocessing as mp
import torch.multiprocessing as mp
import dgl
from torchmetrics.functional import accuracy
......@@ -31,8 +31,8 @@ def collect_eval(n_gpus, queue, labels):
def run(proc_id, n_gpus, n_cpus, args, devices, dataset, queue=None):
dev_id = devices[proc_id]
g, num_classes, num_rels, target_idx, inv_target, train_idx,\
test_idx, labels = dataset
g, num_rels, num_classes, labels, train_idx, test_idx,\
target_idx, inv_target = dataset
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
......@@ -93,8 +93,7 @@ def run(proc_id, n_gpus, n_cpus, args, devices, dataset, queue=None):
th.distributed.barrier()
def main(args, devices):
g, num_rels, num_classes, labels, train_idx, test_idx, target_idx, inv_target = load_data(
args.dataset, inv_target=True)
data = load_data(args.dataset, inv_target=True)
# Create csr/coo/csc formats before launching training processes.
# This avoids creating certain formats in each sub-process, which saves momory and CPU.
......@@ -103,17 +102,8 @@ def main(args, devices):
n_gpus = len(devices)
n_cpus = mp.cpu_count()
queue = mp.Queue(n_gpus)
procs = []
for proc_id in range(n_gpus):
# We use distributed data parallel dataloader to handle the data splitting
p = mp.Process(target=run, args=(proc_id, n_gpus, n_cpus // n_gpus, args, devices,
(g, num_classes, num_rels, target_idx,
inv_target, train_idx, test_idx, labels),
queue))
p.start()
procs.append(p)
for p in procs:
p.join()
mp.spawn(run, args=(n_gpus, n_cpus // n_gpus, args, devices, data, queue),
nprocs=n_gpus)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='RGCN for entity classification with sampling and multiple gpus')
......
VRGCN (control variate sampling)
================================
Paper: https://arxiv.org/abs/1710.10568
Run with
```bash
python3 train_cv.py --num-epochs 30
python3 train_cv_multi_gpu.py --num-epochs 30 --gpu 0,1,2,3 # multi-GPU
```
......@@ -4,7 +4,6 @@ import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.multiprocessing as mp
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
......@@ -12,7 +11,6 @@ import argparse
import tqdm
from dgl.data import RedditDataset
from torch.utils.data import DataLoader
from torch.nn.parallel import DistributedDataParallel
class SAGEConvWithCV(nn.Module):
def __init__(self, in_feats, out_feats, activation):
......
......@@ -4,7 +4,7 @@ import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.multiprocessing as mp
import torch.multiprocessing as mp
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
......@@ -370,10 +370,4 @@ if __name__ == '__main__':
if n_gpus == 1:
run(0, n_gpus, args, devices, data)
else:
procs = []
for proc_id in range(n_gpus):
p = mp.Process(target=run, args=(proc_id, n_gpus, args, devices, data))
p.start()
procs.append(p)
for p in procs:
p.join()
mp.spawn(run, args=(n_gpus, args, devices, data), nprocs=n_gpus)
......@@ -355,6 +355,29 @@ IdArray VecToIdArray(const std::vector<T>& vec,
return ret.CopyTo(ctx);
}
/*!
* \brief Get the context of the first non-null array, and check if the non-null arrays'
* contexts are the same.
*
* Throws an error if all the arrays are null arrays.
*/
inline DLContext GetContextOf(const std::vector<IdArray>& arrays) {
bool first = true;
DLContext result;
for (auto& array : arrays) {
if (IsNullArray(array))
continue;
if (first) {
first = false;
result = array->ctx;
} else {
CHECK_EQ(array->ctx, result) << "Context of the input arrays are different";
}
}
CHECK(!first) << "All input arrays are empty.";
return result;
}
} // namespace aten
} // namespace dgl
......
......@@ -38,6 +38,8 @@ from .convert import *
from .generators import *
from .heterograph import DGLHeteroGraph
from .heterograph import DGLHeteroGraph as DGLGraph # pylint: disable=reimported
from .dataloading import set_src_lazy_features, set_dst_lazy_features, set_edge_lazy_features, \
set_node_lazy_features
from .merge import *
from .subgraph import *
from .traversal import *
......@@ -46,6 +48,8 @@ from .propagate import *
from .random import *
from .data.utils import save_graphs, load_graphs
from . import optim
from .frame import LazyFeature
from .utils import recursive_apply
from ._deprecate.graph import DGLGraph as DGLGraphStale
from ._deprecate.nodeflow import *
"""Base classes and functionalities for dataloaders"""
from collections import Mapping
import inspect
from ..base import NID, EID
from ..convert import heterograph
from .. import backend as F
......@@ -17,39 +18,188 @@ def _set_lazy_features(x, xdata, feature_names):
x[type_].data.update({k: LazyFeature(k) for k in names})
def set_node_lazy_features(g, feature_names):
"""Set lazy features for ``g.ndata`` if :attr:`feature_names` is a list of strings,
or ``g.nodes[ntype].data`` if :attr:`feature_names` is a dict of list of strings.
"""Assign :class:`~dgl.LazyFeature`s to the node data of the input graph.
When used in a :class:`~dgl.dataloading.Sampler`, lazy features mark which data
should be fetched before computation in model. See :ref:`guide-minibatch-prefetching`
for a detailed explanation.
If the graph is homogeneous, this is equivalent to:
.. code:: python
g.ndata.update({k: LazyFeature(k, g.ndata[dgl.NID]) for k in feature_names})
If the graph is heterogeneous, this is equivalent to:
.. code:: python
for type_, names in feature_names.items():
g.nodes[type_].data.update(
{k: LazyFeature(k, g.nodes[type_].data[dgl.NID]) for k in names})
Parameters
----------
g : DGLGraph
The graph.
feature_names : list[str] or dict[str, list[str]]
The feature names to prefetch.
See also
--------
dgl.LazyFeature
"""
return _set_lazy_features(g.nodes, g.ndata, feature_names)
def set_edge_lazy_features(g, feature_names):
"""Set lazy features for ``g.edata`` if :attr:`feature_names` is a list of strings,
or ``g.edges[etype].data`` if :attr:`feature_names` is a dict of list of strings.
"""Assign :class:`~dgl.LazyFeature`s to the edge data of the input graph.
When used in a :class:`~dgl.dataloading.Sampler`, lazy features mark which data
should be fetched before computation in model. See :ref:`guide-minibatch-prefetching`
for a detailed explanation.
If the graph is homogeneous, this is equivalent to:
.. code:: python
g.edata.update({k: LazyFeature(k, g.edata[dgl.EID]) for k in feature_names})
If the graph is heterogeneous, this is equivalent to:
.. code:: python
for type_, names in feature_names.items():
g.edges[type_].data.update(
{k: LazyFeature(k, g.edges[type_].data[dgl.EID]) for k in names})
Parameters
----------
g : DGLGraph
The graph.
feature_names : list[str] or dict[etype, list[str]]
The feature names to prefetch. The ``etype`` key is either a string
or a triplet.
See also
--------
dgl.LazyFeature
"""
return _set_lazy_features(g.edges, g.edata, feature_names)
def set_src_lazy_features(g, feature_names):
"""Set lazy features for ``g.srcdata`` if :attr:`feature_names` is a list of strings,
or ``g.srcnodes[srctype].data`` if :attr:`feature_names` is a dict of list of strings.
"""Assign :class:`~dgl.LazyFeature`s to the source node data of the input MFG.
When used in a :class:`~dgl.dataloading.Sampler`, lazy features mark which data
should be fetched before computation in model. See :ref:`guide-minibatch-prefetching`
for a detailed explanation.
If the graph is homogeneous, this is equivalent to:
.. code:: python
g.srcdata.update({k: LazyFeature(k, g.srcdata[dgl.NID]) for k in feature_names})
If the graph is heterogeneous, this is equivalent to:
.. code:: python
for type_, names in feature_names.items():
g.srcnodes[type_].data.update(
{k: LazyFeature(k, g.srcnodes[type_].data[dgl.NID]) for k in names})
Parameters
----------
g : DGLGraph
The graph.
feature_names : list[str] or dict[str, list[str]]
The feature names to prefetch.
See also
--------
dgl.LazyFeature
"""
return _set_lazy_features(g.srcnodes, g.srcdata, feature_names)
def set_dst_lazy_features(g, feature_names):
"""Set lazy features for ``g.dstdata`` if :attr:`feature_names` is a list of strings,
or ``g.dstnodes[dsttype].data`` if :attr:`feature_names` is a dict of list of strings.
"""Assign :class:`~dgl.LazyFeature`s to the destination node data of the input MFG.
When used in a :class:`~dgl.dataloading.Sampler`, lazy features mark which data
should be fetched before computation in model. See :ref:`guide-minibatch-prefetching`
for a detailed explanation.
If the graph is homogeneous, this is equivalent to:
.. code:: python
g.dstdata.update({k: LazyFeature(k, g.dstdata[dgl.NID]) for k in feature_names})
If the graph is heterogeneous, this is equivalent to:
.. code:: python
for type_, names in feature_names.items():
g.dstnodes[type_].data.update(
{k: LazyFeature(k, g.dstnodes[type_].data[dgl.NID]) for k in names})
Parameters
----------
g : DGLGraph
The graph.
feature_names : list[str] or dict[str, list[str]]
The feature names to prefetch.
See also
--------
dgl.LazyFeature
"""
return _set_lazy_features(g.dstnodes, g.dstdata, feature_names)
class BlockSampler(object):
"""BlockSampler is an abstract class assuming to take in a set of nodes whose
outputs are to compute, and return a list of blocks.
class Sampler(object):
"""Abstract sampler class."""
def sample(self, g, indices):
"""Abstract sample method.
Parameters
----------
g : DGLGraph
The graph.
indices : object
Any object representing the indices selected in the current minibatch.
"""
raise NotImplementedError
class BlockSampler(Sampler):
"""Abstract class that assumes to take in a set of nodes whose
outputs are to compute, and returns a list of MFGs.
Moreover, it assumes that the input node features will be put in the first MFG's
``srcdata``, the output node labels will be put in the last MFG's ``dstdata``, and
the edge data will be put in all the MFGs' ``edata``.
Moreover, it assumes that the input node features will be put in the first block's
``srcdata``, the output node labels will be put in the last block's ``dstdata``, and
the edge data will be put in all the blocks' ``edata``.
Parameters
----------
prefetch_node_feats : list[str] or dict[str, list[str]], optional
The node data to prefetch for the first MFG.
DGL will populate the first layer's MFG's ``srcnodes`` and ``srcdata`` with
the node data of the given names from the original graph.
prefetch_labels : list[str] or dict[str, list[str]], optional
The node data to prefetch for the last MFG.
DGL will populate the last layer's MFG's ``dstnodes`` and ``dstdata`` with
the node data of the given names from the original graph.
prefetch_edge_feats : list[str] or dict[etype, list[str]], optional
The edge data names to prefetch for all the MFGs.
DGL will populate every MFG's ``edges`` and ``edata`` with the edge data
of the given names from the original graph.
output_device : device, optional
The device of the output subgraphs or MFGs. Default is the same as the
minibatch of seed nodes.
"""
def __init__(self, prefetch_node_feats=None, prefetch_labels=None,
prefetch_edge_feats=None, output_device=None):
super().__init__()
self.prefetch_node_feats = prefetch_node_feats or []
self.prefetch_labels = prefetch_labels or []
self.prefetch_edge_feats = prefetch_edge_feats or []
......@@ -97,9 +247,9 @@ class BlockSampler(object):
set_edge_lazy_features(block, self.prefetch_edge_feats)
return input_nodes, output_nodes, blocks
def sample(self, g, seed_nodes):
def sample(self, g, seed_nodes, exclude_eids=None): # pylint: disable=arguments-differ
"""Sample a list of blocks from the given seed nodes."""
result = self.sample_blocks(g, seed_nodes)
result = self.sample_blocks(g, seed_nodes, exclude_eids=exclude_eids)
return self.assign_lazy_features(result)
......@@ -198,23 +348,30 @@ def find_exclude_eids(g, seed_edges, exclude, reverse_eids=None, reverse_etypes=
exclude_eids = recursive_apply(exclude_eids, lambda x: F.copy_to(x, output_device))
return exclude_eids
class EdgePredictionSampler(Sampler):
"""Sampler class that wraps an existing sampler for node classification into another
one for edge classification or link prediction.
class EdgeBlockSampler(object):
"""Adapts a :class:`BlockSampler` object's :attr:`sample` method for edge
classification and link prediction.
See also
--------
as_edge_prediction_sampler
"""
def __init__(self, block_sampler, exclude=None, reverse_eids=None,
reverse_etypes=None, negative_sampler=None,
prefetch_node_feats=None, prefetch_labels=None, prefetch_edge_feats=None,):
def __init__(self, sampler, exclude=None, reverse_eids=None,
reverse_etypes=None, negative_sampler=None, prefetch_labels=None):
super().__init__()
# Check if the sampler's sample method has an optional third argument.
argspec = inspect.getfullargspec(sampler.sample)
if len(argspec.args) < 4: # ['self', 'g', 'indices', 'exclude_eids']
raise TypeError(
"This sampler does not support edge or link prediction; please add an"
"optional third argument for edge IDs to exclude in its sample() method.")
self.reverse_eids = reverse_eids
self.reverse_etypes = reverse_etypes
self.exclude = exclude
self.block_sampler = block_sampler
self.sampler = sampler
self.negative_sampler = negative_sampler
self.prefetch_node_feats = prefetch_node_feats or []
self.prefetch_labels = prefetch_labels or []
self.prefetch_edge_feats = prefetch_edge_feats or []
self.output_device = block_sampler.output_device
self.output_device = sampler.output_device
def _build_neg_graph(self, g, seed_edges):
neg_srcdst = self.negative_sampler(g, seed_edges)
......@@ -235,16 +392,11 @@ class EdgeBlockSampler(object):
def assign_lazy_features(self, result):
"""Assign lazy features for prefetching."""
pair_graph = result[1]
blocks = result[-1]
set_src_lazy_features(blocks[0], self.prefetch_node_feats)
set_edge_lazy_features(pair_graph, self.prefetch_labels)
for block in blocks:
set_edge_lazy_features(block, self.prefetch_edge_feats)
# In-place updates
return result
def sample(self, g, seed_edges):
def sample(self, g, seed_edges): # pylint: disable=arguments-differ
"""Samples a list of blocks, as well as a subgraph containing the sampled
edges from the original graph.
......@@ -271,9 +423,159 @@ class EdgeBlockSampler(object):
g, seed_edges, exclude, self.reverse_eids, self.reverse_etypes,
self.output_device)
input_nodes, _, blocks = self.block_sampler.sample_blocks(g, seed_nodes, exclude_eids)
input_nodes, _, blocks = self.sampler.sample(g, seed_nodes, exclude_eids)
if self.negative_sampler is None:
return self.assign_lazy_features((input_nodes, pair_graph, blocks))
else:
return self.assign_lazy_features((input_nodes, pair_graph, neg_graph, blocks))
def as_edge_prediction_sampler(
sampler, exclude=None, reverse_eids=None, reverse_etypes=None, negative_sampler=None,
prefetch_labels=None):
"""Create an edge-wise sampler from a node-wise sampler.
For each batch of edges, the sampler applies the provided node-wise sampler to
their source and destination nodes to extract subgraphs. It also generates negative
edges if a negative sampler is provided, and extract subgraphs for their incident
nodes as well.
For each iteration, the sampler will yield
* A tensor of input nodes necessary for computing the representation on edges, or
a dictionary of node type names and such tensors.
* A subgraph that contains only the edges in the minibatch and their incident nodes.
Note that the graph has an identical metagraph with the original graph.
* If a negative sampler is given, another graph that contains the "negative edges",
connecting the source and destination nodes yielded from the given negative sampler.
* The subgraphs or MFGs returned by the provided node-wise sampler, generated
from the incident nodes of the edges in the minibatch (as well as those of the
negative edges if applicable).
Parameters
----------
sampler : Sampler
The node-wise sampler object. It additionally requires that the :attr:`sample`
method must have an optional third argument :attr:`exclude_eids` representing the
edge IDs to exclude from neighborhood. The argument will be either a tensor
for homogeneous graphs or a dict of edge types and tensors for heterogeneous
graphs.
exclude : str, optional
Whether and how to exclude dependencies related to the sampled edges in the
minibatch. Possible values are
* None, for not excluding any edges.
* ``self``, for excluding the edges in the current minibatch.
* ``reverse_id``, for excluding not only the edges in the current minibatch but
also their reverse edges according to the ID mapping in the argument
:attr:`reverse_eids`.
* ``reverse_types``, for excluding not only the edges in the current minibatch
but also their reverse edges stored in another type according to
the argument :attr:`reverse_etypes`.
* User-defined exclusion rule. It is a callable with edges in the current
minibatch as a single argument and should return the edges to be excluded.
reverse_eids : Tensor or dict[etype, Tensor], optional
A tensor of reverse edge ID mapping. The i-th element indicates the ID of
the i-th edge's reverse edge.
If the graph is heterogeneous, this argument requires a dictionary of edge
types and the reverse edge ID mapping tensors.
reverse_etypes : dict[etype, etype], optional
The mapping from the original edge types to their reverse edge types.
negative_sampler : callable, optional
The negative sampler.
prefetch_labels : list[str] or dict[etype, list[str]], optional
The edge labels to prefetch for the returned positive pair graph.
See :ref:`guide-minibatch-prefetching` for a detailed explanation of prefetching.
Examples
--------
The following example shows how to train a 3-layer GNN for edge classification on a
set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes
messages from all neighbors.
Given an array of source node IDs ``src`` and another array of destination
node IDs ``dst``, the following code creates a bidirectional graph:
>>> g = dgl.graph((torch.cat([src, dst]), torch.cat([dst, src])))
Edge :math:`i`'s reverse edge in the graph above is edge :math:`i + |E|`. Therefore, we can
create a reverse edge mapping ``reverse_eids`` by:
>>> E = len(src)
>>> reverse_eids = torch.cat([torch.arange(E, 2 * E), torch.arange(0, E)])
By passing ``reverse_eids`` to the edge sampler, the edges in the current mini-batch and their
reversed edges will be excluded from the extracted subgraphs to avoid information leakage.
>>> sampler = dgl.dataloading.as_edge_prediction_sampler(
... dgl.dataloading.NeighborSampler([15, 10, 5]),
... exclude='reverse_id', reverse_eids=reverse_eids)
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, train_eid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
For link prediction, one can provide a negative sampler to sample negative edges.
The code below uses DGL's :class:`~dgl.dataloading.negative_sampler.Uniform`
to generate 5 negative samples per edge:
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> sampler = dgl.dataloading.as_edge_prediction_sampler(
... dgl.dataloading.NeighborSampler([15, 10, 5]),
... sampler, exclude='reverse_id', reverse_eids=reverse_eids,
... negative_sampler=neg_sampler)
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, train_eid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, neg_pair_graph, blocks)
For heterogeneous graphs, reverse edges may belong to a different relation. For example,
the relations "user-click-item" and "item-click-by-user" in the graph below are
mutual reverse.
>>> g = dgl.heterograph({
... ('user', 'click', 'item'): (user, item),
... ('item', 'clicked-by', 'user'): (item, user)})
To correctly exclude edges from each mini-batch, set ``exclude='reverse_types'`` and
pass a dictionary ``{'click': 'clicked-by', 'clicked-by': 'click'}`` to the
``reverse_etypes`` argument.
>>> sampler = dgl.dataloading.as_edge_prediction_sampler(
... dgl.dataloading.NeighborSampler([15, 10, 5]),
... exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'})
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, {'click': train_eid}, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, blocks)
For link prediction, provide a negative sampler to generate negative samples:
>>> neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)
>>> sampler = dgl.dataloading.as_edge_prediction_sampler(
... dgl.dataloading.NeighborSampler([15, 10, 5]),
... exclude='reverse_types',
... reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'},
... negative_sampler=neg_sampler)
>>> dataloader = dgl.dataloading.EdgeDataLoader(
... g, train_eid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
... train_on(input_nodes, pair_graph, neg_pair_graph, blocks)
"""
return EdgePredictionSampler(
sampler, exclude=exclude, reverse_eids=reverse_eids, reverse_etypes=reverse_etypes,
negative_sampler=negative_sampler, prefetch_labels=prefetch_labels)
......@@ -6,32 +6,61 @@ import numpy as np
from .. import backend as F
from ..base import DGLError
from ..partition import metis_partition_assignment
from .base import set_node_lazy_features, set_edge_lazy_features
from .base import set_node_lazy_features, set_edge_lazy_features, Sampler
class ClusterGCNSampler(object):
class ClusterGCNSampler(Sampler):
"""Cluster-GCN sampler.
This sampler first partitions the graph with METIS partitioning, then it caches the nodes of
each partition to a file within the given cache directory.
This is used in conjunction with :class:`dgl.dataloading.DataLoader`.
Notes
-----
The graph must be homogeneous and on CPU.
The sampler then selects the graph partitions according to the provided
partition IDs, take the union of all nodes in those partitions, and return an
induced subgraph in its :attr:`sample` method.
Parameters
----------
g : DGLGraph
The original graph.
The original graph. Must be homogeneous and on CPU.
k : int
The number of partitions.
cache_path : str
The path to the cache directory for storing the partition result.
balance_ntypes, balkance_edges, mode :
Passed to :func:`dgl.metis_partition_assignment`.
prefetch_ndata : list[str], optional
The node data to prefetch for the subgraph.
See :ref:`guide-minibatch-prefetching` for a detailed explanation of prefetching.
prefetch_edata : list[str], optional
The edge data to prefetch for the subgraph.
See :ref:`guide-minibatch-prefetching` for a detailed explanation of prefetching.
output_device : device, optional
The device of the output subgraphs or MFGs. Default is the same as the
minibatch of partition indices.
Examples
--------
**Node classification**
With this sampler, the data loader will accept the list of partition IDs as
indices to iterate over. For instance, the following code first splits the
graph into 1000 partitions using METIS, and at each iteration it gets a subgraph
induced by the nodes covered by 20 randomly selected partitions.
>>> num_parts = 1000
>>> sampler = dgl.dataloading.ClusterGCNSampler(g, num_parts)
>>> dataloader = dgl.dataloading.DataLoader(
... g, torch.arange(num_parts), sampler,
... batch_size=20, shuffle=True, drop_last=False, num_workers=4)
>>> for subg in dataloader:
... train_on(subg)
"""
def __init__(self, g, k, balance_ntypes=None, balance_edges=False, mode='k-way',
prefetch_node_feats=None, prefetch_edge_feats=None, output_device=None,
cache_path='cluster_gcn.pkl'):
def __init__(self, g, k, cache_path='cluster_gcn.pkl', balance_ntypes=None,
balance_edges=False, mode='k-way', prefetch_ndata=None,
prefetch_edata=None, output_device=None):
super().__init__()
if os.path.exists(cache_path):
try:
with open(cache_path, 'rb') as f:
......@@ -61,16 +90,29 @@ class ClusterGCNSampler(object):
self.partition_offset = partition_offset
self.partition_node_ids = partition_node_ids
self.prefetch_node_feats = prefetch_node_feats or []
self.prefetch_edge_feats = prefetch_edge_feats or []
self.prefetch_ndata = prefetch_ndata or []
self.prefetch_edata = prefetch_edata or []
self.output_device = output_device
def sample(self, g, partition_ids):
"""Samples a subgraph given a list of partition IDs."""
def sample(self, g, partition_ids): # pylint: disable=arguments-differ
"""Sampling function.
Parameters
----------
g : DGLGraph
The graph to sample from.
partition_ids : Tensor
A 1-D integer tensor of partition IDs.
Returns
-------
DGLGraph
The sampled subgraph.
"""
node_ids = F.cat([
self.partition_node_ids[self.partition_offset[i]:self.partition_offset[i+1]]
for i in F.asnumpy(partition_ids)], 0)
sg = g.subgraph(node_ids, relabel_nodes=True, output_device=self.output_device)
set_node_lazy_features(sg, self.prefetch_node_feats)
set_edge_lazy_features(sg, self.prefetch_edge_feats)
set_node_lazy_features(sg, self.prefetch_ndata)
set_edge_lazy_features(sg, self.prefetch_edata)
return sg
......@@ -15,7 +15,7 @@ import torch
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler
from ..base import NID, EID
from ..base import NID, EID, dgl_warning
from ..batch import batch as batch_graphs
from ..heterograph import DGLHeteroGraph
from .. import ndarray as nd
......@@ -24,8 +24,9 @@ from ..utils import (
create_shared_mem_array, get_shared_mem_array, context_of, pin_memory_inplace)
from ..frame import LazyFeature
from ..storages import wrap_storage
from .base import BlockSampler, EdgeBlockSampler
from .base import BlockSampler, as_edge_prediction_sampler
from .. import backend as F
from ..distributed import DistGraph
PYTHON_EXIT_STATUS = False
def _set_python_exit_flag():
......@@ -560,11 +561,129 @@ def _get_device(device):
return device
class DataLoader(torch.utils.data.DataLoader):
"""DataLoader class."""
"""PyTorch dataloader for batch-iterating over a set of nodes, edges or any other
kinds of indices. The minibatch of such indices will be then passed to a sampler
generating subgraphs, message flow graphs (MFGs), or any other structures necessary
to compute the representations.
Parameters
----------
graph : DGLGraph
The graph.
indices : Tensor or dict[ntype, Tensor]
The set of indices. It can either be a tensor of integer indices or a dictionary
of types and indices.
The actual meaning of the indices is defined by the :meth:`sample` method of
:attr:`graph_sampler`.
graph_sampler : dgl.dataloading.Sampler
The subgraph sampler.
device : device context, optional
The device of the generated MFGs in each iteration, which should be a
PyTorch device object (e.g., ``torch.device``).
By default this value is the same as the device of :attr:`g`.
use_ddp : boolean, optional
If True, tells the DataLoader to split the training set for each
participating process appropriately using
:class:`torch.utils.data.distributed.DistributedSampler`.
Overrides the :attr:`sampler` argument of :class:`torch.utils.data.DataLoader`.
ddp_seed : int, optional
The seed for shuffling the dataset in
:class:`torch.utils.data.distributed.DistributedSampler`.
Only effective when :attr:`use_ddp` is True.
use_uva : bool, optional
Whether to use Unified Virtual Addressing (UVA) to directly sample the graph
and slice the features from CPU into GPU. Setting it to True will pin the
graph and feature tensors into pinned memory.
If True, requires that :attr:`indices` must have the same device as the
:attr:`device` argument.
Default: False.
use_prefetch_thread : bool, optional
(Advanced option)
Spawns a new Python thread to perform feature slicing
asynchronously. Can make things faster at the cost of GPU memory.
Default: True if the graph is on CPU and :attr:`device` is CUDA. False otherwise.
use_alternate_streams : bool, optional
(Advanced option)
Whether to slice and transfers the features to GPU on a non-default stream.
Default: True if the graph is on CPU, :attr:`device` is CUDA, and :attr:`use_uva`
is False. False otherwise.
pin_prefetcher : bool, optional
(Advanced option)
Whether to pin the feature tensors into pinned memory.
Default: True if the graph is on CPU and :attr:`device` is CUDA. False otherwise.
batch_size : int, optional
drop_last : bool, optional
shuffle : bool, optional
kwargs : dict
Arguments being passed to :py:class:`torch.utils.data.DataLoader`.
Examples
--------
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from all neighbors (assume
the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> dataloader = dgl.dataloading.DataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(input_nodes, output_nodes, blocks)
**Using with Distributed Data Parallel**
If you are using PyTorch's distributed training (e.g. when using
:mod:`torch.nn.parallel.DistributedDataParallel`), you can train the model by turning
on the `use_ddp` option:
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
>>> dataloader = dgl.dataloading.DataLoader(
... g, train_nid, sampler, use_ddp=True,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for epoch in range(start_epoch, n_epochs):
... for input_nodes, output_nodes, blocks in dataloader:
... train_on(input_nodes, output_nodes, blocks)
Notes
-----
Please refer to
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`
and :ref:`User Guide Section 6 <guide-minibatch>` for usage.
**Tips for selecting the proper device**
* If the input graph :attr:`g` is on GPU, the output device :attr:`device` must be the same GPU
and :attr:`num_workers` must be zero. In this case, the sampling and subgraph construction
will take place on the GPU. This is the recommended setting when using a single-GPU and
the whole graph fits in GPU memory.
* If the input graph :attr:`g` is on CPU while the output device :attr:`device` is GPU, then
depending on the value of :attr:`use_uva`:
- If :attr:`use_uva` is set to True, the sampling and subgraph construction will happen
on GPU even if the GPU itself cannot hold the entire graph. This is the recommended
setting unless there are operations not supporting UVA. :attr:`num_workers` must be 0
in this case.
- Otherwise, both the sampling and subgraph construction will take place on the CPU.
"""
def __init__(self, graph, indices, graph_sampler, device='cpu', use_ddp=False,
ddp_seed=0, batch_size=1, drop_last=False, shuffle=False,
use_prefetch_thread=None, use_alternate_streams=None,
pin_prefetcher=None, use_uva=False, **kwargs):
if isinstance(graph, DistGraph):
raise TypeError(
'Please use dgl.dataloading.DistNodeDataLoader or '
'dgl.datalaoding.DistEdgeDataLoader for DistGraphs.')
# (BarclayII) I hoped that pin_prefetcher can be merged into PyTorch's native
# pin_memory argument. But our neighbor samplers and subgraph samplers
# return indices, which could be CUDA tensors (e.g. during UVA sampling)
......@@ -598,6 +717,10 @@ class DataLoader(torch.utils.data.DataLoader):
if use_uva:
if self.graph.device.type != 'cpu':
raise ValueError('Graph must be on CPU if UVA sampling is enabled.')
if indices_device != self.device:
raise ValueError(
f'Indices must be on the same device as the device argument '
f'({self.device})')
if num_workers > 0:
raise ValueError('num_workers must be 0 if UVA sampling is enabled.')
......@@ -608,8 +731,6 @@ class DataLoader(torch.utils.data.DataLoader):
for frame in itertools.chain(self.graph._node_frames, self.graph._edge_frames):
for col in frame._columns.values():
pin_memory_inplace(col.data)
indices = recursive_apply(indices, lambda x: x.to(self.device))
else:
if self.graph.device != indices_device:
raise ValueError(
......@@ -623,7 +744,12 @@ class DataLoader(torch.utils.data.DataLoader):
# Check pin_prefetcher and use_prefetch_thread - should be only effective
# if performing CPU sampling but output device is CUDA
if not (self.device.type == 'cuda' and self.graph.device.type == 'cpu'):
if self.device.type == 'cuda' and self.graph.device.type == 'cpu' and not use_uva:
if pin_prefetcher is None:
pin_prefetcher = True
if use_prefetch_thread is None:
use_prefetch_thread = True
else:
if pin_prefetcher is True:
raise ValueError(
'pin_prefetcher=True is only effective when device=cuda and '
......@@ -635,13 +761,8 @@ class DataLoader(torch.utils.data.DataLoader):
raise ValueError(
'use_prefetch_thread=True is only effective when device=cuda and '
'sampling is performed on CPU.')
if pin_prefetcher is None:
pin_prefetcher = False
else:
if pin_prefetcher is None:
pin_prefetcher = True
if use_prefetch_thread is None:
use_prefetch_thread = True
use_prefetch_thread = False
# Check use_alternate_streams
if use_alternate_streams is None:
......@@ -704,10 +825,8 @@ class NodeDataLoader(DataLoader):
The graph.
indices : Tensor or dict[ntype, Tensor]
The node set to compute outputs.
graph_sampler : object
The neighborhood sampler. It could be any object that has a :attr:`sample`
method. The :attr:`sample` methods must take in a graph object and either a tensor
of node indices or a dict of such tensors.
graph_sampler : dgl.dataloading.Sampler
The subgraph sampler.
device : device context, optional
The device of the generated MFGs in each iteration, which should be a
PyTorch device object (e.g., ``torch.device``).
......@@ -729,15 +848,10 @@ class NodeDataLoader(DataLoader):
and slice the features from CPU into GPU. Setting it to True will pin the
graph and feature tensors into pinned memory.
Default: False.
.. warning::
Using UVA with multiple GPUs may crash with device mismatch errors with
older CUDA drivers. We have confirmed that CUDA driver 450.142 will
crash while 465.19 will work. Therefore we recommend you to upgrade your
CUDA driver if you wish to use UVA with multiple GPUs.
If True, requires that :attr:`indices` must have the same device as the
:attr:`device` argument.
Default: False.
use_prefetch_thread : bool, optional
(Advanced option)
Spawns a new Python thread to perform feature slicing
......@@ -818,20 +932,6 @@ class EdgeDataLoader(DataLoader):
of message flow graphs (MFGs) as computation dependency of the said minibatch for
edge classification, edge regression, and link prediction.
For each iteration, the object will yield
* A tensor of input nodes necessary for computing the representation on edges, or
a dictionary of node type names and such tensors.
* A subgraph that contains only the edges in the minibatch and their incident nodes.
Note that the graph has an identical metagraph with the original graph.
* If a negative sampler is given, another graph that contains the "negative edges",
connecting the source and destination nodes yielded from the given negative sampler.
* A list of MFGs necessary for computing the representation of the incident nodes
of the edges in the minibatch.
For more details, please refer to :ref:`guide-minibatch-edge-classification-sampler`
and :ref:`guide-minibatch-link-classification-sampler`.
......@@ -841,10 +941,8 @@ class EdgeDataLoader(DataLoader):
The graph.
indices : Tensor or dict[etype, Tensor]
The edge set in graph :attr:`g` to compute outputs.
graph_sampler : object
The neighborhood sampler. It could be any object that has a :attr:`sample`
method. The :attr:`sample` methods must take in a graph object and either a tensor
of node indices or a dict of such tensors.
graph_sampler : dgl.dataloading.Sampler
The subgraph sampler
device : device context, optional
The device of the generated MFGs and graphs in each iteration, which should be a
PyTorch device object (e.g., ``torch.device``).
......@@ -878,58 +976,20 @@ class EdgeDataLoader(DataLoader):
Whether to pin the feature tensors into pinned memory.
Default: True if the graph is on CPU and :attr:`device` is CUDA. False otherwise.
exclude : str, optional
Whether and how to exclude dependencies related to the sampled edges in the
minibatch. Possible values are
* None, for not excluding any edges.
* ``self``, for excluding only the edges sampled as seed edges in this minibatch.
* ``reverse_id``, for excluding not only the edges sampled in the minibatch but
also their reverse edges of the same edge type. Requires the argument
:attr:`reverse_eids`.
* ``reverse_types``, for excluding not only the edges sampled in the minibatch
but also their reverse edges of different types but with the same IDs.
Requires the argument :attr:`reverse_etypes`.
* A callable which takes in a tensor or a dictionary of tensors and their
canonical edge types and returns a tensor or dictionary of tensors to
exclude.
reverse_eids : Tensor or dict[etype, Tensor], optional
A tensor of reverse edge ID mapping. The i-th element indicates the ID of
the i-th edge's reverse edge.
If the graph is heterogeneous, this argument requires a dictionary of edge
types and the reverse edge ID mapping tensors.
See the description of the argument with the same name in the docstring of
:class:`~dgl.dataloading.EdgeCollator` for more details.
reverse_etypes : dict[etype, etype], optional
The mapping from the original edge types to their reverse edge types.
See the description of the argument with the same name in the docstring of
:class:`~dgl.dataloading.EdgeCollator` for more details.
negative_sampler : callable, optional
The negative sampler.
See the description of the argument with the same name in the docstring of
:class:`~dgl.dataloading.EdgeCollator` for more details.
use_uva : bool, optional
Whether to use Unified Virtual Addressing (UVA) to directly sample the graph
and slice the features from CPU into GPU. Setting it to True will pin the
graph and feature tensors into pinned memory.
Default: False.
.. warning::
Using UVA with multiple GPUs may crash with device mismatch errors with
older CUDA drivers. We have confirmed that CUDA driver 450.142 will
crash while 465.19 will work. Therefore we recommend you to upgrade your
CUDA driver if you wish to use UVA with multiple GPUs.
If True, requires that :attr:`indices` must have the same device as the
:attr:`device` argument.
Default: False.
exclude : str, optional
reverse_eids : Tensor or dict[etype, Tensor], optional
reverse_etypes : dict[etype, etype], optional
negative_sampler : callable, optional
Deprecated and will be passed to :func:`dgl.dataloading.as_edge_prediction_sampler`.
batch_size : int, optional
drop_last : bool, optional
shuffle : bool, optional
......@@ -1062,6 +1122,10 @@ class EdgeDataLoader(DataLoader):
device = _get_device(device)
if isinstance(graph_sampler, BlockSampler):
dgl_warning(
'EdgeDataLoader directly taking a BlockSampler will be deprecated '
'and it will not support feature prefetching. '
'Please use dgl.dataloading.as_edge_prediction_sampler to wrap it.')
if reverse_eids is not None:
if use_uva:
reverse_eids = recursive_apply(reverse_eids, lambda x: x.to(device))
......@@ -1070,12 +1134,9 @@ class EdgeDataLoader(DataLoader):
indices_device = context_of(indices)
if indices_device != reverse_eids_device:
raise ValueError('Expect the same device for indices and reverse_eids')
graph_sampler = EdgeBlockSampler(
graph_sampler = as_edge_prediction_sampler(
graph_sampler, exclude=exclude, reverse_eids=reverse_eids,
reverse_etypes=reverse_etypes, negative_sampler=negative_sampler,
prefetch_node_feats=graph_sampler.prefetch_node_feats,
prefetch_labels=graph_sampler.prefetch_labels,
prefetch_edge_feats=graph_sampler.prefetch_edge_feats)
reverse_etypes=reverse_etypes, negative_sampler=negative_sampler)
super().__init__(
graph, indices, graph_sampler, device=device, use_ddp=use_ddp, ddp_seed=ddp_seed,
......@@ -1095,12 +1156,12 @@ PYTORCH_17 = PYTORCH_VER >= LooseVersion("1.7.0")
def _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed):
# Note: will change the content of dataloader_kwargs
dist_sampler_kwargs = {'shuffle': dataloader_kwargs['shuffle']}
dist_sampler_kwargs = {'shuffle': dataloader_kwargs.get('shuffle', False)}
dataloader_kwargs['shuffle'] = False
if PYTORCH_16:
dist_sampler_kwargs['seed'] = ddp_seed
if PYTORCH_17:
dist_sampler_kwargs['drop_last'] = dataloader_kwargs['drop_last']
dist_sampler_kwargs['drop_last'] = dataloader_kwargs.get('drop_last', False)
dataloader_kwargs['drop_last'] = False
return DistributedSampler(dataset, **dist_sampler_kwargs)
......
......@@ -21,21 +21,38 @@ class NeighborSampler(BlockSampler):
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
replace : bool, default False
Whether to sample with replacement
edge_dir : str, default ``'in'``
Can be either ``'in' `` where the neighbors will be sampled according to
incoming edges, or ``'out'`` otherwise, same as :func:`dgl.sampling.sample_neighbors`.
prob : str, optional
If given, the probability of each neighbor being sampled is proportional
to the edge feature value with the given name in ``g.edata``. The feature must be
a scalar on each edge.
replace : bool, default False
Whether to sample with replacement
prefetch_node_feats : list[str] or dict[ntype, list[str]], optional
The source node data to prefetch for the first MFG, corresponding to the
input node features necessary for the first GNN layer.
prefetch_labels : list[str] or dict[ntype, list[str]], optional
The destination node data to prefetch for the last MFG, corresponding to
the node labels of the minibatch.
prefetch_edge_feats : list[str] or dict[etype, list[str]], optional
The edge data names to prefetch for all the MFGs, corresponding to the
edge features necessary for all GNN layers.
output_device : device, optional
The device of the output subgraphs or MFGs. Default is the same as the
minibatch of seed nodes.
Examples
--------
**Node classification**
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from 5, 10, 15 neighbors for
the first, second, and third layer respectively (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.NeighborSampler([5, 10, 15])
>>> dataloader = dgl.dataloading.NodeDataLoader(
>>> dataloader = dgl.dataloading.DataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
......@@ -55,14 +72,32 @@ class NeighborSampler(BlockSampler):
>>> g.edata['p'] = torch.rand(g.num_edges()) # any non-negative 1D vector works
>>> sampler = dgl.dataloading.NeighborSampler([5, 10, 15], prob='p')
**Edge classification and link prediction**
This class can also work for edge classification and link prediction together
with :func:`as_edge_prediction_sampler`.
>>> sampler = dgl.dataloading.NeighborSampler([5, 10, 15])
>>> sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
>>> dataloader = dgl.dataloading.DataLoader(
... g, train_eid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
See the documentation :func:`as_edge_prediction_sampler` for more details.
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, fanouts, edge_dir='in', prob=None, replace=False, **kwargs):
super().__init__(**kwargs)
def __init__(self, fanouts, edge_dir='in', prob=None, replace=False,
prefetch_node_feats=None, prefetch_labels=None, prefetch_edge_feats=None,
output_device=None):
super().__init__(prefetch_node_feats=prefetch_node_feats,
prefetch_labels=prefetch_labels,
prefetch_edge_feats=prefetch_edge_feats,
output_device=output_device)
self.fanouts = fanouts
self.edge_dir = edge_dir
self.prob = prob
......@@ -96,9 +131,8 @@ class MultiLayerFullNeighborSampler(NeighborSampler):
----------
n_layers : int
The number of GNN layers to sample.
return_eids : bool, default False
Whether to return the edge IDs involved in message passing in the MFG.
If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.
kwargs :
Passed to :class:`dgl.dataloading.NeighborSampler`.
Examples
--------
......@@ -107,7 +141,7 @@ class MultiLayerFullNeighborSampler(NeighborSampler):
second, and third layer respectively (assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
>>> dataloader = dgl.dataloading.NodeDataLoader(
>>> dataloader = dgl.dataloading.DataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
......@@ -119,6 +153,5 @@ class MultiLayerFullNeighborSampler(NeighborSampler):
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
"""
def __init__(self, num_layers, edge_dir='in', prob=None, replace=False, **kwargs):
super().__init__([-1] * num_layers, edge_dir=edge_dir, prob=prob, replace=replace,
**kwargs)
def __init__(self, num_layers, **kwargs):
super().__init__([-1] * num_layers, **kwargs)
......@@ -2,19 +2,15 @@
from ..sampling.utils import EidExcluder
from .. import transforms
from ..base import NID
from .base import set_node_lazy_features, set_edge_lazy_features
from .base import set_node_lazy_features, set_edge_lazy_features, Sampler
class ShaDowKHopSampler(object):
class ShaDowKHopSampler(Sampler):
"""K-hop subgraph sampler used by
`ShaDow-GNN <https://arxiv.org/abs/2012.01380>`__.
It performs node-wise neighbor sampling but instead of returning a list of
MFGs, it returns a single subgraph induced by all the sampled nodes. The
seed nodes from which the neighbors are sampled will appear the first in the
induced nodes of the subgraph.
This is used in conjunction with :class:`dgl.dataloading.pytorch.NodeDataLoader`
and :class:`dgl.dataloading.pytorch.EdgeDataLoader`.
It performs node-wise neighbor sampling and returns the subgraph induced by
all the sampled nodes. The seed nodes from which the neighbors are sampled
will appear the first in the induced nodes of the subgraph.
Parameters
----------
......@@ -36,13 +32,15 @@ class ShaDowKHopSampler(object):
Examples
--------
**Node classification**
To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
a homogeneous graph where each node takes messages from 5, 10, 15 neighbors for
the first, second, and third layer respectively (assuming the backend is PyTorch):
>>> g = dgl.data.CoraFullDataset()[0]
>>> sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
>>> dataloader = dgl.dataloading.NodeDataLoader(
>>> dataloader = dgl.dataloading.DataLoader(
... g, torch.arange(g.num_nodes()), sampler,
... batch_size=5, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, (subgraph,) in dataloader:
......@@ -72,6 +70,7 @@ class ShaDowKHopSampler(object):
"""
def __init__(self, fanouts, replace=False, prob=None, prefetch_node_feats=None,
prefetch_edge_feats=None, output_device=None):
super().__init__()
self.fanouts = fanouts
self.replace = replace
self.prob = prob
......@@ -79,19 +78,35 @@ class ShaDowKHopSampler(object):
self.prefetch_edge_feats = prefetch_edge_feats
self.output_device = output_device
def sample(self, g, seed_nodes, exclude_edges=None):
"""Sample a subgraph given a tensor of seed nodes."""
def sample(self, g, seed_nodes, exclude_eids=None): # pylint: disable=arguments-differ
"""Sampling function.
Parameters
----------
g : DGLGraph
The graph to sampler from.
seed_nodes : Tensor or dict[str, Tensor]
The nodes sampled in the current minibatch.
exclude_eids : Tensor or dict[etype, Tensor], optional
The edges to exclude from neighborhood expansion.
Returns
-------
input_nodes, output_nodes, subg
A triplet containing (1) the node IDs inducing the subgraph, (2) the node
IDs that are sampled in this minibatch, and (3) the subgraph itself.
"""
output_nodes = seed_nodes
for fanout in reversed(self.fanouts):
frontier = g.sample_neighbors(
seed_nodes, fanout, output_device=self.output_device,
replace=self.replace, prob=self.prob, exclude_edges=exclude_edges)
replace=self.replace, prob=self.prob, exclude_edges=exclude_eids)
block = transforms.to_block(frontier, seed_nodes)
seed_nodes = block.srcdata[NID]
subg = g.subgraph(seed_nodes, relabel_nodes=True, output_device=self.output_device)
if exclude_edges is not None:
subg = EidExcluder(exclude_edges)(subg)
if exclude_eids is not None:
subg = EidExcluder(exclude_eids)(subg)
set_node_lazy_features(subg, self.prefetch_node_feats)
set_edge_lazy_features(subg, self.prefetch_edge_feats)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment