import os
import unittest
from collections.abc import Iterator, Mapping
from functools import partial

import backend as F

import dgl
import dgl.ops as OPS
import numpy as np
import pytest
import torch
import torch.distributed as dist
from test_utils import parametrize_idtype


@pytest.mark.parametrize("batch_size", [None, 16])
def test_graph_dataloader(batch_size):
    num_batches = 2
    num_samples = num_batches * (batch_size if batch_size is not None else 1)
    minigc_dataset = dgl.data.MiniGCDataset(num_samples, 10, 20)
    data_loader = dgl.dataloading.GraphDataLoader(
        minigc_dataset, batch_size=batch_size, shuffle=True
    )
    assert isinstance(iter(data_loader), Iterator)
    for graph, label in data_loader:
        assert isinstance(graph, dgl.DGLGraph)
        if batch_size is not None:
            assert F.asnumpy(label).shape[0] == batch_size
        else:
            # If batch size is None, the label element will be a single scalar
            # following PyTorch's practice.
            assert F.asnumpy(label).ndim == 0


@unittest.skipIf(os.name == "nt", reason="Do not support windows yet")
@pytest.mark.parametrize("num_workers", [0, 4])
def test_cluster_gcn(num_workers):
    dataset = dgl.data.CoraFullDataset()
    g = dataset[0]
    sampler = dgl.dataloading.ClusterGCNSampler(g, 100)
    dataloader = dgl.dataloading.DataLoader(
        g, torch.arange(100), sampler, batch_size=4, num_workers=num_workers
    )
    assert len(dataloader) == 25
    for i, sg in enumerate(dataloader):
        pass


@pytest.mark.parametrize("num_workers", [0, 4])
def test_shadow(num_workers):
    g = dgl.data.CoraFullDataset()[0]
    sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
    dataloader = dgl.dataloading.DataLoader(
        g,
        torch.arange(g.num_nodes()),
        sampler,
        batch_size=5,
        shuffle=True,
        drop_last=False,
        num_workers=num_workers,
    )
    for i, (input_nodes, output_nodes, subgraph) in enumerate(dataloader):
        assert torch.equal(input_nodes, subgraph.ndata[dgl.NID])
        assert torch.equal(input_nodes[: output_nodes.shape[0]], output_nodes)
        assert torch.equal(
            subgraph.ndata["label"], g.ndata["label"][input_nodes]
        )
        assert torch.equal(subgraph.ndata["feat"], g.ndata["feat"][input_nodes])
        if i == 5:
            break


@pytest.mark.parametrize("num_workers", [0, 4])
@pytest.mark.parametrize("mode", ["node", "edge", "walk"])
def test_saint(num_workers, mode):
    g = dgl.data.CoraFullDataset()[0]
    if mode == "node":
        budget = 100
    elif mode == "edge":
        budget = 200
    elif mode == "walk":
        budget = (3, 2)
    sampler = dgl.dataloading.SAINTSampler(mode, budget)
    dataloader = dgl.dataloading.DataLoader(
        g, torch.arange(100), sampler, num_workers=num_workers
    )
    assert len(dataloader) == 100
    for sg in dataloader:
        pass


