test_nodeflow.py 23.8 KB
Newer Older
Da Zheng's avatar
Da Zheng committed
1
2
import backend as F
import numpy as np
3
from numpy.testing import assert_array_equal, assert_allclose
Da Zheng's avatar
Da Zheng committed
4
import scipy as sp
5
import operator
Da Zheng's avatar
Da Zheng committed
6
import dgl
7
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
Da Zheng's avatar
Da Zheng committed
8
9
10
from dgl import utils
import dgl.function as fn
from functools import partial
11
import itertools
VoVAllen's avatar
VoVAllen committed
12
13
import unittest

14

Da Zheng's avatar
Da Zheng committed
15
def generate_rand_graph(n, connect_more=False, complete=False, add_self_loop=False):
    """Build a read-only DGLGraph with random node and edge features.

    Parameters
    ----------
    n : int
        Number of nodes.
    connect_more : bool
        Only used when ``complete`` is False: fill row 0 and column 0 of the
        adjacency matrix so that node 0 is connected to every node.
    complete : bool
        If True, build the graph from every ordered pair ``(i, j)`` with
        ``i != j`` instead of a random sparse matrix with density 0.1.
    add_self_loop : bool
        If True, add an edge ``i -> i`` for every node before the graph is
        made read-only.

    Returns
    -------
    The graph, with node feature ``'h1'`` (10 columns) and edge feature
    ``'h2'`` (3 columns) filled by ``F.randn``.
    """
    if complete:
        # permutations(range(n), 2) yields the ordered pairs with i != j in
        # the same order as filtering product(range(n), range(n)).
        cord = list(itertools.permutations(range(n), 2))
        row = [src for src, _ in cord]
        col = [dst for _, dst in cord]
        data = np.ones((len(row),))
        arr = sp.sparse.coo_matrix((data, (row, col)), shape=(n, n))
    else:
        arr = (sp.sparse.random(n, n, density=0.1,
                                format='coo') != 0).astype(np.int64)
        # having one node to connect to all other nodes.
        if connect_more:
            arr[0] = 1
            arr[:, 0] = 1

    if add_self_loop:
        # The graph has to be mutable while the self loops are inserted.
        g = dgl.DGLGraph(arr, readonly=False)
        nodes = np.arange(g.number_of_nodes())
        g.add_edges(nodes, nodes)
        g.readonly()
    else:
        g = dgl.DGLGraph(arr, readonly=True)

    g.ndata['h1'] = F.randn((g.number_of_nodes(), 10))
    g.edata['h2'] = F.randn((g.number_of_edges(), 3))
    return g


42
43
44
45
46
47
48
def test_self_loop():
    """Check NodeFlows sampled with ``add_self_loop=True``.

    Two cases are covered: a complete graph without self loops, where the
    sampler is asked to add them, and a graph that already contains self
    loops, where the parent self-loop edges must show up in each block.
    """
    n = 100
    num_hops = 2

    # Complete graph without self loops. Every non-input layer is expected
    # to have in-degree n (presumably n - 1 neighbours plus the added loop).
    g = generate_rand_graph(n, complete=True)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        deg = F.copy_to(F.ones(in_deg.shape, dtype=F.int64), F.cpu()) * n
        assert_array_equal(F.asnumpy(in_deg), F.asnumpy(deg))

    # Graph that already has self loops.
    g = generate_rand_graph(n, complete=True, add_self_loop=True)
    g = dgl.to_simple_graph(g)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(nf.num_blocks):
        parent_eid = F.asnumpy(nf.block_parent_eid(i))
        parent_nid = F.asnumpy(nf.layer_parent_nid(i + 1))
        # The loop eid in the parent graph must exist in the block parent eid.
        parent_loop_eid = F.asnumpy(g.edge_ids(parent_nid, parent_nid))
        assert len(parent_loop_eid) == len(parent_nid)
        assert np.all(np.isin(parent_loop_eid, parent_eid))

VoVAllen's avatar
VoVAllen committed
64

