Unverified commit c55ab2d1 authored by peizhou001, committed by GitHub

deprecate candidates in dataloader (#5117)

parent 3a2a5031
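The pattern applied throughout this commit, as a minimal before/after sketch (g, train_nid, train_eid, and reverse_eids are placeholder names):

# Before (deprecated since DGL 0.8):
sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
node_loader = dgl.dataloading.NodeDataLoader(g, train_nid, sampler, batch_size=1024)
edge_loader = dgl.dataloading.EdgeDataLoader(
    g, train_eid, sampler,
    exclude='reverse_id', reverse_eids=reverse_eids,
    batch_size=1024)

# After: DataLoader covers both cases; edge prediction wraps the sampler first.
sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
node_loader = dgl.dataloading.DataLoader(g, train_nid, sampler, batch_size=1024)
edge_sampler = dgl.dataloading.as_edge_prediction_sampler(
    sampler, exclude='reverse_id', reverse_eids=reverse_eids)
edge_loader = dgl.dataloading.DataLoader(g, train_eid, edge_sampler, batch_size=1024)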
@@ -261,7 +261,7 @@ def track_acc(data):
     g = g.formats('csc')
     sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
-    train_loader = dgl.dataloading.NodeDataLoader(
+    train_loader = dgl.dataloading.DataLoader(
         g,
         target_nids[train_idx],
         sampler,
@@ -269,7 +269,7 @@ def track_acc(data):
         shuffle=True,
         drop_last=False,
         num_workers=num_workers)
-    test_loader = dgl.dataloading.NodeDataLoader(
+    test_loader = dgl.dataloading.DataLoader(
         g,
         target_nids[test_idx],
         sampler,
......
@@ -59,7 +59,7 @@ class SAGE(nn.Module):
                           len(self.layers) - 1 else self.n_classes)
         sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
-        dataloader = dgl.dataloading.NodeDataLoader(
+        dataloader = dgl.dataloading.DataLoader(
             g,
             th.arange(g.number_of_nodes()),
             sampler,
@@ -147,7 +147,7 @@ def track_acc(data):
     # Create PyTorch DataLoader for constructing blocks
     sampler = dgl.dataloading.MultiLayerNeighborSampler(
         [int(fanout) for fanout in fan_out.split(',')])
-    dataloader = dgl.dataloading.NodeDataLoader(
+    dataloader = dgl.dataloading.DataLoader(
         g,
         train_nid,
         sampler,
......
@@ -98,7 +98,7 @@ def track_time(data):
     # Create PyTorch DataLoader for constructing blocks
     sampler = dgl.dataloading.MultiLayerNeighborSampler(
         [int(fanout) for fanout in fan_out.split(',')])
-    dataloader = dgl.dataloading.NodeDataLoader(
+    dataloader = dgl.dataloading.DataLoader(
         g,
         train_nid,
         sampler,
......
@@ -278,7 +278,7 @@ def track_time(data):
     sparse_optimizer = th.optim.SparseAdam(list(embed_layer.node_embeds.parameters()), lr=lr)
     sampler = dgl.dataloading.MultiLayerNeighborSampler([fanout] * n_layers)
-    loader = dgl.dataloading.NodeDataLoader(
+    loader = dgl.dataloading.DataLoader(
         hg, {category: train_idx}, sampler,
         batch_size=batch_size, shuffle=True, num_workers=4)
......
@@ -239,7 +239,7 @@ def track_time(data):
     g = g.formats('csc')
     sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
-    loader = dgl.dataloading.NodeDataLoader(
+    loader = dgl.dataloading.DataLoader(
         g,
         target_nids[train_idx],
         sampler,
......
@@ -78,7 +78,7 @@ def track_time(data):
     # Create PyTorch DataLoader for constructing blocks
     sampler = dgl.dataloading.MultiLayerNeighborSampler(
         [int(fanout) for fanout in fan_out.split(',')])
-    dataloader = dgl.dataloading.NodeDataLoader(
+    dataloader = dgl.dataloading.DataLoader(
         g,
         train_nid,
         sampler,
......
@@ -90,7 +90,7 @@ def run(result_queue, proc_id, n_gpus, args, devices, data):
     # Create PyTorch DataLoader for constructing blocks
     sampler = dgl.dataloading.MultiLayerNeighborSampler(
         [int(fanout) for fanout in args.fan_out.split(',')])
-    dataloader = dgl.dataloading.NodeDataLoader(
+    dataloader = dgl.dataloading.DataLoader(
         train_g,
         train_nid,
         sampler,
......
@@ -49,21 +49,22 @@ def main(args):
     val_eid_dict['forward'] = graph.edges['forward'].data["val_mask"].nonzero().squeeze()
     test_eid_dict['forward'] = graph.edges['forward'].data["test_mask"].nonzero().squeeze()
 
-    tr_loader = dgl.dataloading.EdgeDataLoader(graph,
+    sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
+    tr_loader = dgl.dataloading.DataLoader(graph,
                                                tr_eid_dict,
                                                sampler,
                                                batch_size=args.batch_size,
                                                shuffle=True,
                                                drop_last=False,
                                                num_workers=args.num_workers)
-    val_loader = dgl.dataloading.EdgeDataLoader(graph,
+    val_loader = dgl.dataloading.DataLoader(graph,
                                                 val_eid_dict,
                                                 sampler,
                                                 batch_size=args.batch_size,
                                                 shuffle=True,
                                                 drop_last=False,
                                                 num_workers=args.num_workers)
-    test_loader = dgl.dataloading.EdgeDataLoader(graph,
+    test_loader = dgl.dataloading.DataLoader(graph,
                                                  test_eid_dict,
                                                  sampler,
                                                  batch_size=args.batch_size,
......
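Note the shape of this migration: the neighbor sampler is wrapped once with as_edge_prediction_sampler, and the same wrapped sampler is shared by the train, validation, and test loaders. Iterating any of them then yields edge-prediction minibatches; a minimal sketch (batch structure as in the updated as_edge_prediction_sampler docstrings later in this diff):

for input_nodes, pair_graph, blocks in tr_loader:
    pass  # pair_graph holds the minibatch edges; blocks are the sampled MFGs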
@@ -200,7 +200,8 @@ def run(proc_id, n_gpus, args, devices, dataset):
                      for k in dataset.possible_rating_values}
     reverse_types.update({v: k for k, v in reverse_types.items()})
     sampler = dgl.dataloading.MultiLayerNeighborSampler([None], return_eids=True)
-    dataloader = dgl.dataloading.EdgeDataLoader(
+    sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
+    dataloader = dgl.dataloading.DataLoader(
         dataset.train_enc_graph,
         {to_etype_name(k): th.arange(
             dataset.train_enc_graph.number_of_edges(etype=to_etype_name(k)))
@@ -212,7 +213,7 @@ def run(proc_id, n_gpus, args, devices, dataset):
         drop_last=False)
     if proc_id == 0:
-        valid_dataloader = dgl.dataloading.EdgeDataLoader(
+        valid_dataloader = dgl.dataloading.DataLoader(
             dataset.valid_dec_graph,
             th.arange(dataset.valid_dec_graph.number_of_edges()),
             sampler,
@@ -220,7 +221,7 @@ def run(proc_id, n_gpus, args, devices, dataset):
             batch_size=args.minibatch_size,
             shuffle=False,
             drop_last=False)
-        test_dataloader = dgl.dataloading.EdgeDataLoader(
+        test_dataloader = dgl.dataloading.DataLoader(
             dataset.test_dec_graph,
             th.arange(dataset.test_dec_graph.number_of_edges()),
             sampler,
......
@@ -122,13 +122,14 @@ class DataModule(LightningDataModule):
         self.reverse_eids = reverse_eids
 
     def train_dataloader(self):
-        return dgl.dataloading.EdgeDataLoader(
+        sampler = dgl.dataloading.as_edge_prediction_sampler(
+            self.sampler, exclude='reverse_id',
+            reverse_eids=self.reverse_eids,
+            negative_sampler=NegativeSampler(self.g, args.num_negs, args.neg_share))
+        return dgl.dataloading.DataLoader(
             self.g,
             np.arange(self.g.num_edges()),
-            self.sampler,
-            exclude='reverse_id',
-            reverse_eids=self.reverse_eids,
-            negative_sampler=NegativeSampler(self.g, args.num_negs, args.neg_share),
+            sampler,
             device=self.device,
             batch_size=self.batch_size,
             shuffle=True,
......
import os
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
import argparse
from torch.nn.parallel import DistributedDataParallel
from model import SAGE, compute_acc_unsupervised as compute_acc
from negative_sampler import NegativeSampler
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from load_graph import load_reddit, load_ogb
class CrossEntropyLoss(nn.Module):
    def forward(self, block_outputs, pos_graph, neg_graph):
        with pos_graph.local_scope():
            pos_graph.ndata['h'] = block_outputs
            pos_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            pos_score = pos_graph.edata['score']
        with neg_graph.local_scope():
            neg_graph.ndata['h'] = block_outputs
            neg_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            neg_score = neg_graph.edata['score']
        score = th.cat([pos_score, neg_score])
        label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)]).long()
        loss = F.binary_cross_entropy_with_logits(score, label.float())
        return loss
def evaluate(model, g, nfeat, labels, train_nids, val_nids, test_nids, device, args):
    """
    Evaluate the model on the train/validation/test node sets.
    g : The entire graph.
    nfeat : The features of all the nodes.
    labels : The labels of all the nodes.
    train_nids, val_nids, test_nids : The node IDs to compute the accuracy for.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
        # single gpu
        if isinstance(model, SAGE):
            pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
        # multi gpu
        else:
            pred = model.module.inference(g, nfeat, device, args.batch_size, args.num_workers)
    model.train()
    return compute_acc(pred, labels, train_nids, val_nids, test_nids)
#### Entry point
def run(proc_id, n_gpus, args, devices, data):
    # Unpack data
    device = th.device(devices[proc_id])
    if n_gpus > 0:
        th.cuda.set_device(device)
    if n_gpus > 1:
        dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
            master_ip='127.0.0.1', master_port='12345')
        world_size = n_gpus
        th.distributed.init_process_group(backend="nccl",
                                          init_method=dist_init_method,
                                          world_size=world_size,
                                          rank=proc_id)
    train_nid, val_nid, test_nid, n_classes, g, nfeat, labels = data

    if args.data_device == 'gpu':
        nfeat = nfeat.to(device)
    elif args.data_device == 'uva':
        nfeat = nfeat.pin_memory()
    in_feats = nfeat.shape[1]

    # Create PyTorch DataLoader for constructing blocks
    n_edges = g.num_edges()
    train_seeds = th.arange(n_edges)
    if args.graph_device == 'gpu':
        train_seeds = train_seeds.to(device)
        g = g.to(device)
        args.num_workers = 0
    elif args.graph_device == 'uva':
        train_seeds = train_seeds.to(device)
        g.pin_memory_()
        args.num_workers = 0

    # Create sampler
    sampler = dgl.dataloading.NeighborSampler(
        [int(fanout) for fanout in args.fan_out.split(',')])
    sampler = dgl.dataloading.as_edge_prediction_sampler(
        sampler, exclude='reverse_id',
        # For each edge with ID e in Reddit dataset, the reverse edge is e ± |E|/2.
        reverse_eids=th.cat([
            th.arange(n_edges // 2, n_edges),
            th.arange(0, n_edges // 2)]).to(train_seeds),
        negative_sampler=NegativeSampler(g, args.num_negs, args.neg_share,
                                         device if args.graph_device == 'uva' else None))
    dataloader = dgl.dataloading.DataLoader(
        g, train_seeds, sampler,
        device=device,
        use_ddp=n_gpus > 1,
        batch_size=args.batch_size,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers,
        use_uva=args.graph_device == 'uva')
    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, args.num_hidden, args.num_layers, F.relu, args.dropout)
    model = model.to(device)
    if n_gpus > 1:
        model = DistributedDataParallel(model, device_ids=[device], output_device=device)
    loss_fcn = CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
    avg = 0
    iter_pos = []
    iter_neg = []
    iter_d = []
    iter_t = []
    best_eval_acc = 0
    best_test_acc = 0
    for epoch in range(args.num_epochs):
        tic = time.time()

        # Loop over the dataloader to sample the computation dependency graph as a list of
        # blocks.
        tic_step = time.time()
        for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
            input_nodes = input_nodes.to(device)
            # Gather input features. gather_pinned_tensor_rows only works on
            # pinned (UVA) tensors, so fall back to plain indexing otherwise.
            if args.data_device == 'uva':
                batch_inputs = dgl.utils.gather_pinned_tensor_rows(nfeat, input_nodes)
            else:
                batch_inputs = nfeat[input_nodes.to(nfeat.device)].to(device)
            blocks = [block.int() for block in blocks]
            d_step = time.time()

            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, pos_graph, neg_graph)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            t = time.time()
            pos_edges = pos_graph.num_edges()
            neg_edges = neg_graph.num_edges()
            iter_pos.append(pos_edges / (t - tic_step))
            iter_neg.append(neg_edges / (t - tic_step))
            iter_d.append(d_step - tic_step)
            iter_t.append(t - d_step)
            if step % args.log_every == 0 and proc_id == 0:
                gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
                print('[{}]Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed (samples/sec) {:.4f}|{:.4f} | Load {:.4f}| train {:.4f} | GPU {:.1f} MB'.format(
                    proc_id, epoch, step, loss.item(), np.mean(iter_pos[3:]), np.mean(iter_neg[3:]),
                    np.mean(iter_d[3:]), np.mean(iter_t[3:]), gpu_mem_alloc))
            tic_step = time.time()

        toc = time.time()
        if proc_id == 0:
            print('Epoch Time(s): {:.4f}'.format(toc - tic))
            if epoch >= 5:
                avg += toc - tic
            if (epoch + 1) % args.eval_every == 0:
                eval_acc, test_acc = evaluate(model, g, nfeat, labels, train_nid, val_nid, test_nid, device, args)
                print('Eval Acc {:.4f} Test Acc {:.4f}'.format(eval_acc, test_acc))
                if eval_acc > best_eval_acc:
                    best_eval_acc = eval_acc
                    best_test_acc = test_acc
                print('Best Eval Acc {:.4f} Test Acc {:.4f}'.format(best_eval_acc, best_test_acc))
        if n_gpus > 1:
            th.distributed.barrier()

    if proc_id == 0:
        print('Avg epoch time: {}'.format(avg / (epoch - 4)))
def main(args):
    devices = list(map(int, args.gpu.split(',')))
    n_gpus = len(devices)

    # load dataset
    if args.dataset == 'reddit':
        g, n_classes = load_reddit(self_loop=False)
    elif args.dataset == 'ogbn-products':
        g, n_classes = load_ogb('ogbn-products')
    else:
        raise Exception('unknown dataset')

    train_nid = g.ndata.pop('train_mask').nonzero().squeeze()
    val_nid = g.ndata.pop('val_mask').nonzero().squeeze()
    test_nid = g.ndata.pop('test_mask').nonzero().squeeze()
    nfeat = g.ndata.pop('features')
    labels = g.ndata.pop('labels')

    # Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
    g.create_formats_()
    # Limit the number of OMP threads to avoid resource competition on machines
    # with many cores. Change it to a proper number on your machine, especially
    # for multi-GPU training.
    os.environ['OMP_NUM_THREADS'] = str(mp.cpu_count() // 2 // n_gpus)

    # Pack data
    data = train_nid, val_nid, test_nid, n_classes, g, nfeat, labels

    if devices[0] == -1:
        assert args.graph_device == 'cpu', \
            f"Must have GPUs to enable {args.graph_device} sampling."
        assert args.data_device == 'cpu', \
            f"Must have GPUs to enable {args.data_device} feature storage."
        run(0, 0, args, ['cpu'], data)
    elif n_gpus == 1:
        run(0, n_gpus, args, devices, data)
    else:
        mp.spawn(run, args=(n_gpus, args, devices, data), nprocs=n_gpus)
if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument("--gpu", type=str, default='0',
                           help="GPU, can be a list of gpus for multi-gpu training,"
                                " e.g., 0,1,2,3; -1 for CPU")
    argparser.add_argument('--dataset', type=str, default='reddit',
                           choices=('reddit', 'ogbn-products'))
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
    argparser.add_argument('--num-negs', type=int, default=1)
    argparser.add_argument('--neg-share', default=False, action='store_true',
                           help="sharing neg nodes for positive nodes")
    argparser.add_argument('--fan-out', type=str, default='10,25')
    argparser.add_argument('--batch-size', type=int, default=10000)
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
                           help="Device to perform the sampling. "
                                "Must have 0 workers for 'gpu' and 'uva'")
    argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
                           help="By default the script puts all node features and labels "
                                "on GPU when using it to save time for data copy. This may "
                                "be undesired if they cannot fit in GPU memory at once. "
                                "Use 'cpu' to keep the features on host memory and "
                                "'uva' to enable UnifiedTensor (GPU zero-copy access on "
                                "pinned host memory).")
    args = argparser.parse_args()

    main(args)
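The reverse-edge construction used in the sampler above (for edge ID e, the reverse edge is e ± |E|/2) can be sanity-checked in isolation; a minimal sketch with an assumed toy edge count:

import torch as th

n_edges = 10  # toy value; Reddit stores each edge pair as e and e + |E|/2
reverse_eids = th.cat([th.arange(n_edges // 2, n_edges),
                       th.arange(0, n_edges // 2)])
# The mapping is an involution: the reverse of the reverse is the original edge.
assert th.equal(reverse_eids[reverse_eids], th.arange(n_edges))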
@@ -142,7 +142,7 @@ xs = np.arange(len(global_densities))
 fanouts = [args.knn_k - 1 for i in range(args.num_conv + 1)]
 sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
 # fix the number of edges
-test_loader = dgl.dataloading.NodeDataLoader(
+test_loader = dgl.dataloading.DataLoader(
     g, torch.arange(g.number_of_nodes()), sampler,
     batch_size=args.batch_size,
     shuffle=False,
@@ -213,7 +213,7 @@ for level in range(args.levels):
     g = dataset.gs[0]
     g.ndata['pred_den'] = torch.zeros((g.number_of_nodes()))
     g.edata['prob_conn'] = torch.zeros((g.number_of_edges(), 2))
-    test_loader = dgl.dataloading.NodeDataLoader(
+    test_loader = dgl.dataloading.DataLoader(
         g, torch.arange(g.number_of_nodes()), sampler,
         batch_size=args.batch_size,
         shuffle=False,
......
@@ -110,7 +110,7 @@ def set_train_sampler_loader(g, k):
     fanouts = [k-1 for i in range(args.num_conv + 1)]
     sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
     # fix the number of edges
-    train_dataloader = dgl.dataloading.NodeDataLoader(
+    train_dataloader = dgl.dataloading.DataLoader(
         g, torch.arange(g.number_of_nodes()), sampler,
         batch_size=args.batch_size,
         shuffle=True,
......
@@ -513,7 +513,7 @@ def as_edge_prediction_sampler(
    >>> sampler = dgl.dataloading.as_edge_prediction_sampler(
    ...     dgl.dataloading.NeighborSampler([15, 10, 5]),
    ...     exclude='reverse_id', reverse_eids=reverse_eids)
-    >>> dataloader = dgl.dataloading.EdgeDataLoader(
+    >>> dataloader = dgl.dataloading.DataLoader(
    ...     g, train_eid, sampler,
    ...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
    >>> for input_nodes, pair_graph, blocks in dataloader:
@@ -528,7 +528,7 @@ def as_edge_prediction_sampler(
    ...     dgl.dataloading.NeighborSampler([15, 10, 5]),
    ...     sampler, exclude='reverse_id', reverse_eids=reverse_eids,
    ...     negative_sampler=neg_sampler)
-    >>> dataloader = dgl.dataloading.EdgeDataLoader(
+    >>> dataloader = dgl.dataloading.DataLoader(
    ...     g, train_eid, sampler,
    ...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
    >>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
@@ -550,7 +550,7 @@ def as_edge_prediction_sampler(
    ...     dgl.dataloading.NeighborSampler([15, 10, 5]),
    ...     exclude='reverse_types',
    ...     reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'})
-    >>> dataloader = dgl.dataloading.EdgeDataLoader(
+    >>> dataloader = dgl.dataloading.DataLoader(
    ...     g, {'click': train_eid}, sampler,
    ...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
    >>> for input_nodes, pair_graph, blocks in dataloader:
@@ -564,7 +564,7 @@ def as_edge_prediction_sampler(
    ...     exclude='reverse_types',
    ...     reverse_etypes={'click': 'clicked-by', 'clicked-by': 'click'},
    ...     negative_sampler=neg_sampler)
-    >>> dataloader = dgl.dataloading.EdgeDataLoader(
+    >>> dataloader = dgl.dataloading.DataLoader(
    ...     g, train_eid, sampler,
    ...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
    >>> for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
......
@@ -22,10 +22,9 @@ from .._ffi.base import is_tensor_adaptor_enabled
 from ..heterograph import DGLGraph
 from ..utils import (
     recursive_apply, ExceptionWrapper, recursive_apply_pair, set_num_threads, get_num_threads,
-    get_numa_nodes_cores, context_of, dtype_of, version)
+    get_numa_nodes_cores, dtype_of, version)
 from ..frame import LazyFeature
 from ..storages import wrap_storage
-from .base import BlockSampler, as_edge_prediction_sampler
 from .. import backend as F
 from ..distributed import DistGraph
 from ..multiprocessing import call_once_and_share
@@ -967,87 +966,6 @@ class DataLoader(torch.utils.data.DataLoader):
         self.other_storages[name] = wrap_storage(data)
 
-# Alias
-class NodeDataLoader(DataLoader):
-    """(DEPRECATED) Sampled graph data loader over a set of nodes.
-
-    .. deprecated:: 0.8
-
-        The class is deprecated since v0.8, replaced by :class:`~dgl.dataloading.DataLoader`.
-    """
-
-class EdgeDataLoader(DataLoader):
-    """(DEPRECATED) Sampled graph data loader over a set of edges.
-
-    .. deprecated:: 0.8
-
-        The class is deprecated since v0.8 -- its function has been covered by
-        :class:`~dgl.dataloading.DataLoader` and :func:`~dgl.as_edge_prediction_sampler`.
-        To migrate, change the legacy usage like:
-
-        .. code:: python
-
-           sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
-           dataloader = dgl.dataloading.EdgeDataLoader(
-               g, train_eid, sampler, exclude='reverse_id',
-               reverse_eids=reverse_eids,
-               negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),
-               batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
-
-        to:
-
-        .. code:: python
-
-           sampler = dgl.dataloading.MultiLayerNeighborSampler([15, 10, 5])
-           sampler = dgl.dataloading.as_edge_prediction_sampler(
-               sampler, exclude='reverse_id',
-               reverse_eids=reverse_eids,
-               negative_sampler=dgl.dataloading.negative_sampler.Uniform(5))
-           dataloader = dgl.dataloading.DataLoader(
-               g, train_eid, sampler,
-               batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
-    """
-    def __init__(self, graph, indices, graph_sampler, device=None, use_ddp=False,
-                 ddp_seed=0, batch_size=1, drop_last=False, shuffle=False,
-                 use_prefetch_thread=False, use_alternate_streams=True,
-                 pin_prefetcher=False,
-                 exclude=None, reverse_eids=None, reverse_etypes=None, negative_sampler=None,
-                 use_uva=False, **kwargs):
-        if device is None:
-            if use_uva:
-                device = torch.cuda.current_device()
-            else:
-                device = graph.device
-        device = _get_device(device)
-
-        if isinstance(graph_sampler, BlockSampler):
-            dgl_warning(
-                'EdgeDataLoader directly taking a BlockSampler will be deprecated '
-                'and it will not support feature prefetching. '
-                'Please use dgl.dataloading.as_edge_prediction_sampler to wrap it.')
-            if reverse_eids is not None:
-                if use_uva:
-                    reverse_eids = recursive_apply(reverse_eids, lambda x: x.to(device))
-                else:
-                    reverse_eids_device = context_of(reverse_eids)
-                    indices_device = context_of(indices)
-                    if indices_device != reverse_eids_device:
-                        raise ValueError('Expect the same device for indices and reverse_eids')
-            graph_sampler = as_edge_prediction_sampler(
-                graph_sampler, exclude=exclude, reverse_eids=reverse_eids,
-                reverse_etypes=reverse_etypes, negative_sampler=negative_sampler)
-
-        super().__init__(
-            graph, indices, graph_sampler, device=device, use_ddp=use_ddp, ddp_seed=ddp_seed,
-            batch_size=batch_size, drop_last=drop_last, shuffle=shuffle,
-            use_prefetch_thread=use_prefetch_thread, use_alternate_streams=use_alternate_streams,
-            pin_prefetcher=pin_prefetcher, use_uva=use_uva,
-            **kwargs)
-
 ######## Graph DataLoaders ########
 # GraphDataLoader loads a set of graphs so it's not relevant to the above. They are currently
 # copied from the old DataLoader implementation.
......
@@ -578,7 +578,7 @@ class DistEdgeDataLoader(DistDataLoader):
     graph.
 
     All the arguments have the same meaning as the single-machine counterpart
-    :class:`dgl.dataloading.EdgeDataLoader` except the first argument
+    :class:`dgl.dataloading.DataLoader` except the first argument
     :attr:`g` which must be a :class:`dgl.distributed.DistGraph`.
 
     Parameters
@@ -587,11 +587,11 @@
         The distributed graph.
     eids, graph_sampler, device, kwargs :
-        See :class:`dgl.dataloading.EdgeDataLoader`.
+        See :class:`dgl.dataloading.DataLoader`.
 
     See also
     --------
-    dgl.dataloading.EdgeDataLoader
+    dgl.dataloading.DataLoader
     """
     def __init__(self, g, eids, graph_sampler, device=None, **kwargs):
         collator_kwargs = {}
......
@@ -98,7 +98,7 @@ def test_neighbor_nonuniform(idtype, mode, use_ddp, use_mask):
     sampler = dgl.dataloading.MultiLayerNeighborSampler([2], prob=prob, mask=mask)
     for num_workers in [0, 1, 2] if mode == 'cpu' else [0]:
-        dataloader = dgl.dataloading.NodeDataLoader(
+        dataloader = dgl.dataloading.DataLoader(
             g, indices, sampler,
             batch_size=1, device=F.ctx(),
             num_workers=num_workers,
@@ -123,7 +123,7 @@ def test_neighbor_nonuniform(idtype, mode, use_ddp, use_mask):
     if mode == 'pure_gpu':
         g = g.to(F.cuda())
     for num_workers in [0, 1, 2] if mode == 'cpu' else [0]:
-        dataloader = dgl.dataloading.NodeDataLoader(
+        dataloader = dgl.dataloading.DataLoader(
             g, {'A': indices}, sampler,
             batch_size=1, device=F.ctx(),
             num_workers=num_workers,
......
@@ -30,7 +30,7 @@ train_nids = idx_split['train']
 node_features = graph.ndata['feat']
 sampler = dgl.dataloading.MultiLayerNeighborSampler([4, 4])
-train_dataloader = dgl.dataloading.NodeDataLoader(
+train_dataloader = dgl.dataloading.DataLoader(
     graph, train_nids, sampler,
     batch_size=1024,
     shuffle=True,
@@ -94,8 +94,8 @@ mfg.srcdata[dgl.NID], mfg.dstdata[dgl.NID]
 ######################################################################
-# Recall that the MFGs yielded by the ``NodeDataLoader`` and
-# ``EdgeDataLoader`` have the property that the first few source nodes are
+# Recall that the MFGs yielded by the ``DataLoader``
+# have the property that the first few source nodes are
 # always identical to the destination nodes:
 #
 # |image1|
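A minimal sketch of that property, assuming a toy 4-node graph rather than the tutorial's dataset:

import dgl
import torch

g = dgl.graph(([0, 1, 2, 3], [1, 2, 3, 0]))
sampler = dgl.dataloading.MultiLayerNeighborSampler([2])
dataloader = dgl.dataloading.DataLoader(
    g, torch.arange(g.num_nodes()), sampler, batch_size=2)
input_nodes, output_nodes, blocks = next(iter(dataloader))
mfg = blocks[0]
# The first source nodes of an MFG are exactly its destination nodes.
assert torch.equal(mfg.srcdata[dgl.NID][:mfg.num_dst_nodes()],
                   mfg.dstdata[dgl.NID])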
@@ -195,7 +195,7 @@ class Model(nn.Module):
         return h
 sampler = dgl.dataloading.MultiLayerNeighborSampler([4, 4])
-train_dataloader = dgl.dataloading.NodeDataLoader(
+train_dataloader = dgl.dataloading.DataLoader(
     graph, train_nids, sampler,
     device=device,
     batch_size=1024,
......
@@ -120,7 +120,7 @@ def run(proc_id, devices):
     # data loading.
     sampler = dgl.dataloading.NeighborSampler([4, 4])
     train_dataloader = dgl.dataloading.DataLoader(
-        # The following arguments are specific to NodeDataLoader.
+        # The following arguments are specific to DataLoader.
         graph,        # The graph
         train_nids,   # The node IDs to iterate over in minibatches
         sampler,      # The neighbor sampler
......