@parametrize_idtype
@pytest.mark.parametrize(
    "mode", ["cpu", "uva_cuda_indices", "uva_cpu_indices", "pure_gpu"]
)
@pytest.mark.parametrize("use_ddp", [False, True])
@pytest.mark.parametrize("use_mask", [False, True])
def test_neighbor_nonuniform(idtype, mode, use_ddp, use_mask):
    if mode != "cpu" and F.ctx() == F.cpu():
        pytest.skip("UVA and GPU sampling require a GPU.")
    if mode != "cpu" and use_mask:
        pytest.skip("Masked sampling only works on CPU.")
    if use_ddp:
        if os.name == "nt":
            pytest.skip("PyTorch 1.13.0+ has problems in Windows DDP...")
        dist.init_process_group(
            "gloo" if F.ctx() == F.cpu() else "nccl",
            "tcp://127.0.0.1:12347",
            world_size=1,
            rank=0,
        )
    g = dgl.graph(([1, 2, 3, 4, 5, 6, 7, 8], [0, 0, 0, 0, 1, 1, 1, 1])).astype(
        idtype
    )
    g.edata["p"] = torch.FloatTensor([1, 1, 0, 0, 1, 1, 0, 0])
    g.edata["mask"] = g.edata["p"] != 0
    if mode in ("cpu", "uva_cpu_indices"):
        indices = F.copy_to(F.tensor([0, 1], idtype), F.cpu())
    else:
        indices = F.copy_to(F.tensor([0, 1], idtype), F.cuda())
    if mode == "pure_gpu":
        g = g.to(F.cuda())
    use_uva = mode.startswith("uva")

    if use_mask:
        prob, mask = None, "mask"
    else:
        prob, mask = "p", None

    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [2], prob=prob, mask=mask
    )
    for num_workers in [0, 1, 2] if mode == "cpu" else [0]:
        dataloader = dgl.dataloading.DataLoader(
            g,
            indices,
            sampler,
            batch_size=1,
            device=F.ctx(),
            num_workers=num_workers,
            use_uva=use_uva,
            use_ddp=use_ddp,
        )
        for input_nodes, output_nodes, blocks in dataloader:
            seed = output_nodes.item()
            neighbors = set(input_nodes[1:].cpu().numpy())
            if seed == 1:
                assert neighbors == {5, 6}
            elif seed == 0:
                assert neighbors == {1, 2}

    g = dgl.heterograph(
        {
            ("B", "BA", "A"): (
                [1, 2, 3, 4, 5, 6, 7, 8],
                [0, 0, 0, 0, 1, 1, 1, 1],
            ),
            ("C", "CA", "A"): (
                [1, 2, 3, 4, 5, 6, 7, 8],
                [0, 0, 0, 0, 1, 1, 1, 1],
            ),
        }
    ).astype(idtype)
    g.edges["BA"].data["p"] = torch.FloatTensor([1, 1, 0, 0, 1, 1, 0, 0])
    g.edges["BA"].data["mask"] = g.edges["BA"].data["p"] != 0
    g.edges["CA"].data["p"] = torch.FloatTensor([0, 0, 1, 1, 0, 0, 1, 1])
    g.edges["CA"].data["mask"] = g.edges["CA"].data["p"] != 0
    if mode == "pure_gpu":
        g = g.to(F.cuda())
    for num_workers in [0, 1, 2] if mode == "cpu" else [0]:
        dataloader = dgl.dataloading.DataLoader(
            g,
            {"A": indices},
            sampler,
            batch_size=1,
            device=F.ctx(),
            num_workers=num_workers,
            use_uva=use_uva,
            use_ddp=use_ddp,
        )
        for input_nodes, output_nodes, blocks in dataloader:
            seed = output_nodes["A"].item()
            # Seed and neighbors are of different node types so slicing is
            # not necessary here.
            neighbors = set(input_nodes["B"].cpu().numpy())
            if seed == 1:
                assert neighbors == {5, 6}
            elif seed == 0:
                assert neighbors == {1, 2}
            neighbors = set(input_nodes["C"].cpu().numpy())
            if seed == 1:
                assert neighbors == {7, 8}
            elif seed == 0:
                assert neighbors == {3, 4}

    if use_ddp:
        dist.destroy_process_group()


def _check_dtype(data, dtype, attr_name):
    if isinstance(data, dict):
        for k, v in data.items():
            assert getattr(v, attr_name) == dtype
    elif isinstance(data, list):
        for v in data:
            assert getattr(v, attr_name) == dtype
    else:
        assert getattr(data, attr_name) == dtype


def _check_device(data):
    if isinstance(data, dict):
        for k, v in data.items():
            assert v.device == F.ctx()
    elif isinstance(data, list):
        for v in data:
            assert v.device == F.ctx()
    else:
        assert data.device == F.ctx()