65
def create_mini_batch(g, num_hops, add_self_loop=False):
    """Sample a single NodeFlow from ``g`` for the fixed seeds ``[1, 2, 0, 3]``.

    The sampler is created with ``batch_size=4`` and an ``expand_factor``
    equal to the number of nodes in ``g``. It is asserted to yield exactly
    one NodeFlow, whose last layer maps to the seed ids in the given order.

    Parameters
    ----------
    g : DGLGraph
        Parent graph to sample from.
    num_hops : int
        Passed to ``NeighborSampler`` as ``num_hops``.
    add_self_loop : bool
        Passed to ``NeighborSampler`` as ``add_self_loop``.

    Returns
    -------
    The single NodeFlow produced by the sampler.
    """
    seed_ids = np.array([1, 2, 0, 3])
    sampler = NeighborSampler(g, batch_size=4,
                              expand_factor=g.number_of_nodes(),
                              num_hops=num_hops, seed_nodes=seed_ids,
                              add_self_loop=add_self_loop)
    nfs = list(sampler)
    assert len(nfs) == 1
    nf = nfs[0]
    assert_array_equal(F.asnumpy(nf.layer_parent_nid(-1)), seed_ids)
    return nf
Da Zheng's avatar
Da Zheng committed
73

VoVAllen's avatar
VoVAllen committed
74

Da Zheng's avatar
Da Zheng committed
75
76
77
78
79
80
81
82
83
def check_basic(g, nf):
    """Run structural consistency checks on NodeFlow ``nf`` sampled from ``g``.

    Verifies node/edge counts against the per-layer and per-block sizes,
    node and edge existence queries, the degrees of the first and last
    layer, id mapping to and from the parent graph, and that
    ``copy_from_parent`` copies the ``'h1'`` node and ``'h2'`` edge features.

    Note: this calls ``nf.copy_from_parent()`` on the NodeFlow passed in.
    """
    num_nodes = sum(nf.layer_size(i) for i in range(nf.num_layers))
    assert nf.number_of_nodes() == num_nodes
    num_edges = sum(nf.block_size(i) for i in range(nf.num_blocks))
    assert nf.number_of_edges() == num_edges
    assert len(nf) == num_nodes
    assert nf.is_readonly

    # Ids in [0, num_nodes) exist; ids in [num_nodes, 2 * num_nodes) do not.
    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes)))))
    for i in range(num_nodes):
        assert nf.has_node(i)
    assert np.all(F.asnumpy(nf.has_nodes(
        list(range(num_nodes, 2 * num_nodes)))) == 0)
    for i in range(num_nodes, 2 * num_nodes):
        assert not nf.has_node(i)

    for block_id in range(nf.num_blocks):
        u, v, _ = nf.block_edges(block_id)
        assert np.all(F.asnumpy(nf.has_edges_between(u, v)))

    # The first layer has no incoming edges, the last has no outgoing ones.
    deg = nf.layer_in_degree(0)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(0)), np.int64))
    deg = nf.layer_out_degree(-1)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(-1)), np.int64))

    nf.copy_from_parent()
    for i in range(1, nf.num_layers):
        # Total in-degree of a layer equals total out-degree of the previous.
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))

        # Mapping to parent ids and back returns the original ids.
        nids = nf.layer_nid(i)
        parent_nids = nf.map_to_parent_nid(nids)
        nids1 = nf.map_from_parent_nid(i, parent_nids)
        assert_array_equal(F.asnumpy(nids), F.asnumpy(nids1))

        data = nf.layers[i].data['h1']
        data1 = g.nodes[nf.layer_parent_nid(i)].data['h1']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    for i in range(nf.num_blocks):
        data = nf.blocks[i].data['h2']
        data1 = g.edges[nf.block_parent_eid(i)].data['h2']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    # negative layer Ids.
    for i in range(-1, -nf.num_layers, -1):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))

Da Zheng's avatar
Da Zheng committed
130
131
132
133

def test_basic():
    """Basic checks on a full NodeFlow and on sampled mini-batch NodeFlows."""
    num_layers = 2

    # Full NodeFlow: every layer holds all nodes, every block all edges.
    g = generate_rand_graph(100, connect_more=True)
    nf = create_full_nodeflow(g, num_layers)
    assert nf.number_of_nodes() == g.number_of_nodes() * (num_layers + 1)
    assert nf.number_of_edges() == g.number_of_edges() * num_layers
    assert nf.num_layers == num_layers + 1
    assert nf.layer_size(0) == g.number_of_nodes()
    assert nf.layer_size(1) == g.number_of_nodes()
    check_basic(g, nf)

    parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
    nids = nf.map_from_parent_nid(0, parent_nids, remap_local=True)
    assert_array_equal(F.asnumpy(nids), F.asnumpy(parent_nids))

    # should also work for negative layer ids
    for l in range(-1, -num_layers, -1):
        nids1 = nf.map_from_parent_nid(l, parent_nids, remap_local=True)
        # The NodeFlow has nf.num_layers (= num_layers + 1) layers, so the
        # non-negative id of layer ``l`` is ``l + nf.num_layers``. Adding
        # only ``num_layers`` would compare against the previous layer.
        nids2 = nf.map_from_parent_nid(
            l + nf.num_layers, parent_nids, remap_local=True)
        assert_array_equal(F.asnumpy(nids1), F.asnumpy(nids2))

    # Sampled NodeFlow without self loops.
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)

    # Sampled NodeFlow on a graph with self loops.
    g = generate_rand_graph(100, add_self_loop=True)
    nf = create_mini_batch(g, num_layers, add_self_loop=True)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)

