import os
import numpy as np
import dgl
import dgl.ops as OPS
import backend as F
import unittest
import torch
from functools import partial
from torch.utils.data import DataLoader
from collections import defaultdict
from collections.abc import Iterator, Mapping
from itertools import product
import pytest


def test_graph_dataloader():
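    # GraphDataLoader over a graph classification dataset: each minibatch is a
    # batched DGLGraph together with `batch_size` labels.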
    batch_size = 16
    num_batches = 2
    minigc_dataset = dgl.data.MiniGCDataset(batch_size * num_batches, 10, 20)
    data_loader = dgl.dataloading.GraphDataLoader(minigc_dataset, batch_size=batch_size, shuffle=True)
    assert isinstance(iter(data_loader), Iterator)
    for graph, label in data_loader:
        assert isinstance(graph, dgl.DGLGraph)
        assert F.asnumpy(label).shape[0] == batch_size


@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@pytest.mark.parametrize('num_workers', [0, 4])
def test_cluster_gcn(num_workers):
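    # Cluster-GCN sampling: partition the graph into 100 clusters and load them
    # in batches of 4, which gives 25 batches per epoch.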
    dataset = dgl.data.CoraFullDataset()
    g = dataset[0]
    sampler = dgl.dataloading.ClusterGCNSampler(g, 100)
    dataloader = dgl.dataloading.DataLoader(
        g, torch.arange(100), sampler, batch_size=4, num_workers=num_workers)
    assert len(dataloader) == 25
    for i, sg in enumerate(dataloader):
        pass

@pytest.mark.parametrize('num_workers', [0, 4])
def test_shadow(num_workers):
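    # ShaDow-KHop sampling returns (input_nodes, output_nodes, subgraph); the
    # subgraph's nodes are exactly input_nodes, with the seed (output) nodes
    # placed first, and its node data is sliced from the parent graph.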
    g = dgl.data.CoraFullDataset()[0]
    sampler = dgl.dataloading.ShaDowKHopSampler([5, 10, 15])
    dataloader = dgl.dataloading.NodeDataLoader(
        g, torch.arange(g.num_nodes()), sampler,
        batch_size=5, shuffle=True, drop_last=False, num_workers=num_workers)
    for i, (input_nodes, output_nodes, subgraph) in enumerate(dataloader):
        assert torch.equal(input_nodes, subgraph.ndata[dgl.NID])
        assert torch.equal(input_nodes[:output_nodes.shape[0]], output_nodes)
        assert torch.equal(subgraph.ndata['label'], g.ndata['label'][input_nodes])
        assert torch.equal(subgraph.ndata['feat'], g.ndata['feat'][input_nodes])
        if i == 5:
            break


@pytest.mark.parametrize('num_workers', [0, 4])
def test_neighbor_nonuniform(num_workers):
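    # Non-uniform neighbor sampling: edges with probability 0 under 'p' must
    # never be picked, so each seed only sees its two positive-probability
    # neighbors (checked on a homogeneous and a heterogeneous graph).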
    g = dgl.graph(([1, 2, 3, 4, 5, 6, 7, 8], [0, 0, 0, 0, 1, 1, 1, 1]))
    g.edata['p'] = torch.FloatTensor([1, 1, 0, 0, 1, 1, 0, 0])
    sampler = dgl.dataloading.MultiLayerNeighborSampler([2], prob='p')
    dataloader = dgl.dataloading.NodeDataLoader(g, [0, 1], sampler, batch_size=1, device=F.ctx())
    for input_nodes, output_nodes, blocks in dataloader:
        seed = output_nodes.item()
        neighbors = set(input_nodes[1:].cpu().numpy())
        if seed == 1:
            assert neighbors == {5, 6}
        elif seed == 0:
            assert neighbors == {1, 2}

    g = dgl.heterograph({
        ('B', 'BA', 'A'): ([1, 2, 3, 4, 5, 6, 7, 8], [0, 0, 0, 0, 1, 1, 1, 1]),
        ('C', 'CA', 'A'): ([1, 2, 3, 4, 5, 6, 7, 8], [0, 0, 0, 0, 1, 1, 1, 1]),
        })
    g.edges['BA'].data['p'] = torch.FloatTensor([1, 1, 0, 0, 1, 1, 0, 0])
    g.edges['CA'].data['p'] = torch.FloatTensor([0, 0, 1, 1, 0, 0, 1, 1])
    sampler = dgl.dataloading.MultiLayerNeighborSampler([2], prob='p')
    dataloader = dgl.dataloading.NodeDataLoader(
        g, {'A': [0, 1]}, sampler, batch_size=1, device=F.ctx())
    for input_nodes, output_nodes, blocks in dataloader:
        seed = output_nodes['A'].item()
        # Seed and neighbors are of different node types so slicing is not necessary here.
        neighbors = set(input_nodes['B'].cpu().numpy())
        if seed == 1:
            assert neighbors == {5, 6}
        elif seed == 0:
            assert neighbors == {1, 2}

        neighbors = set(input_nodes['C'].cpu().numpy())
        if seed == 1:
            assert neighbors == {7, 8}
        elif seed == 0:
            assert neighbors == {3, 4}


def _check_device(data):
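    # Assert that a tensor, or every tensor in a dict or list, lives on F.ctx().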
    if isinstance(data, dict):
        for k, v in data.items():
            assert v.device == F.ctx()
    elif isinstance(data, list):
        for v in data:
            assert v.device == F.ctx()
    else:
        assert data.device == F.ctx()

@pytest.mark.parametrize('sampler_name', ['full', 'neighbor', 'neighbor2'])
# TODO(BarclayII): Re-enable pin_graph = True after PyTorch is upgraded to 1.9.0 on CI
@pytest.mark.parametrize('pin_graph', [False])
def test_node_dataloader(sampler_name, pin_graph):
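    # NodeDataLoader on a homogeneous and a heterogeneous graph with CPU-resident
    # node features; every component of each minibatch must land on F.ctx().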
    g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4]))
    if F.ctx() != F.cpu() and pin_graph:
        g1.create_formats_()
        g1.pin_memory_()
    g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu())
    g1.ndata['label'] = F.copy_to(F.randn((g1.num_nodes(),)), F.cpu())

    for num_workers in [0, 1, 2]:
        sampler = {
            'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
            'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3]),
            'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name]
        dataloader = dgl.dataloading.NodeDataLoader(
            g1, g1.nodes(), sampler, device=F.ctx(),
            batch_size=g1.num_nodes(),
            num_workers=num_workers)
        for input_nodes, output_nodes, blocks in dataloader:
            _check_device(input_nodes)
            _check_device(output_nodes)
            _check_device(blocks)

    g2 = dgl.heterograph({
         ('user', 'follow', 'user'): ([0, 0, 0, 1, 1, 1, 2], [1, 2, 3, 0, 2, 3, 0]),
         ('user', 'followed-by', 'user'): ([1, 2, 3, 0, 2, 3, 0], [0, 0, 0, 1, 1, 1, 2]),
         ('user', 'play', 'game'): ([0, 1, 1, 3, 5], [0, 1, 2, 0, 2]),
         ('game', 'played-by', 'user'): ([0, 1, 2, 0, 2], [0, 1, 1, 3, 5])
    })
    for ntype in g2.ntypes:
        g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu())
    batch_size = max(g2.num_nodes(nty) for nty in g2.ntypes)
    sampler = {
        'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
        'neighbor': dgl.dataloading.MultiLayerNeighborSampler([{etype: 3 for etype in g2.etypes}] * 2),
        'neighbor2': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name]

    dataloader = dgl.dataloading.NodeDataLoader(
        g2, {nty: g2.nodes(nty) for nty in g2.ntypes},
        sampler, device=F.ctx(), batch_size=batch_size)
    assert isinstance(iter(dataloader), Iterator)
    for input_nodes, output_nodes, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(output_nodes)
        _check_device(blocks)

    if g1.is_pinned():
        g1.unpin_memory_()


@pytest.mark.parametrize('sampler_name', ['full', 'neighbor'])
@pytest.mark.parametrize('neg_sampler', [
    dgl.dataloading.negative_sampler.Uniform(2),
    dgl.dataloading.negative_sampler.GlobalUniform(15, False, 3),
    dgl.dataloading.negative_sampler.GlobalUniform(15, True, 3)])
# TODO(BarclayII): Re-enable pin_graph = True after PyTorch is upgraded to 1.9.0 on CI
@pytest.mark.parametrize('pin_graph', [False])
def test_edge_dataloader(sampler_name, neg_sampler, pin_graph):
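    # EdgeDataLoader on a homogeneous and a heterogeneous graph, with and without
    # a negative sampler; every component of each minibatch must land on F.ctx().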
    g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4]))
    if F.ctx() != F.cpu() and pin_graph:
        g1.create_formats_()
        g1.pin_memory_()
    g1.ndata['feat'] = F.copy_to(F.randn((5, 8)), F.cpu())

    sampler = {
        'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
        'neighbor': dgl.dataloading.MultiLayerNeighborSampler([3, 3])}[sampler_name]

    # no negative sampler
    dataloader = dgl.dataloading.EdgeDataLoader(
        g1, g1.edges(form='eid'), sampler, device=F.ctx(), batch_size=g1.num_edges())
    for input_nodes, pos_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(blocks)

    # negative sampler
    dataloader = dgl.dataloading.EdgeDataLoader(
        g1, g1.edges(form='eid'), sampler, device=F.ctx(),
        negative_sampler=neg_sampler, batch_size=g1.num_edges())
    for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(neg_pair_graph)
        _check_device(blocks)

    g2 = dgl.heterograph({
         ('user', 'follow', 'user'): ([0, 0, 0, 1, 1, 1, 2], [1, 2, 3, 0, 2, 3, 0]),
         ('user', 'followed-by', 'user'): ([1, 2, 3, 0, 2, 3, 0], [0, 0, 0, 1, 1, 1, 2]),
         ('user', 'play', 'game'): ([0, 1, 1, 3, 5], [0, 1, 2, 0, 2]),
         ('game', 'played-by', 'user'): ([0, 1, 2, 0, 2], [0, 1, 1, 3, 5])
    })
    for ntype in g2.ntypes:
        g2.nodes[ntype].data['feat'] = F.copy_to(F.randn((g2.num_nodes(ntype), 8)), F.cpu())
    batch_size = max(g2.num_edges(ety) for ety in g2.canonical_etypes)
    sampler = {
        'full': dgl.dataloading.MultiLayerFullNeighborSampler(2),
        'neighbor': dgl.dataloading.MultiLayerNeighborSampler([{etype: 3 for etype in g2.etypes}] * 2),
        }[sampler_name]

    # no negative sampler
    dataloader = dgl.dataloading.EdgeDataLoader(
        g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes},
        sampler, device=F.ctx(), batch_size=batch_size)
    for input_nodes, pos_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(blocks)

    # negative sampler
    dataloader = dgl.dataloading.EdgeDataLoader(
        g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes},
        sampler, device=F.ctx(), negative_sampler=neg_sampler,
        batch_size=batch_size)

    assert isinstance(iter(dataloader), Iterator)
    for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
        _check_device(input_nodes)
        _check_device(pos_pair_graph)
        _check_device(neg_pair_graph)
        _check_device(blocks)

    if g1.is_pinned():
        g1.unpin_memory_()


def _create_homogeneous():
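    # Homogeneous graph with 1000 random edges plus their reverses (edge i's
    # reverse is i + 1000); also returns a random set of always-excluded edge
    # IDs and the seed edges to sample from.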
    s = torch.randint(0, 200, (1000,), device=F.ctx())
    d = torch.randint(0, 200, (1000,), device=F.ctx())
    src = torch.cat([s, d])
    dst = torch.cat([d, s])
    g = dgl.graph((src, dst), num_nodes=200)
    reverse_eids = torch.cat([torch.arange(1000, 2000), torch.arange(0, 1000)]).to(F.ctx())
    always_exclude = torch.randint(0, 1000, (50,), device=F.ctx())
    seed_edges = torch.arange(0, 1000, device=F.ctx())
    return g, reverse_eids, always_exclude, seed_edges

def _create_heterogeneous():
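    # Heterogeneous graph with 'AA' and 'AB' edges plus 'rev-*' reverse edge
    # types; also returns the reverse-etype mapping, always-excluded edge IDs
    # per edge type and the seed edges.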
    edges = {}
    for utype, etype, vtype in [('A', 'AA', 'A'), ('A', 'AB', 'B')]:
        s = torch.randint(0, 200, (1000,), device=F.ctx())
        d = torch.randint(0, 200, (1000,), device=F.ctx())
        edges[utype, etype, vtype] = (s, d)
        edges[vtype, 'rev-' + etype, utype] = (d, s)
    g = dgl.heterograph(edges, num_nodes_dict={'A': 200, 'B': 200})
    reverse_etypes = {'AA': 'rev-AA', 'AB': 'rev-AB', 'rev-AA': 'AA', 'rev-AB': 'AB'}
    always_exclude = {
        'AA': torch.randint(0, 1000, (50,), device=F.ctx()),
        'AB': torch.randint(0, 1000, (50,), device=F.ctx())}
    seed_edges = {
        'AA': torch.arange(0, 1000, device=F.ctx()),
        'AB': torch.arange(0, 1000, device=F.ctx())}
    return g, reverse_etypes, always_exclude, seed_edges

def _find_edges_to_exclude(g, exclude, always_exclude, pair_eids):
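    # Reference computation of which edge IDs must not appear in the sampled
    # blocks for a given `exclude` mode, optionally merged with `always_exclude`.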
    if exclude is None:
        return always_exclude
    elif exclude == 'self':
        return torch.cat([pair_eids, always_exclude]) if always_exclude is not None else pair_eids
    elif exclude == 'reverse_id':
        pair_eids = torch.cat([pair_eids, pair_eids + 1000])
        return torch.cat([pair_eids, always_exclude]) if always_exclude is not None else pair_eids
    elif exclude == 'reverse_types':
        pair_eids = {g.to_canonical_etype(k): v for k, v in pair_eids.items()}
        if ('A', 'AA', 'A') in pair_eids:
            pair_eids[('A', 'rev-AA', 'A')] = pair_eids[('A', 'AA', 'A')]
        if ('A', 'AB', 'B') in pair_eids:
            pair_eids[('B', 'rev-AB', 'A')] = pair_eids[('A', 'AB', 'B')]
        if always_exclude is not None:
            always_exclude = {g.to_canonical_etype(k): v for k, v in always_exclude.items()}
            for k in always_exclude.keys():
                if k in pair_eids:
                    pair_eids[k] = torch.cat([pair_eids[k], always_exclude[k]])
                else:
                    pair_eids[k] = always_exclude[k]
        return pair_eids

@pytest.mark.parametrize('always_exclude_flag', [False, True])
@pytest.mark.parametrize('exclude', [None, 'self', 'reverse_id', 'reverse_types'])
def test_edge_dataloader_excludes(exclude, always_exclude_flag):
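    # For every exclude mode, no edge that should have been excluded may show up
    # among the edge IDs of the sampled block.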
    if exclude == 'reverse_types':
        g, reverse_etypes, always_exclude, seed_edges = _create_heterogeneous()
    else:
        g, reverse_eids, always_exclude, seed_edges = _create_homogeneous()
    g = g.to(F.ctx())
    sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
    if not always_exclude_flag:
        always_exclude = None

    kwargs = {}
    kwargs['exclude'] = (
        partial(_find_edges_to_exclude, g, exclude, always_exclude) if always_exclude_flag
        else exclude)
    kwargs['reverse_eids'] = reverse_eids if exclude == 'reverse_id' else None
    kwargs['reverse_etypes'] = reverse_etypes if exclude == 'reverse_types' else None

    dataloader = dgl.dataloading.EdgeDataLoader(
        g, seed_edges, sampler, batch_size=50, device=F.ctx(), **kwargs)
    for input_nodes, pair_graph, blocks in dataloader:
        block = blocks[0]
        pair_eids = pair_graph.edata[dgl.EID]
        block_eids = block.edata[dgl.EID]

        edges_to_exclude = _find_edges_to_exclude(g, exclude, always_exclude, pair_eids)
        if edges_to_exclude is None:
            continue
        edges_to_exclude = dgl.utils.recursive_apply(edges_to_exclude, lambda x: x.cpu().numpy())
        block_eids = dgl.utils.recursive_apply(block_eids, lambda x: x.cpu().numpy())

        if isinstance(edges_to_exclude, Mapping):
            for k in edges_to_exclude.keys():
                assert not np.isin(edges_to_exclude[k], block_eids[k]).any()
        else:
            assert not np.isin(edges_to_exclude, block_eids).any()

if __name__ == '__main__':
    test_graph_dataloader()
    test_cluster_gcn(0)
    test_neighbor_nonuniform(0)
    for exclude in [None, 'self', 'reverse_id', 'reverse_types']:
        test_edge_dataloader_excludes(exclude, False)
        test_edge_dataloader_excludes(exclude, True)