@parametrize_idtype
@pytest.mark.parametrize("sampler_name", ["full", "neighbor", "neighbor2"])
@pytest.mark.parametrize(
    "mode", ["cpu", "uva_cuda_indices", "uva_cpu_indices", "pure_gpu"]
)
@pytest.mark.parametrize("use_ddp", [False, True])
def test_node_dataloader(idtype, sampler_name, mode, use_ddp):
    if mode != "cpu" and F.ctx() == F.cpu():
        pytest.skip("UVA and GPU sampling require a GPU.")
    if use_ddp:
        if os.name == "nt":
            pytest.skip("PyTorch 1.13.0+ has problems in Windows DDP...")
        dist.init_process_group(
            "gloo" if F.ctx() == F.cpu() else "nccl",
            "tcp://127.0.0.1:12347",
            world_size=1,
            rank=0,
        )
    g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])).astype(idtype)
    g1.ndata["feat"] = F.copy_to(F.randn((5, 8)), F.cpu())
    g1.ndata["label"] = F.copy_to(F.randn((g1.num_nodes(),)), F.cpu())
    if mode in ("cpu", "uva_cpu_indices"):
        indices = F.copy_to(F.arange(0, g1.num_nodes(), idtype), F.cpu())
    else:
        indices = F.copy_to(F.arange(0, g1.num_nodes(), idtype), F.cuda())
    if mode == "pure_gpu":
        g1 = g1.to(F.cuda())
    use_uva = mode.startswith("uva")

    sampler = {
        "full": dgl.dataloading.MultiLayerFullNeighborSampler(2),
        "neighbor": dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
        "neighbor2": dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
    }[sampler_name]
    for num_workers in [0, 1, 2] if mode == "cpu" else [0]:
        dataloader = dgl.dataloading.DataLoader(
            g1,
            indices,
            sampler,
            device=F.ctx(),
            batch_size=g1.num_nodes(),
            num_workers=num_workers,
            use_uva=use_uva,
            use_ddp=use_ddp,
        )
        for input_nodes, output_nodes, blocks in dataloader:
            _check_device(input_nodes)
            _check_device(output_nodes)
            _check_device(blocks)
            _check_dtype(input_nodes, idtype, "dtype")
            _check_dtype(output_nodes, idtype, "dtype")
            _check_dtype(blocks, idtype, "idtype")

    g2 = dgl.heterograph(
        {
            ("user", "follow", "user"): (
                [0, 0, 0, 1, 1, 1, 2],
                [1, 2, 3, 0, 2, 3, 0],
            ),
            ("user", "followed-by", "user"): (
                [1, 2, 3, 0, 2, 3, 0],
                [0, 0, 0, 1, 1, 1, 2],
            ),
            ("user", "play", "game"): ([0, 1, 1, 3, 5], [0, 1, 2, 0, 2]),
            ("game", "played-by", "user"): ([0, 1, 2, 0, 2], [0, 1, 1, 3, 5]),
        }
    ).astype(idtype)
    for ntype in g2.ntypes:
        g2.nodes[ntype].data["feat"] = F.copy_to(
            F.randn((g2.num_nodes(ntype), 8)), F.cpu()
        )
    if mode in ("cpu", "uva_cpu_indices"):
        indices = {nty: F.copy_to(g2.nodes(nty), F.cpu()) for nty in g2.ntypes}
    else:
        indices = {nty: F.copy_to(g2.nodes(nty), F.cuda()) for nty in g2.ntypes}
    if mode == "pure_gpu":
        g2 = g2.to(F.cuda())
    batch_size = max(g2.num_nodes(nty) for nty in g2.ntypes)

    sampler = {
        "full": dgl.dataloading.MultiLayerFullNeighborSampler(2),
        "neighbor": dgl.dataloading.MultiLayerNeighborSampler(
            [{etype: 3 for etype in g2.etypes}] * 2
        ),
        "neighbor2": dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
    }[sampler_name]
    for num_workers in [0, 1, 2] if mode == "cpu" else [0]:
        dataloader = dgl.dataloading.DataLoader(
            g2,
            indices,
            sampler,
            device=F.ctx(),
            batch_size=batch_size,
            num_workers=num_workers,
            use_uva=use_uva,
            use_ddp=use_ddp,
        )
        assert isinstance(iter(dataloader), Iterator)
        for input_nodes, output_nodes, blocks in dataloader:
            _check_device(input_nodes)
            _check_device(output_nodes)
            _check_device(blocks)
            _check_dtype(input_nodes, idtype, "dtype")
            _check_dtype(output_nodes, idtype, "dtype")
            _check_dtype(blocks, idtype, "idtype")

    if use_ddp:
        dist.destroy_process_group()