Da Zheng's avatar
Da Zheng committed
163

164
def check_apply_nodes(create_node_flow, use_negative_block_id):
    """Check ``apply_layer`` on a whole layer and on its first four nodes.

    Parameters
    ----------
    create_node_flow : callable
        Called as ``create_node_flow(g, num_layers)`` to build the NodeFlow.
    use_negative_block_id : bool
        If True, layers are addressed as ``-num_layers + i`` instead of ``i``.
        NOTE(review): despite the name this is a layer id, and the two modes
        do not necessarily address the same layers — confirm this is intended.
    """
    num_layers = 2
    for i in range(num_layers):
        layer_id = -num_layers + i if use_negative_block_id else i
        g = generate_rand_graph(100)
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()

        # Overwrite 'h1' for every node in the layer.
        new_feats = F.randn((nf.layer_size(layer_id), 5))

        def update_func(nodes):
            return {'h1': new_feats}

        nf.apply_layer(layer_id, update_func)
        assert_array_equal(
            F.asnumpy(nf.layers[layer_id].data['h1']), F.asnumpy(new_feats))

        # Overwrite 'h1' only for the first four nodes of the layer.
        new_feats = F.randn((4, 5))

        def update_func1(nodes):
            return {'h1': new_feats}

        nf.apply_layer(layer_id, update_func1, v=nf.layer_nid(layer_id)[0:4])
        assert_array_equal(
            F.asnumpy(nf.layers[layer_id].data['h1'][0:4]),
            F.asnumpy(new_feats))
Da Zheng's avatar
Da Zheng committed
186
187
188


def test_apply_nodes():
    """Run ``check_apply_nodes`` for both NodeFlow builders and both id signs."""
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=False)
    check_apply_nodes(create_mini_batch, use_negative_block_id=False)
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=True)
    check_apply_nodes(create_mini_batch, use_negative_block_id=True)
Da Zheng's avatar
Da Zheng committed
193
194
195
196
197
198


def check_apply_edges(create_node_flow):
    """Check ``apply_block`` with an edge UDF passed directly to the call.

    For each block the UDF overwrites ``'h2'`` with fresh random features and
    writes ``'f2'`` as the sum of the source and destination ``'f'`` features.
    Both non-negative and negative block ids are exercised.

    Parameters
    ----------
    create_node_flow : callable
        Called as ``create_node_flow(g, num_layers)`` to build the NodeFlow.
    """
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.apply_block(i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.apply_block(-num_layers + i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # Recompute the expected 'f2' from the parent graph's end points.
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))
221

Da Zheng's avatar
Da Zheng committed
222

223
224
225
226
227
228
229
230
231
232
233
234
235
236
def check_apply_edges1(create_node_flow):
    """Check ``apply_block`` with an edge UDF registered beforehand.

    Same scenario as ``check_apply_edges``, except that the UDF is installed
    through ``register_apply_edge_func`` and ``apply_block`` is called with
    only the block id.

    Parameters
    ----------
    create_node_flow : callable
        Called as ``create_node_flow(g, num_layers)`` to build the NodeFlow.
    """
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.register_apply_edge_func(update_func, i)
        nf.apply_block(i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.register_apply_edge_func(update_func, -num_layers + i)
        nf.apply_block(-num_layers + i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # Recompute the expected 'f2' from the parent graph's end points.
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))
252
253


Da Zheng's avatar
Da Zheng committed
254
def test_apply_edges():
    """Run the edge-apply checks for direct and registered edge UDFs."""
    check_apply_edges(create_full_nodeflow)
    check_apply_edges(create_mini_batch)
    check_apply_edges1(create_mini_batch)
Da Zheng's avatar
Da Zheng committed
258
259


