"examples/multigpu/vscode:/vscode.git/clone" did not exist on "2a92dfca306562e403b7310046c2dfeb506b23ed"
Unverified commit 463650a7, authored by Xin Yao and committed by GitHub
Browse files

[Unittest] Improve test_dataloader (#4301)

* test ddp dataloader

* add pure_gpu for edgedataloader

* resolve ddp issue
parent 4dd16f5d
...@@ -7,10 +7,7 @@ import unittest ...@@ -7,10 +7,7 @@ import unittest
import torch import torch
import torch.distributed as dist import torch.distributed as dist
from functools import partial from functools import partial
from torch.utils.data import DataLoader
from collections import defaultdict
from collections.abc import Iterator, Mapping from collections.abc import Iterator, Mapping
from itertools import product
from test_utils import parametrize_idtype from test_utils import parametrize_idtype
import pytest import pytest
...@@ -41,7 +38,7 @@ def test_cluster_gcn(num_workers): ...@@ -41,7 +38,7 @@ def test_cluster_gcn(num_workers):
def test_shadow(num_workers): def test_shadow(num_workers):
g = dgl.data.CoraFullDataset()[0] g = dgl.data.CoraFullDataset()[0]
sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15]) sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
dataloader = dgl.dataloading.NodeDataLoader( dataloader = dgl.dataloading.DataLoader(
g, torch.arange(g.num_nodes()), sampler, g, torch.arange(g.num_nodes()), sampler,
batch_size=5, shuffle=True, drop_last=False, num_workers=num_workers) batch_size=5, shuffle=True, drop_last=False, num_workers=num_workers)
for i, (input_nodes, output_nodes, subgraph) in enumerate(dataloader): for i, (input_nodes, output_nodes, subgraph) in enumerate(dataloader):
...@@ -161,38 +158,37 @@ def _check_device(data): ...@@ -161,38 +158,37 @@ def _check_device(data):
@parametrize_idtype @parametrize_idtype
@pytest.mark.parametrize('sampler_name', ['full', 'neighbor', 'neighbor2']) @pytest.mark.parametrize('sampler_name', ['full', 'neighbor', 'neighbor2'])
@pytest.mark.parametrize('pin_graph', [None, 'cuda_indices', 'cpu_indices']) @pytest.mark.parametrize('mode', ['cpu', 'uva_cuda_indices', 'uva_cpu_indices', 'pure_gpu'])
def test_node_dataloader(idtype, sampler_name, pin_graph): @pytest.mark.parametrize('use_ddp', [False, True])
def test_node_dataloader(idtype, sampler_name, mode, use_ddp):
if mode != 'cpu' and F.ctx() == F.cpu():
pytest.skip('UVA and GPU sampling require a GPU.')
if use_ddp:
dist.init_process_group('gloo' if F.ctx() == F.cpu() else 'nccl',
'tcp://127.0.0.1:12347', world_size=1, rank=0)
g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])).astype(idtype) g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])).astype(idtype)
g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu()) g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu())
g1.ndata['label'] = F.copy_to(F.randn((g1.num_nodes(),)), F.cpu()) g1.ndata['label'] = F.copy_to(F.randn((g1.num_nodes(),)), F.cpu())
indices = F.arange(0, g1.num_nodes(), idtype) if mode in ('cpu', 'uva_cpu_indices'):
if F.ctx() != F.cpu(): indices = F.copy_to(F.arange(0, g1.num_nodes(), idtype), F.cpu())
if pin_graph:
g1.create_formats_()
g1.pin_memory_()
if pin_graph == 'cpu_indices':
indices = F.arange(0, g1.num_nodes(), idtype, F.cpu())
elif pin_graph == 'cuda_indices':
if F._default_context_str == 'gpu':
indices = F.arange(0, g1.num_nodes(), idtype, F.cuda())
else:
return # skip
else: else:
g1 = g1.to('cuda') indices = F.copy_to(F.arange(0, g1.num_nodes(), idtype), F.cuda())
if mode == 'pure_gpu':
g1 = g1.to(F.cuda())
use_uva = pin_graph is not None and F.ctx() != F.cpu() use_uva = mode.startswith('uva')
for num_workers in [0, 1, 2]:
sampler = { sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2), 'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3]), 'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name] 'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name]
dataloader = dgl.dataloading.NodeDataLoader( for num_workers in [0, 1, 2] if mode == 'cpu' else [0]:
dataloader = dgl.dataloading.DataLoader(
g1, indices, sampler, device=F.ctx(), g1, indices, sampler, device=F.ctx(),
batch_size=g1.num_nodes(), batch_size=g1.num_nodes(),
num_workers=(num_workers if (pin_graph and F.ctx() == F.cpu()) else 0), num_workers=num_workers,
use_uva=use_uva) use_uva=use_uva,
use_ddp=use_ddp)
for input_nodes, output_nodes, blocks in dataloader: for input_nodes, output_nodes, blocks in dataloader:
_check_device(input_nodes) _check_device(input_nodes)
_check_device(output_nodes) _check_device(output_nodes)
...@@ -200,8 +196,6 @@ def test_node_dataloader(idtype, sampler_name, pin_graph): ...@@ -200,8 +196,6 @@ def test_node_dataloader(idtype, sampler_name, pin_graph):
_check_dtype(input_nodes, idtype, 'dtype') _check_dtype(input_nodes, idtype, 'dtype')
_check_dtype(output_nodes, idtype, 'dtype') _check_dtype(output_nodes, idtype, 'dtype')
_check_dtype(blocks, idtype, 'idtype') _check_dtype(blocks, idtype, 'idtype')
if g1.is_pinned():
g1.unpin_memory_()
g2 = dgl.heterograph({ g2 = dgl.heterograph({
('user', 'follow', 'user'): ([0, 0, 0, 1, 1, 1, 2], [1, 2, 3, 0, 2, 3, 0]), ('user', 'follow', 'user'): ([0, 0, 0, 1, 1, 1, 2], [1, 2, 3, 0, 2, 3, 0]),
...@@ -211,32 +205,25 @@ def test_node_dataloader(idtype, sampler_name, pin_graph): ...@@ -211,32 +205,25 @@ def test_node_dataloader(idtype, sampler_name, pin_graph):
}).astype(idtype) }).astype(idtype)
for ntype in g2.ntypes: for ntype in g2.ntypes:
g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu()) g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu())
indices = {nty: F.arange(0, g2.num_nodes(nty)) for nty in g2.ntypes} if mode in ('cpu', 'uva_cpu_indices'):
if F.ctx() != F.cpu(): indices = {nty: F.copy_to(g2.nodes(nty), F.cpu()) for nty in g2.ntypes}
if pin_graph:
g2.create_formats_()
g2.pin_memory_()
if pin_graph == 'cpu_indices':
indices = {nty: F.arange(0, g2.num_nodes(nty), idtype, F.cpu()) for nty in g2.ntypes}
elif pin_graph == 'cuda_indices':
if F._default_context_str == 'gpu':
indices = {nty: F.arange(0, g2.num_nodes(), idtype, F.cuda()) for nty in g2.ntypes}
else:
return # skip
else: else:
g2 = g2.to('cuda') indices = {nty: F.copy_to(g2.nodes(nty), F.cuda()) for nty in g2.ntypes}
if mode == 'pure_gpu':
g2 = g2.to(F.cuda())
batch_size = max(g2.num_nodes(nty) for nty in g2.ntypes) batch_size = max(g2.num_nodes(nty) for nty in g2.ntypes)
sampler = { sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2), 'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
'neighbor': dgl.dataloading.MultiLayerNeighborSampler([{etype: 3 for etype in g2.etypes}] * 2), 'neighbor': dgl.dataloading.MultiLayerNeighborSampler([{etype: 3 for etype in g2.etypes}] * 2),
'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name] 'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name]
for num_workers in [0, 1, 2] if mode == 'cpu' else [0]:
dataloader = dgl.dataloading.NodeDataLoader( dataloader = dgl.dataloading.DataLoader(
g2, {nty: g2.nodes(nty) for nty in g2.ntypes}, g2, indices, sampler,
sampler, device=F.ctx(), batch_size=batch_size, device=F.ctx(), batch_size=batch_size,
num_workers=(num_workers if (pin_graph and F.ctx() == F.cpu()) else 0), num_workers=num_workers,
use_uva=use_uva) use_uva=use_uva,
use_ddp=use_ddp)
assert isinstance(iter(dataloader), Iterator) assert isinstance(iter(dataloader), Iterator)
for input_nodes, output_nodes, blocks in dataloader: for input_nodes, output_nodes, blocks in dataloader:
_check_device(input_nodes) _check_device(input_nodes)
...@@ -246,38 +233,52 @@ def test_node_dataloader(idtype, sampler_name, pin_graph): ...@@ -246,38 +233,52 @@ def test_node_dataloader(idtype, sampler_name, pin_graph):
_check_dtype(output_nodes, idtype, 'dtype') _check_dtype(output_nodes, idtype, 'dtype')
_check_dtype(blocks, idtype, 'idtype') _check_dtype(blocks, idtype, 'idtype')
if g2.is_pinned(): if use_ddp:
g2.unpin_memory_() dist.destroy_process_group()
@parametrize_idtype
@pytest.mark.parametrize('sampler_name', ['full', 'neighbor']) @pytest.mark.parametrize('sampler_name', ['full', 'neighbor'])
@pytest.mark.parametrize('neg_sampler', [ @pytest.mark.parametrize('neg_sampler', [
dgl.dataloading.negative_sampler.Uniform(2), dgl.dataloading.negative_sampler.Uniform(2),
dgl.dataloading.negative_sampler.GlobalUniform(15, False, 3), dgl.dataloading.negative_sampler.GlobalUniform(15, False, 3),
dgl.dataloading.negative_sampler.GlobalUniform(15, True, 3)]) dgl.dataloading.negative_sampler.GlobalUniform(15, True, 3)])
@pytest.mark.parametrize('pin_graph', [False, True]) @pytest.mark.parametrize('mode', ['cpu', 'uva', 'pure_gpu'])
def test_edge_dataloader(sampler_name, neg_sampler, pin_graph): @pytest.mark.parametrize('use_ddp', [False, True])
g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])) def test_edge_dataloader(idtype, sampler_name, neg_sampler, mode, use_ddp):
if F.ctx() != F.cpu() and pin_graph: if mode != 'cpu' and F.ctx() == F.cpu():
g1.create_formats_() pytest.skip('UVA and GPU sampling require a GPU.')
g1.pin_memory_() if mode == 'uva' and isinstance(neg_sampler, dgl.dataloading.negative_sampler.GlobalUniform):
pytest.skip("GlobalUniform don't support UVA yet.")
if use_ddp:
dist.init_process_group('gloo' if F.ctx() == F.cpu() else 'nccl',
'tcp://127.0.0.1:12347', world_size=1, rank=0)
g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])).astype(idtype)
g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu()) g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu())
if mode == 'pure_gpu':
g1 = g1.to(F.cuda())
sampler = { sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2), 'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name] 'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name]
# no negative sampler # no negative sampler
dataloader = dgl.dataloading.EdgeDataLoader( edge_sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
g1, g1.edges(form='eid'), sampler, device=F.ctx(), batch_size=g1.num_edges()) dataloader = dgl.dataloading.DataLoader(
g1, g1.edges(form='eid'), edge_sampler,
device=F.ctx(), batch_size=g1.num_edges(),
use_uva=(mode == 'uva'), use_ddp=use_ddp)
for input_nodes, pos_pair_graph, blocks in dataloader: for input_nodes, pos_pair_graph, blocks in dataloader:
_check_device(input_nodes) _check_device(input_nodes)
_check_device(pos_pair_graph) _check_device(pos_pair_graph)
_check_device(blocks) _check_device(blocks)
# negative sampler # negative sampler
dataloader = dgl.dataloading.EdgeDataLoader( edge_sampler = dgl.dataloading.as_edge_prediction_sampler(
g1, g1.edges(form='eid'), sampler, device=F.ctx(), sampler, negative_sampler=neg_sampler)
negative_sampler=neg_sampler, batch_size=g1.num_edges()) dataloader = dgl.dataloading.DataLoader(
g1, g1.edges(form='eid'), edge_sampler,
device=F.ctx(), batch_size=g1.num_edges(),
use_uva=(mode == 'uva'), use_ddp=use_ddp)
for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader: for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
_check_device(input_nodes) _check_device(input_nodes)
_check_device(pos_pair_graph) _check_device(pos_pair_graph)
...@@ -289,9 +290,12 @@ def test_edge_dataloader(sampler_name, neg_sampler, pin_graph): ...@@ -289,9 +290,12 @@ def test_edge_dataloader(sampler_name, neg_sampler, pin_graph):
('user', 'followed-by', 'user'): ([1, 2, 3, 0, 2, 3, 0], [0, 0, 0, 1, 1, 1, 2]), ('user', 'followed-by', 'user'): ([1, 2, 3, 0, 2, 3, 0], [0, 0, 0, 1, 1, 1, 2]),
('user', 'play', 'game'): ([0, 1, 1, 3, 5], [0, 1, 2, 0, 2]), ('user', 'play', 'game'): ([0, 1, 1, 3, 5], [0, 1, 2, 0, 2]),
('game', 'played-by', 'user'): ([0, 1, 2, 0, 2], [0, 1, 1, 3, 5]) ('game', 'played-by', 'user'): ([0, 1, 2, 0, 2], [0, 1, 1, 3, 5])
}) }).astype(idtype)
for ntype in g2.ntypes: for ntype in g2.ntypes:
g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu()) g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu())
if mode == 'pure_gpu':
g2 = g2.to(F.cuda())
batch_size = max(g2.num_edges(ety) for ety in g2.canonical_etypes) batch_size = max(g2.num_edges(ety) for ety in g2.canonical_etypes)
sampler = { sampler = {
'full': dgl.dataloading.MultiLayerFullNeighborSampler(2), 'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
...@@ -299,19 +303,23 @@ def test_edge_dataloader(sampler_name, neg_sampler, pin_graph): ...@@ -299,19 +303,23 @@ def test_edge_dataloader(sampler_name, neg_sampler, pin_graph):
}[sampler_name] }[sampler_name]
# no negative sampler # no negative sampler
dataloader = dgl.dataloading.EdgeDataLoader( edge_sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
dataloader = dgl.dataloading.DataLoader(
g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes}, g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes},
sampler, device=F.ctx(), batch_size=batch_size) edge_sampler, device=F.ctx(), batch_size=batch_size,
use_uva=(mode == 'uva'), use_ddp=use_ddp)
for input_nodes, pos_pair_graph, blocks in dataloader: for input_nodes, pos_pair_graph, blocks in dataloader:
_check_device(input_nodes) _check_device(input_nodes)
_check_device(pos_pair_graph) _check_device(pos_pair_graph)
_check_device(blocks) _check_device(blocks)
# negative sampler # negative sampler
dataloader = dgl.dataloading.EdgeDataLoader( edge_sampler = dgl.dataloading.as_edge_prediction_sampler(
sampler, negative_sampler=neg_sampler)
dataloader = dgl.dataloading.DataLoader(
g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes}, g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes},
sampler, device=F.ctx(), negative_sampler=neg_sampler, edge_sampler, device=F.ctx(),batch_size=batch_size,
batch_size=batch_size) use_uva=(mode == 'uva'), use_ddp=use_ddp)
assert isinstance(iter(dataloader), Iterator) assert isinstance(iter(dataloader), Iterator)
for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader: for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
...@@ -320,8 +328,8 @@ def test_edge_dataloader(sampler_name, neg_sampler, pin_graph): ...@@ -320,8 +328,8 @@ def test_edge_dataloader(sampler_name, neg_sampler, pin_graph):
_check_device(neg_pair_graph) _check_device(neg_pair_graph)
_check_device(blocks) _check_device(blocks)
if g1.is_pinned(): if use_ddp:
g1.unpin_memory_() dist.destroy_process_group()
def _create_homogeneous(): def _create_homogeneous():
s = torch.randint(0, 200, (1000,), device=F.ctx()) s = torch.randint(0, 200, (1000,), device=F.ctx())
......
Markdown is supported
0% uploaded. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.