@parametrize_idtype
@pytest.mark.parametrize("sampler_name", ["full", "neighbor"])
@pytest.mark.parametrize(
    "neg_sampler",
    [
        dgl.dataloading.negative_sampler.Uniform(2),
        dgl.dataloading.negative_sampler.GlobalUniform(15, False, 3),
        dgl.dataloading.negative_sampler.GlobalUniform(15, True, 3),
    ],
)
@pytest.mark.parametrize("mode", ["cpu", "uva", "pure_gpu"])
@pytest.mark.parametrize("use_ddp", [False, True])
def test_edge_dataloader(idtype, sampler_name, neg_sampler, mode, use_ddp):
    if mode != "cpu" and F.ctx() == F.cpu():
        pytest.skip("UVA and GPU sampling require a GPU.")
    if mode == "uva" and isinstance(
        neg_sampler, dgl.dataloading.negative_sampler.GlobalUniform
    ):
        pytest.skip("GlobalUniform does not support UVA yet.")
    if use_ddp:
        if os.name == "nt":
            pytest.skip("PyTorch 1.13.0+ has problems in Windows DDP...")
        dist.init_process_group(
            "gloo" if F.ctx() == F.cpu() else "nccl",
            "tcp://127.0.0.1:12347",
            world_size=1,
            rank=0,
        )
    g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])).astype(idtype)
    g1.ndata["feat"] = F.copy_to(F.randn((5, 8)), F.cpu())
    if mode == "pure_gpu":
        g1 = g1.to(F.cuda())

    sampler = {
        "full": dgl.dataloading.MultiLayerFullNeighborSampler(2),
        "neighbor": dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
    }[sampler_name]

    # no negative sampler
    edge_sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
    dataloader = dgl.dataloading.DataLoader(
        g1,
        g1.edges(form="eid"),
        edge_sampler,
        device=F.ctx(),
        batch_size=g1.num_edges(),
        use_uva=(mode == "uva"),
        use_ddp=use_ddp,
    )
    for input_nodes, pos_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(blocks)

    # negative sampler
    edge_sampler = dgl.dataloading.as_edge_prediction_sampler(
        sampler, negative_sampler=neg_sampler
    )
    dataloader = dgl.dataloading.DataLoader(
        g1,
        g1.edges(form="eid"),
        edge_sampler,
        device=F.ctx(),
        batch_size=g1.num_edges(),
        use_uva=(mode == "uva"),
        use_ddp=use_ddp,
    )
    for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(neg_pair_graph)
        _check_device(blocks)

    g2 = dgl.heterograph(
        {
            ("user", "follow", "user"): (
                [0, 0, 0, 1, 1, 1, 2],
                [1, 2, 3, 0, 2, 3, 0],
            ),
            ("user", "followed-by", "user"): (
                [1, 2, 3, 0, 2, 3, 0],
                [0, 0, 0, 1, 1, 1, 2],
            ),
            ("user", "play", "game"): ([0, 1, 1, 3, 5], [0, 1, 2, 0, 2]),
            ("game", "played-by", "user"): ([0, 1, 2, 0, 2], [0, 1, 1, 3, 5]),
        }
    ).astype(idtype)
    for ntype in g2.ntypes:
        g2.nodes[ntype].data["feat"] = F.copy_to(
            F.randn((g2.num_nodes(ntype), 8)), F.cpu()
        )
    if mode == "pure_gpu":
        g2 = g2.to(F.cuda())
    batch_size = max(g2.num_edges(ety) for ety in g2.canonical_etypes)

    sampler = {
        "full": dgl.dataloading.MultiLayerFullNeighborSampler(2),
        "neighbor": dgl.dataloading.MultiLayerNeighborSampler(
            [{etype: 3 for etype in g2.etypes}] * 2
        ),
    }[sampler_name]

    # no negative sampler
    edge_sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
    dataloader = dgl.dataloading.DataLoader(
        g2,
        {ety: g2.edges(form="eid", etype=ety) for ety in g2.canonical_etypes},
        edge_sampler,
        device=F.ctx(),
        batch_size=batch_size,
        use_uva=(mode == "uva"),
        use_ddp=use_ddp,
    )
    for input_nodes, pos_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(blocks)

    # negative sampler
    edge_sampler = dgl.dataloading.as_edge_prediction_sampler(
        sampler, negative_sampler=neg_sampler
    )
    dataloader = dgl.dataloading.DataLoader(
        g2,
        {ety: g2.edges(form="eid", etype=ety) for ety in g2.canonical_etypes},
        edge_sampler,
        device=F.ctx(),
        batch_size=batch_size,
        use_uva=(mode == "uva"),
        use_ddp=use_ddp,
    )
    assert isinstance(iter(dataloader), Iterator)
    for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(neg_pair_graph)
        _check_device(blocks)

    if use_ddp:
        dist.destroy_process_group()