260
def check_flow_compute(create_node_flow, use_negative_block_id=False):
    """Compare ``block_compute`` on a NodeFlow with ``update_all`` on its parent.

    Each step copies the source feature ``'h'`` as a message, sums messages
    into ``'t'`` and stores ``t + 1`` back into ``'h'``. After every step the
    NodeFlow layer is compared with the matching parent nodes.

    Parameters
    ----------
    create_node_flow : callable
        Called as ``create_node_flow(g, num_layers)`` to build the NodeFlow.
    use_negative_block_id : bool
        If True, block ``i`` is addressed as ``-num_layers + i``.
    """
    num_layers = 2
    g = generate_rand_graph(100)
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']

    # Test the computation on a layer at a time.
    for i in range(num_layers):
        block_id = -num_layers + i if use_negative_block_id else i
        nf.block_compute(block_id,
                         fn.copy_src(src='h', out='m'),
                         fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the computation when only a few nodes are active in a layer.
    g.ndata['h'] = g.ndata['h1']
    for i in range(num_layers):
        block_id = -num_layers + i if use_negative_block_id else i
        vs = nf.layer_nid(i + 1)[0:4]
        nf.block_compute(block_id,
                         fn.copy_src(src='h', out='m'),
                         fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1}, v=vs)
        g.update_all(fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        data1 = nf.layers[i + 1].data['h'][0:4]
        data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
        assert_allclose(F.asnumpy(data1), F.asnumpy(data2),
                        rtol=1e-4, atol=1e-4)

Da Zheng's avatar
Da Zheng committed
293

294
295
296
297
298
299
300
301
302
303
304
305
306
def check_flow_compute1(create_node_flow, use_negative_block_id=False):
    """Check ``block_compute`` when the UDFs are registered beforehand.

    Two registration modes are tested: per block (a block id is passed to the
    ``register_*`` calls) and for all blocks at once (no block id). In both
    cases the results are compared with ``update_all`` on the parent graph.

    Parameters
    ----------
    create_node_flow : callable
        Called as ``create_node_flow(g, num_layers)`` to build the NodeFlow.
    use_negative_block_id : bool
        If True, block ``i`` is addressed as ``-num_layers + i``.
    """
    num_layers = 2
    g = generate_rand_graph(100)

    # test the case that we register UDFs per block.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        block_id = -num_layers + i if use_negative_block_id else i
        nf.register_message_func(fn.copy_src(src='h', out='m'), block_id)
        nf.register_reduce_func(fn.sum(msg='m', out='t'), block_id)
        nf.register_apply_node_func(
            lambda nodes: {'h': nodes.data['t'] + 1}, block_id)
        nf.block_compute(block_id)
        g.update_all(fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # test the case that we register UDFs in all blocks.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    nf.register_message_func(fn.copy_src(src='h', out='m'))
    nf.register_reduce_func(fn.sum(msg='m', out='t'))
    nf.register_apply_node_func(lambda nodes: {'h': nodes.data['t'] + 1})
    for i in range(num_layers):
        block_id = -num_layers + i if use_negative_block_id else i
        nf.block_compute(block_id)
        g.update_all(fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

VoVAllen's avatar
VoVAllen committed
335

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
class SrcMulEdgeMessageFunction(object):
    """Message UDF that multiplies a source-node field by an edge field.

    Parameters
    ----------
    src_field : str
        Name of the feature read from ``edges.src``.
    edge_field : str
        Name of the feature read from ``edges.data``.
    out_field : str
        Key under which the product is returned.
    """

    def __init__(self, src_field, edge_field, out_field):
        self.mul_op = operator.mul
        self.src_field = src_field
        self.edge_field = edge_field
        self.out_field = out_field

    def __call__(self, edges):
        """Return ``{out_field: src_feature * edge_feature}`` for ``edges``."""
        sdata = edges.src[self.src_field]
        edata = edges.data[self.edge_field]
        # Due to the different broadcasting semantics of different backends,
        # we need to broadcast the sdata and edata to be of the same rank.
        rank = max(F.ndim(sdata), F.ndim(edata))
        sshape = F.shape(sdata)
        eshape = F.shape(edata)
        # Pad the lower-rank operand with trailing dimensions of size 1.
        sdata = F.reshape(sdata, sshape + (1,) * (rank - F.ndim(sdata)))
        edata = F.reshape(edata, eshape + (1,) * (rank - F.ndim(edata)))
        ret = self.mul_op(sdata, edata)
        return {self.out_field: ret}

356
357
358
359
360

def check_flow_compute2(create_node_flow):
    """Check ``block_compute`` with binary message functions.

    First part: the ``SrcMulEdgeMessageFunction`` UDF and the builtin
    ``fn.src_mul_edge`` must agree on the NodeFlow, and the builtin result
    must match ``update_all`` on the parent graph. Second part: the builtin
    ``fn.u_mul_v`` is compared between NodeFlow and parent graph.

    Parameters
    ----------
    create_node_flow : callable
        Called as ``create_node_flow(g, num_layers)`` to build the NodeFlow.
    """
    num_layers = 2
    g = generate_rand_graph(100)
    g.edata['h'] = F.ones((g.number_of_edges(), 10))

    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        # UDF result goes to 'h1', builtin result to 'h'.
        nf.block_compute(i, SrcMulEdgeMessageFunction(
            'h', 'h', 't'), fn.sum('t', 'h1'))
        nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
                        F.asnumpy(nf.layers[i + 1].data['h']),
                        rtol=1e-4, atol=1e-4)
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    nf = create_node_flow(g, num_layers)
    g.ndata['h'] = g.ndata['h1']
    nf.copy_from_parent()
    # u_mul_v reads 'h' on both end points, so every layer needs it.
    for i in range(nf.num_layers):
        nf.layers[i].data['h'] = nf.layers[i].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['s']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['s']),
                        rtol=1e-4, atol=1e-4)
