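# Tests for DGL's NodeFlow API (dgl.contrib.sampling): NodeFlow construction via
# NeighborSampler / create_full_nodeflow, feature copy between a parent graph and
# its NodeFlow, block-wise message passing, and block adjacency/incidence matrices.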
import backend as F
import numpy as np
from numpy.testing import assert_array_equal, assert_allclose
import scipy as sp
import operator
import dgl
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
from dgl import utils
import dgl.function as fn
from functools import partial
import itertools
import unittest


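# Build a read-only test graph (complete or random sparse, optionally with
# self-loops) carrying node feature 'h1' and edge feature 'h2'.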
def generate_rand_graph(n, connect_more=False, complete=False, add_self_loop=False):
    if complete:
        cord = [(i, j)
                for i, j in itertools.product(range(n), range(n)) if i != j]
        row = [t[0] for t in cord]
        col = [t[1] for t in cord]
        data = np.ones((len(row),))
        arr = sp.sparse.coo_matrix((data, (row, col)), shape=(n, n))
    else:
        arr = (sp.sparse.random(n, n, density=0.1,
                                format='coo') != 0).astype(np.int64)
        # Have one node connect to, and be connected from, all the other nodes.
        if connect_more:
            arr[0] = 1
            arr[:, 0] = 1
    if add_self_loop:
        g = dgl.DGLGraph(arr, readonly=False)
        nodes = np.arange(g.number_of_nodes())
        g.add_edges(nodes, nodes)
        g.readonly()
    else:
        g = dgl.DGLGraph(arr, readonly=True)
    g.ndata['h1'] = F.randn((g.number_of_nodes(), 10))
    g.edata['h2'] = F.randn((g.number_of_edges(), 3))
    return g


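# With a complete parent graph and self-loops added by the sampler, every node in
# the sampled layers must have in-degree n (n - 1 neighbors plus its own loop).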
def test_self_loop():
    n = 100
    num_hops = 2
    g = generate_rand_graph(n, complete=True)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        deg = F.copy_to(F.ones(in_deg.shape, dtype=F.int64), F.cpu()) * n
        assert_array_equal(F.asnumpy(in_deg), F.asnumpy(deg))

    g = generate_rand_graph(n, complete=True, add_self_loop=True)
    g = dgl.to_simple_graph(g)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(nf.num_blocks):
        parent_eid = F.asnumpy(nf.block_parent_eid(i))
        parent_nid = F.asnumpy(nf.layer_parent_nid(i + 1))
        # Every self-loop eid in the parent graph must appear among the block's parent eids.
        parent_loop_eid = F.asnumpy(g.edge_ids(parent_nid, parent_nid))
        assert len(parent_loop_eid) == len(parent_nid)
        for eid in parent_loop_eid:
            assert eid in parent_eid


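# Sample a single NodeFlow that covers the full neighborhood of four seed nodes
# (expand_factor equals the number of nodes, so no neighbor is dropped).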
def create_mini_batch(g, num_hops, add_self_loop=False):
    seed_ids = np.array([1, 2, 0, 3])
    sampler = NeighborSampler(g, batch_size=4, expand_factor=g.number_of_nodes(),
                              num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop)
    nfs = list(sampler)
    assert len(nfs) == 1
    assert_array_equal(F.asnumpy(nfs[0].layer_parent_nid(-1)), seed_ids)
    return nfs[0]


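# Generic NodeFlow sanity checks: layer/block sizes, node and edge membership,
# boundary-layer degrees, feature copy from the parent graph, and the mapping
# between NodeFlow node ids and parent-graph node ids.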
def check_basic(g, nf):
    num_nodes = 0
    for i in range(nf.num_layers):
        num_nodes += nf.layer_size(i)
    assert nf.number_of_nodes() == num_nodes
    num_edges = 0
    for i in range(nf.num_blocks):
        num_edges += nf.block_size(i)
    assert nf.number_of_edges() == num_edges
    assert len(nf) == num_nodes
    assert nf.is_readonly

    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes)))))
    for i in range(num_nodes):
        assert nf.has_node(i)
    assert np.all(F.asnumpy(nf.has_nodes(
        list(range(num_nodes, 2 * num_nodes)))) == 0)
    for i in range(num_nodes, 2 * num_nodes):
        assert not nf.has_node(i)

    for block_id in range(nf.num_blocks):
        u, v, eid = nf.block_edges(block_id)
        assert np.all(F.asnumpy(nf.has_edges_between(u, v)))

    deg = nf.layer_in_degree(0)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(0)), np.int64))
    deg = nf.layer_out_degree(-1)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(-1)), np.int64))

    nf.copy_from_parent()
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))

        nids = nf.layer_nid(i)
        parent_nids = nf.map_to_parent_nid(nids)
        nids1 = nf.map_from_parent_nid(i, parent_nids)
        assert_array_equal(F.asnumpy(nids), F.asnumpy(nids1))

        data = nf.layers[i].data['h1']
        data1 = g.nodes[nf.layer_parent_nid(i)].data['h1']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    for i in range(nf.num_blocks):
        data = nf.blocks[i].data['h2']
        data1 = g.edges[nf.block_parent_eid(i)].data['h2']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    # Negative layer ids should behave like their positive counterparts.
    for i in range(-1, -nf.num_layers, -1):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))


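# create_full_nodeflow expands every node at every hop, so the NodeFlow sizes are
# exact multiples of the parent graph's node and edge counts.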
def test_basic():
    num_layers = 2
    g = generate_rand_graph(100, connect_more=True)
    nf = create_full_nodeflow(g, num_layers)
    assert nf.number_of_nodes() == g.number_of_nodes() * (num_layers + 1)
    assert nf.number_of_edges() == g.number_of_edges() * num_layers
    assert nf.num_layers == num_layers + 1
    assert nf.layer_size(0) == g.number_of_nodes()
    assert nf.layer_size(1) == g.number_of_nodes()
    check_basic(g, nf)

    parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
    nids = nf.map_from_parent_nid(0, parent_nids, remap_local=True)
    assert_array_equal(F.asnumpy(nids), F.asnumpy(parent_nids))

    # should also work for negative layer ids
    for l in range(-1, -num_layers, -1):
        nids1 = nf.map_from_parent_nid(l, parent_nids, remap_local=True)
        nids2 = nf.map_from_parent_nid(
            l + num_layers, parent_nids, remap_local=True)
        assert_array_equal(F.asnumpy(nids1), F.asnumpy(nids2))

    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)

    g = generate_rand_graph(100, add_self_loop=True)
    nf = create_mini_batch(g, num_layers, add_self_loop=True)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)


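# apply_layer must overwrite node features on a whole layer, or only on the nodes
# passed via `v`; both positive and negative layer ids are exercised.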
def check_apply_nodes(create_node_flow, use_negative_block_id):
    num_layers = 2
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        g = generate_rand_graph(100)
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.layer_size(l), 5))

        def update_func(nodes):
            return {'h1': new_feats}
        nf.apply_layer(l, update_func)
        assert_array_equal(
            F.asnumpy(nf.layers[l].data['h1']), F.asnumpy(new_feats))

        new_feats = F.randn((4, 5))

        def update_func1(nodes):
            return {'h1': new_feats}
        nf.apply_layer(l, update_func1, v=nf.layer_nid(l)[0:4])
        assert_array_equal(
            F.asnumpy(nf.layers[l].data['h1'][0:4]), F.asnumpy(new_feats))


def test_apply_nodes():
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=False)
    check_apply_nodes(create_mini_batch, use_negative_block_id=False)
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=True)
    check_apply_nodes(create_mini_batch, use_negative_block_id=True)


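# apply_block must update edge features on a block with an edge UDF (and with the
# built-in u_add_v), matching the corresponding edges of the parent graph.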
def check_apply_edges(create_node_flow):
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.apply_block(i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.apply_block(-num_layers + i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))

        # Test the built-in u_add_v message function.
        nf.apply_block(i, fn.u_add_v('f', 'f', 'f2'))
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))

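# Same as check_apply_edges, but the edge UDF is registered on the block first and
# apply_block is then called without an explicit function.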
def check_apply_edges1(create_node_flow):
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.register_apply_edge_func(update_func, i)
        nf.apply_block(i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.register_apply_edge_func(update_func, -num_layers + i)
        nf.apply_block(-num_layers + i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        #expected_f_sum = g.ndata["f"][srcs] + g.ndata["f"][dsts]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))


def test_apply_edges():
    check_apply_edges(create_full_nodeflow)
    check_apply_edges(create_mini_batch)
    check_apply_edges1(create_mini_batch)


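# Block-wise message passing on a NodeFlow must reproduce update_all on the parent
# graph, both for whole layers and for a subset of destination nodes.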
def check_flow_compute(create_node_flow, use_negative_block_id=False):
    num_layers = 2
    g = generate_rand_graph(100)
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    # Test the computation one layer at a time.
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the computation when only a few nodes are active in a layer.
    g.ndata['h'] = g.ndata['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        vs = nf.layer_nid(i+1)[0:4]
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1}, v=vs)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        data1 = nf.layers[i + 1].data['h'][0:4]
        data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
        assert_allclose(F.asnumpy(data1), F.asnumpy(
            data2), rtol=1e-4, atol=1e-4)


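# Same flow computation, but with message/reduce/apply functions registered either
# per block or once for all blocks before calling block_compute.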
def check_flow_compute1(create_node_flow, use_negative_block_id=False):
    num_layers = 2
    g = generate_rand_graph(100)

    # Test the case where UDFs are registered per block.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.register_message_func(fn.copy_src(src='h', out='m'), l)
        nf.register_reduce_func(fn.sum(msg='m', out='t'), l)
        nf.register_apply_node_func(
            lambda nodes: {'h': nodes.data['t'] + 1}, l)
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the case where UDFs are registered once for all blocks.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    nf.register_message_func(fn.copy_src(src='h', out='m'))
    nf.register_reduce_func(fn.sum(msg='m', out='t'))
    nf.register_apply_node_func(lambda nodes: {'h': nodes.data['t'] + 1})
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)


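# A UDF equivalent of the built-in src_mul_edge message function; used below to
# cross-check the built-in implementation.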
class SrcMulEdgeMessageFunction(object):
    def __init__(self, src_field, edge_field, out_field):
        self.mul_op = operator.mul
        self.src_field = src_field
        self.edge_field = edge_field
        self.out_field = out_field

    def __call__(self, edges):
        sdata = edges.src[self.src_field]
        edata = edges.data[self.edge_field]
        # Backends differ in their broadcasting semantics, so pad sdata and
        # edata with trailing singleton dimensions until they have the same
        # rank before multiplying.
        rank = max(F.ndim(sdata), F.ndim(edata))
        sshape = F.shape(sdata)
        eshape = F.shape(edata)
        sdata = F.reshape(sdata, sshape + (1,) * (rank - F.ndim(sdata)))
        edata = F.reshape(edata, eshape + (1,) * (rank - F.ndim(edata)))
        ret = self.mul_op(sdata, edata)
        return {self.out_field: ret}


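# Compare the UDF above with the built-in src_mul_edge / u_mul_v message functions
# and with update_all on the parent graph.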
def check_flow_compute2(create_node_flow):
    num_layers = 2
    g = generate_rand_graph(100)
    g.edata['h'] = F.ones((g.number_of_edges(), 10))

    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, SrcMulEdgeMessageFunction(
            'h', 'h', 't'), fn.sum('t', 'h1'))
        nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
                        F.asnumpy(nf.layers[i + 1].data['h']),
                        rtol=1e-4, atol=1e-4)
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    nf = create_node_flow(g, num_layers)
    g.ndata['h'] = g.ndata['h1']
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        nf.layers[i].data['h'] = nf.layers[i].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['s']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['s']),
                        rtol=1e-4, atol=1e-4)


def test_flow_compute():
    check_flow_compute(create_full_nodeflow)
    check_flow_compute(create_mini_batch)
    check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
    check_flow_compute(create_mini_batch, use_negative_block_id=True)
    check_flow_compute1(create_mini_batch)
    check_flow_compute1(create_mini_batch, use_negative_block_id=True)
    check_flow_compute2(create_mini_batch)


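# prop_flow must propagate messages through all blocks at once and match the result
# of running update_all num_layers times on the parent graph.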
def check_prop_flows(create_node_flow):
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    nf2.copy_from_parent()
    # Test the computation one layer at a time.
    for i in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})

    # Test the computation on all layers.
    nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                  lambda nodes: {'h': nodes.data['t'] + 1})
    assert_allclose(F.asnumpy(nf2.layers[-1].data['h']),
                    F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h']),
                    rtol=1e-4, atol=1e-4)


def test_prop_flows():
    check_prop_flows(create_full_nodeflow)
    check_prop_flows(create_mini_batch)


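# copy_from_parent/copy_to_parent must move exactly the requested node and edge
# embeddings between the parent graph and the NodeFlow's layers and blocks.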
@unittest.skipIf(dgl.backend.backend_name == "tensorflow",
                 reason="TF doesn't support inplace update, nf.copy_to_parent will trigger this")
def test_copy():
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf = create_mini_batch(g, num_layers)
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
        for key in g.ndata.keys():
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
        for key in g.edata.keys():
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    node_embed_names = [['h'], ['h1'], ['h']]
    edge_embed_names = [['h2'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=edge_embed_names)
    for i in range(nf.num_layers):
        assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
        for key in node_embed_names[i]:
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
        for key in edge_embed_names[i]:
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], [], []]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)
    for i in range(num_layers):
        nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h%d' % (i+1): nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i+1)]),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)
    nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
    for i in range(num_layers + 1):
        assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
                           F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    g.ndata['h1'] = F.clone(g.ndata['h'])
    g.ndata['h2'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], ['h1'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)

    def msg_func(edge, ind):
        assert 'h%d' % ind in edge.src.keys()
        return {'m': edge.src['h%d' % ind]}

    def reduce_func(node, ind):
        assert 'h%d' % (ind + 1) in node.data.keys()
        return {'h': F.sum(node.mailbox['m'], 1) + node.data['h%d' % (ind + 1)]}

    for i in range(num_layers):
        nf.block_compute(i, partial(msg_func, ind=i),
                         partial(reduce_func, ind=i))


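# block_edges must agree with in_edges on the block's destination layer; with
# remap_local=True the returned ids must be layer-local, for positive and negative
# block ids alike.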
def test_block_edges():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        dest_nodes = utils.toindex(nf.layer_nid(i + 1))
        src1, dst1, eid1 = nf.in_edges(dest_nodes, 'all')

        src, dst, eid = nf.block_edges(i)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid1))

        src, dst, eid = nf.block_edges(i, remap_local=True)
        # should also work for negative block ids
        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i,
                                                            remap_local=True)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src_by_neg))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst_by_neg))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid_by_neg))

        src1 = nf._glb2lcl_nid(src1, i)
        dst1 = nf._glb2lcl_nid(dst1, i + 1)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))


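# The block adjacency matrix must match a COO matrix built from the block's local
# edge list, for both positive and negative block ids.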
def test_block_adj_matrix():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        u, v, _ = nf.block_edges(i, remap_local=True)
        adj, _ = nf.block_adjacency_matrix(i, F.cpu())
        adj = F.sparse_to_numpy(adj)

        # should also work for negative block ids
        adj_by_neg, _ = nf.block_adjacency_matrix(-nf.num_blocks + i, F.cpu())
        adj_by_neg = F.sparse_to_numpy(adj_by_neg)

        data = np.ones((len(u)), dtype=np.float32)
        v = utils.toindex(v)
        u = utils.toindex(u)
        coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
                                   shape=adj.shape).todense()
        assert_array_equal(adj, coo)
        assert_array_equal(adj_by_neg, coo)


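# The "in" and "out" block incidence matrices must match COO matrices built from
# the block's local edge and node ids.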
def test_block_incidence_matrix():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        typestrs = ["in", "out"]  # TODO: "both" needs a fix before it can be tested
        adjs = []
        for typestr in typestrs:
            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
            adj = F.sparse_to_numpy(adj)
            adjs.append(adj)

        # should work for negative block ids
        adjs_by_neg = []
        for typestr in typestrs:
            adj_by_neg, _ = nf.block_incidence_matrix(
                -nf.num_blocks + i, typestr, F.cpu())
            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
            adjs_by_neg.append(adj_by_neg)

        u, v, e = nf.block_edges(i, remap_local=True)
        u = utils.toindex(u)
        v = utils.toindex(v)
        e = utils.toindex(e)

        expected = []
        data_in_and_out = np.ones((len(u)), dtype=np.float32)
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
                                 shape=adjs[0].shape).todense()
        )
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
                                 shape=adjs[1].shape).todense()
        )
        for i in range(len(typestrs)):
            assert_array_equal(adjs[i], expected[i])
            assert_array_equal(adjs_by_neg[i], expected[i])


if __name__ == '__main__':
    test_basic()
    test_block_adj_matrix()
    test_copy()
    test_apply_nodes()
    test_apply_edges()
    test_flow_compute()
    test_prop_flows()
    test_self_loop()
    test_block_edges()
    test_block_incidence_matrix()