def _create_homogeneous():
    s = torch.randint(0, 200, (1000,), device=F.ctx())
    d = torch.randint(0, 200, (1000,), device=F.ctx())
    src = torch.cat([s, d])
    dst = torch.cat([d, s])
    # Edge IDs 0-999 are the forward edges and 1000-1999 their reverses,
    # matching the reverse_eids mapping below.
    g = dgl.graph((src, dst), num_nodes=200)
    reverse_eids = torch.cat(
        [torch.arange(1000, 2000), torch.arange(0, 1000)]
    ).to(F.ctx())
    always_exclude = torch.randint(0, 1000, (50,), device=F.ctx())
    seed_edges = torch.arange(0, 1000, device=F.ctx())
    return g, reverse_eids, always_exclude, seed_edges


def _create_heterogeneous():
    edges = {}
    for utype, etype, vtype in [("A", "AA", "A"), ("A", "AB", "B")]:
        s = torch.randint(0, 200, (1000,), device=F.ctx())
        d = torch.randint(0, 200, (1000,), device=F.ctx())
        edges[utype, etype, vtype] = (s, d)
        edges[vtype, "rev-" + etype, utype] = (d, s)
    g = dgl.heterograph(edges, num_nodes_dict={"A": 200, "B": 200})
    reverse_etypes = {
        "AA": "rev-AA",
        "AB": "rev-AB",
        "rev-AA": "AA",
        "rev-AB": "AB",
    }
    always_exclude = {
        "AA": torch.randint(0, 1000, (50,), device=F.ctx()),
        "AB": torch.randint(0, 1000, (50,), device=F.ctx()),
    }
    seed_edges = {
        "AA": torch.arange(0, 1000, device=F.ctx()),
        "AB": torch.arange(0, 1000, device=F.ctx()),
    }
    return g, reverse_etypes, always_exclude, seed_edges