391

VoVAllen's avatar
VoVAllen committed
392

Da Zheng's avatar
Da Zheng committed
393
def test_flow_compute():
    """Run the ``block_compute`` checks for the supported configurations."""
    check_flow_compute(create_full_nodeflow)
    check_flow_compute(create_mini_batch)
    check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
    check_flow_compute(create_mini_batch, use_negative_block_id=True)
    check_flow_compute1(create_mini_batch)
    check_flow_compute1(create_mini_batch, use_negative_block_id=True)
    check_flow_compute2(create_mini_batch)
Da Zheng's avatar
Da Zheng committed
401
402
403
404
405
406
407
408
409
410
411


def check_prop_flows(create_node_flow):
    """Compare ``prop_flow`` on a NodeFlow with repeated ``update_all``.

    The parent graph is updated ``num_layers`` times; a single ``prop_flow``
    call on the NodeFlow must then give the same ``'h'`` on the last layer.

    Parameters
    ----------
    create_node_flow : callable
        Called as ``create_node_flow(g, num_layers)`` to build the NodeFlow.
    """
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    # Copy features before the parent graph's 'h' is modified below.
    nf2.copy_from_parent()

    # Test the computation on a layer at a time.
    for _ in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})

    # Test the computation on all layers.
    nf2.prop_flow(fn.copy_src(src='h', out='m'),
                  fn.sum(msg='m', out='t'),
                  lambda nodes: {'h': nodes.data['t'] + 1})
    assert_allclose(F.asnumpy(nf2.layers[-1].data['h']),
                    F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h']),
                    rtol=1e-4, atol=1e-4)
Da Zheng's avatar
Da Zheng committed
420
421
422


def test_prop_flows():
    """Run ``check_prop_flows`` for full and sampled NodeFlows."""
    check_prop_flows(create_full_nodeflow)
    check_prop_flows(create_mini_batch)


VoVAllen's avatar
VoVAllen committed
427
428
@unittest.skipIf(dgl.backend.backend_name == "tensorflow",
                 reason="TF doesn't support inplace update, nf.copy_to_parent will trigger this")