def _find_edges_to_exclude(g, exclude, always_exclude, pair_eids):
    if exclude is None:
        return always_exclude
    elif exclude == "self":
        return (
            torch.cat([pair_eids, always_exclude])
            if always_exclude is not None
            else pair_eids
        )
    elif exclude == "reverse_id":
        pair_eids = torch.cat([pair_eids, pair_eids + 1000])
        return (
            torch.cat([pair_eids, always_exclude])
            if always_exclude is not None
            else pair_eids
        )
    elif exclude == "reverse_types":
        pair_eids = {g.to_canonical_etype(k): v for k, v in pair_eids.items()}
        if ("A", "AA", "A") in pair_eids:
            pair_eids[("A", "rev-AA", "A")] = pair_eids[("A", "AA", "A")]
        if ("A", "AB", "B") in pair_eids:
            pair_eids[("B", "rev-AB", "A")] = pair_eids[("A", "AB", "B")]
        if always_exclude is not None:
            always_exclude = {
                g.to_canonical_etype(k): v for k, v in always_exclude.items()
            }
            for k in always_exclude.keys():
                if k in pair_eids:
                    pair_eids[k] = torch.cat([pair_eids[k], always_exclude[k]])
                else:
                    pair_eids[k] = always_exclude[k]
        return pair_eids


@pytest.mark.parametrize("always_exclude_flag", [False, True])
@pytest.mark.parametrize(
    "exclude", [None, "self", "reverse_id", "reverse_types"]
)
@pytest.mark.parametrize(
    "sampler",
    [
        dgl.dataloading.MultiLayerFullNeighborSampler(1),
        dgl.dataloading.ShaDowKHopSampler([5]),
    ],
)
@pytest.mark.parametrize("batch_size", [1, 50])
def test_edge_dataloader_excludes(
    exclude, always_exclude_flag, batch_size, sampler
):
    if exclude == "reverse_types":
        g, reverse_etypes, always_exclude, seed_edges = _create_heterogeneous()
    else:
        g, reverse_eids, always_exclude, seed_edges = _create_homogeneous()
    g = g.to(F.ctx())
    if not always_exclude_flag:
        always_exclude = None

    kwargs = {}
    kwargs["exclude"] = (
        partial(_find_edges_to_exclude, g, exclude, always_exclude)
        if always_exclude_flag
        else exclude
    )
    kwargs["reverse_eids"] = reverse_eids if exclude == "reverse_id" else None
    kwargs["reverse_etypes"] = (
        reverse_etypes if exclude == "reverse_types" else None
    )
    sampler = dgl.dataloading.as_edge_prediction_sampler(sampler, **kwargs)

    dataloader = dgl.dataloading.DataLoader(
        g,
        seed_edges,
        sampler,
        batch_size=batch_size,
        device=F.ctx(),
        use_prefetch_thread=False,
    )
    for i, (input_nodes, pair_graph, blocks) in enumerate(dataloader):
        if isinstance(blocks, list):
            subg = blocks[0]
        else:
            subg = blocks
        pair_eids = pair_graph.edata[dgl.EID]
        block_eids = subg.edata[dgl.EID]

        edges_to_exclude = _find_edges_to_exclude(
            g, exclude, always_exclude, pair_eids
        )
        if edges_to_exclude is None:
            continue
        edges_to_exclude = dgl.utils.recursive_apply(
            edges_to_exclude, lambda x: x.cpu().numpy()
        )
        block_eids = dgl.utils.recursive_apply(
            block_eids, lambda x: x.cpu().numpy()
        )

        if isinstance(edges_to_exclude, Mapping):
            for k in edges_to_exclude.keys():
                assert not np.isin(edges_to_exclude[k], block_eids[k]).any()
        else:
            assert not np.isin(edges_to_exclude, block_eids).any()

        if i == 10:
            break


def dummy_worker_init_fn(worker_id):
    pass


def test_dataloader_worker_init_fn():
    dataset = dgl.data.CoraFullDataset()
    g = dataset[0]
    sampler = dgl.dataloading.MultiLayerNeighborSampler([2])
    dataloader = dgl.dataloading.DataLoader(
        g,
        torch.arange(100),
        sampler,
        batch_size=4,
        num_workers=4,
        worker_init_fn=dummy_worker_init_fn,
    )
    for _ in dataloader:
        pass


if __name__ == "__main__":
    # test_node_dataloader(F.int32, 'neighbor', None)
    test_edge_dataloader_excludes(
        "reverse_types", False, 1, dgl.dataloading.ShaDowKHopSampler([5])
    )