def test_copy():
    """Check ``copy_from_parent`` / ``copy_to_parent`` with explicit field lists."""
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']

    # Default copy: every node and edge field of the parent is copied.
    nf = create_mini_batch(g, num_layers)
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
        for key in g.ndata.keys():
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
        for key in g.edata.keys():
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    # Selective copy: one list of field names per layer / per block.
    nf = create_mini_batch(g, num_layers)
    node_embed_names = [['h'], ['h1'], ['h']]
    edge_embed_names = [['h2'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=edge_embed_names)
    for i in range(nf.num_layers):
        assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
        for key in node_embed_names[i]:
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
        for key in edge_embed_names[i]:
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    # Copy only 'h0' into the first layer, compute 'h1' and 'h2' on the
    # NodeFlow, then write all three fields back to the parent graph.
    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], [], []]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)
    for i in range(num_layers):
        nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'),
                         fn.sum(msg='m', out='t'),
                         lambda nodes: {'h%d' % (i + 1): nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i + 1)]),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)
    nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
    for i in range(num_layers + 1):
        assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
                           F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))

    # Each layer gets a different field; the UDFs assert that the expected
    # per-layer field is visible on the source and destination side.
    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    g.ndata['h1'] = F.clone(g.ndata['h'])
    g.ndata['h2'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], ['h1'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)

    def msg_func(edge, ind):
        assert 'h%d' % ind in edge.src.keys()
        return {'m': edge.src['h%d' % ind]}

    def reduce_func(node, ind):
        assert 'h%d' % (ind + 1) in node.data.keys()
        return {'h': F.sum(node.mailbox['m'], 1) + node.data['h%d' % (ind + 1)]}

    for i in range(num_layers):
        nf.block_compute(i, partial(msg_func, ind=i),
                         partial(reduce_func, ind=i))
Da Zheng's avatar
Da Zheng committed
504
505


506
def test_block_edges():
    """Check ``block_edges`` against ``in_edges`` and against negative block ids."""
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        # Edges of block i are the in-edges of the nodes in layer i + 1.
        dest_nodes = utils.toindex(nf.layer_nid(i + 1))
        src1, dst1, eid1 = nf.in_edges(dest_nodes, 'all')

        src, dst, eid = nf.block_edges(i)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid1))

        src, dst, eid = nf.block_edges(i, remap_local=True)
        # should also work for negative block ids
        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i,
                                                            remap_local=True)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src_by_neg))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst_by_neg))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid_by_neg))

        # The remapped end points must match the ids converted by
        # _glb2lcl_nid for the source and destination layers.
        src1 = nf._glb2lcl_nid(src1, i)
        dst1 = nf._glb2lcl_nid(dst1, i + 1)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
532

533
534
535
536
537
538
539

def test_block_adj_matrix():
    """Check ``block_adjacency_matrix`` against a matrix built from ``block_edges``."""
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        u, v, _ = nf.block_edges(i, remap_local=True)
        adj, _ = nf.block_adjacency_matrix(i, F.cpu())
        adj = F.sparse_to_numpy(adj)

        # should also work for negative block ids
        adj_by_neg, _ = nf.block_adjacency_matrix(-nf.num_blocks + i, F.cpu())
        adj_by_neg = F.sparse_to_numpy(adj_by_neg)

        # Expected matrix: rows are destinations, columns are sources.
        data = np.ones((len(u)), dtype=np.float32)
        v = utils.toindex(v)
        u = utils.toindex(u)
        coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
                                   shape=adj.shape).todense()
        assert_array_equal(adj, coo)
        assert_array_equal(adj_by_neg, coo)
555
556
557
558
559
560
561
562


def test_block_incidence_matrix():
    """Check ``block_incidence_matrix`` for the ``"in"`` and ``"out"`` types."""
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        typestrs = ["in", "out"]  # todo need fix for "both"
        adjs = []
        for typestr in typestrs:
            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
            adj = F.sparse_to_numpy(adj)
            adjs.append(adj)

        # should work for negative block ids
        adjs_by_neg = []
        for typestr in typestrs:
            adj_by_neg, _ = nf.block_incidence_matrix(
                -nf.num_blocks + i, typestr, F.cpu())
            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
            adjs_by_neg.append(adj_by_neg)

        u, v, e = nf.block_edges(i, remap_local=True)
        u = utils.toindex(u)
        v = utils.toindex(v)
        e = utils.toindex(e)

        # Expected matrices, in the same order as typestrs:
        # "in" uses (destination, edge), "out" uses (source, edge).
        expected = []
        data_in_and_out = np.ones((len(u)), dtype=np.float32)
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
                                 shape=adjs[0].shape).todense()
        )
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
                                 shape=adjs[1].shape).todense()
        )
        # zip avoids reusing ``i``, which is the block index of the outer loop.
        for got, got_by_neg, want in zip(adjs, adjs_by_neg, expected):
            assert_array_equal(got, want)
            assert_array_equal(got_by_neg, want)
596
597


Da Zheng's avatar
Da Zheng committed
598
599
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    test_basic()
    test_block_adj_matrix()
    test_copy()
    test_apply_nodes()
    test_apply_edges()
    test_flow_compute()
    test_prop_flows()
    test_self_loop()
    test_block_edges()
    test_block_incidence_matrix()