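"""Tests for the NodeFlow data structure produced by dgl.contrib.sampling
(create_full_nodeflow and NeighborSampler) on the stale DGLGraph API."""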
import backend as F
import numpy as np
from numpy.testing import assert_array_equal, assert_allclose
import scipy as sp
import operator
import dgl
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
from dgl import utils
import dgl.function as fn
from functools import partial
import itertools
import unittest


def generate_rand_graph(n, connect_more=False, complete=False, add_self_loop=False):
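    """Build a readonly test graph (random, or complete when `complete` is set)
    with node feature 'h1' and edge feature 'h2'. `connect_more` adds edges to
    and from node 0 for every node; `add_self_loop` adds a self-loop per node."""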
    if complete:
        cord = [(i, j)
                for i, j in itertools.product(range(n), range(n)) if i != j]
        row = [t[0] for t in cord]
        col = [t[1] for t in cord]
        data = np.ones((len(row),))
        arr = sp.sparse.coo_matrix((data, (row, col)), shape=(n, n))
    else:
        arr = (sp.sparse.random(n, n, density=0.1,
                                format='coo') != 0).astype(np.int64)
        # Have node 0 connect to and from every node.
        if connect_more:
            arr[0] = 1
            arr[:, 0] = 1
    if add_self_loop:
        g = dgl.DGLGraphStale(arr, readonly=False)
        nodes = np.arange(g.number_of_nodes())
        g.add_edges(nodes, nodes)
        g.readonly()
    else:
        g = dgl.DGLGraphStale(arr, readonly=True)
    g.ndata['h1'] = F.randn((g.number_of_nodes(), 10))
    g.edata['h2'] = F.randn((g.number_of_edges(), 3))
    return g


def create_mini_batch(g, num_hops, add_self_loop=False):
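    """Sample a single NodeFlow with four fixed seed nodes, expanding to all
    neighbors at every hop."""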
    seed_ids = np.array([1, 2, 0, 3])
    sampler = NeighborSampler(g, batch_size=4, expand_factor=g.number_of_nodes(),
                              num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop)
    nfs = list(sampler)
    assert len(nfs) == 1
    assert_array_equal(F.asnumpy(nfs[0].layer_parent_nid(-1)), seed_ids)
    return nfs[0]


def check_basic(g, nf):
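    """Check NodeFlow structural invariants: layer and block sizes, node and
    edge membership, boundary-layer degrees, id-mapping round trips, and
    features copied from the parent graph."""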
    num_nodes = 0
    for i in range(nf.num_layers):
        num_nodes += nf.layer_size(i)
    assert nf.number_of_nodes() == num_nodes
    num_edges = 0
    for i in range(nf.num_blocks):
        num_edges += nf.block_size(i)
    assert nf.number_of_edges() == num_edges
    assert len(nf) == num_nodes
    assert nf.is_readonly

    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes)))))
    for i in range(num_nodes):
        assert nf.has_node(i)
    assert np.all(F.asnumpy(nf.has_nodes(
        list(range(num_nodes, 2 * num_nodes)))) == 0)
    for i in range(num_nodes, 2 * num_nodes):
        assert not nf.has_node(i)

    for block_id in range(nf.num_blocks):
        u, v, eid = nf.block_edges(block_id)
        assert np.all(F.asnumpy(nf.has_edges_between(u, v)))

    deg = nf.layer_in_degree(0)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(0)), np.int64))
    deg = nf.layer_out_degree(-1)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(-1)), np.int64))

    nf.copy_from_parent()
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))

        nids = nf.layer_nid(i)
        parent_nids = nf.map_to_parent_nid(nids)
        nids1 = nf.map_from_parent_nid(i, parent_nids)
        assert_array_equal(F.asnumpy(nids), F.asnumpy(nids1))

        data = nf.layers[i].data['h1']
        data1 = g.nodes[nf.layer_parent_nid(i)].data['h1']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    for i in range(nf.num_blocks):
        data = nf.blocks[i].data['h2']
        data1 = g.edges[nf.block_parent_eid(i)].data['h2']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    # The same degree checks should also hold with negative layer ids.
    for i in range(-1, -nf.num_layers, -1):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))


def test_basic():
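    """Run the structural checks on a full NodeFlow, on a sampled mini-batch,
    and on a mini-batch over a graph with self-loops."""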
    num_layers = 2
    g = generate_rand_graph(100, connect_more=True)
    nf = create_full_nodeflow(g, num_layers)
    assert nf.number_of_nodes() == g.number_of_nodes() * (num_layers + 1)
    assert nf.number_of_edges() == g.number_of_edges() * num_layers
    assert nf.num_layers == num_layers + 1
    assert nf.layer_size(0) == g.number_of_nodes()
    assert nf.layer_size(1) == g.number_of_nodes()
    check_basic(g, nf)

    parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
    nids = nf.map_from_parent_nid(0, parent_nids, remap_local=True)
    assert_array_equal(F.asnumpy(nids), F.asnumpy(parent_nids))

    # should also work for negative layer ids
    for l in range(-1, -num_layers, -1):
        nids1 = nf.map_from_parent_nid(l, parent_nids, remap_local=True)
        nids2 = nf.map_from_parent_nid(
            l + num_layers, parent_nids, remap_local=True)
        assert_array_equal(F.asnumpy(nids1), F.asnumpy(nids2))

    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)

    g = generate_rand_graph(100, add_self_loop=True)
    nf = create_mini_batch(g, num_layers, add_self_loop=True)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)


def check_apply_nodes(create_node_flow, use_negative_block_id):
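    """Check apply_layer with a node UDF on a whole layer and on a subset of
    its nodes, using positive or negative layer ids."""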
    num_layers = 2
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        g = generate_rand_graph(100)
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.layer_size(l), 5))

        def update_func(nodes):
            return {'h1': new_feats}
        nf.apply_layer(l, update_func)
        assert_array_equal(
            F.asnumpy(nf.layers[l].data['h1']), F.asnumpy(new_feats))

        new_feats = F.randn((4, 5))

        def update_func1(nodes):
            return {'h1': new_feats}
        nf.apply_layer(l, update_func1, v=nf.layer_nid(l)[0:4])
        assert_array_equal(
            F.asnumpy(nf.layers[l].data['h1'][0:4]), F.asnumpy(new_feats))


def test_apply_nodes():
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=False)
    check_apply_nodes(create_mini_batch, use_negative_block_id=False)
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=True)
    check_apply_nodes(create_mini_batch, use_negative_block_id=True)


def check_apply_edges(create_node_flow):
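    """Check apply_block with an edge UDF and with the built-in u_add_v, for
    both positive and negative block ids."""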
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.apply_block(i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.apply_block(-num_layers + i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))

        # The built-in u_add_v should produce the same result as the UDF above.
        nf.apply_block(i, fn.u_add_v('f', 'f', 'f2'))
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))

def check_apply_edges1(create_node_flow):
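    """Same as check_apply_edges, but the edge UDF is registered through
    register_apply_edge_func before calling apply_block."""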
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.register_apply_edge_func(update_func, i)
        nf.apply_block(i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.register_apply_edge_func(update_func, -num_layers + i)
        nf.apply_block(-num_layers + i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        #expected_f_sum = g.ndata["f"][srcs] + g.ndata["f"][dsts]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))


def test_apply_edges():
    check_apply_edges(create_full_nodeflow)
    check_apply_edges(create_mini_batch)
    check_apply_edges1(create_mini_batch)


def check_flow_compute(create_node_flow, use_negative_block_id=False):
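    """Compare block_compute on the NodeFlow against update_all on the parent
    graph, first on full layers and then with only a few active nodes."""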
    num_layers = 2
    g = generate_rand_graph(100)
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    # Test the computation one layer at a time.
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the computation when only a few nodes are active in a layer.
    g.ndata['h'] = g.ndata['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        vs = nf.layer_nid(i+1)[0:4]
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1}, v=vs)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        data1 = nf.layers[i + 1].data['h'][0:4]
        data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
        assert_allclose(F.asnumpy(data1), F.asnumpy(
            data2), rtol=1e-4, atol=1e-4)


def check_flow_compute1(create_node_flow, use_negative_block_id=False):
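    """Same comparison as check_flow_compute, but the message/reduce/apply
    functions are registered on the NodeFlow (per block, then for all blocks)
    instead of being passed to block_compute."""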
    num_layers = 2
    g = generate_rand_graph(100)

    # Test the case where UDFs are registered per block.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.register_message_func(fn.copy_src(src='h', out='m'), l)
        nf.register_reduce_func(fn.sum(msg='m', out='t'), l)
        nf.register_apply_node_func(
            lambda nodes: {'h': nodes.data['t'] + 1}, l)
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the case where UDFs are registered once for all blocks.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    nf.register_message_func(fn.copy_src(src='h', out='m'))
    nf.register_reduce_func(fn.sum(msg='m', out='t'))
    nf.register_apply_node_func(lambda nodes: {'h': nodes.data['t'] + 1})
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)


class SrcMulEdgeMessageFunction(object):
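    """A message UDF equivalent to the built-in src_mul_edge function."""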
    def __init__(self, src_field, edge_field, out_field):
        self.mul_op = operator.mul
        self.src_field = src_field
        self.edge_field = edge_field
        self.out_field = out_field

    def __call__(self, edges):
        sdata = edges.src[self.src_field]
        edata = edges.data[self.edge_field]
        # Backends differ in their broadcasting semantics, so reshape sdata
        # and edata to the same rank before multiplying them.
        rank = max(F.ndim(sdata), F.ndim(edata))
        sshape = F.shape(sdata)
        eshape = F.shape(edata)
        sdata = F.reshape(sdata, sshape + (1,) * (rank - F.ndim(sdata)))
        edata = F.reshape(edata, eshape + (1,) * (rank - F.ndim(edata)))
        ret = self.mul_op(sdata, edata)
        return {self.out_field: ret}


def check_flow_compute2(create_node_flow):
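    """Check block_compute with edge-weighted messages: the UDF above, the
    built-in src_mul_edge, and u_mul_v should all agree with update_all on
    the parent graph."""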
    num_layers = 2
    g = generate_rand_graph(100)
    g.edata['h'] = F.ones((g.number_of_edges(), 10))

    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, SrcMulEdgeMessageFunction(
            'h', 'h', 't'), fn.sum('t', 'h1'))
        nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
                        F.asnumpy(nf.layers[i + 1].data['h']),
                        rtol=1e-4, atol=1e-4)
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    nf = create_node_flow(g, num_layers)
    g.ndata['h'] = g.ndata['h1']
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        nf.layers[i].data['h'] = nf.layers[i].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['s']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['s']),
                        rtol=1e-4, atol=1e-4)


def test_flow_compute():
    check_flow_compute(create_full_nodeflow)
    check_flow_compute(create_mini_batch)
    check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
    check_flow_compute(create_mini_batch, use_negative_block_id=True)
    check_flow_compute1(create_mini_batch)
    check_flow_compute1(create_mini_batch, use_negative_block_id=True)
    check_flow_compute2(create_mini_batch)


def check_prop_flows(create_node_flow):
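    """Check prop_flow over all blocks against layer-by-layer update_all on
    the parent graph."""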
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    nf2.copy_from_parent()
    # Compute the reference result on the parent graph, one layer at a time.
    for i in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})

    # Propagate through all layers of the NodeFlow at once and compare.
    nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                  lambda nodes: {'h': nodes.data['t'] + 1})
    assert_allclose(F.asnumpy(nf2.layers[-1].data['h']),
                    F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h']),
                    rtol=1e-4, atol=1e-4)


def test_prop_flows():
    check_prop_flows(create_full_nodeflow)
    check_prop_flows(create_mini_batch)


@unittest.skipIf(dgl.backend.backend_name == "tensorflow",
                 reason="TF doesn't support inplace update, nf.copy_to_parent will trigger this")
def test_copy():
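    """Check copy_from_parent and copy_to_parent: with all features, with
    explicit per-layer and per-block embedding names, and with per-layer
    renamed features used in block_compute."""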
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf = create_mini_batch(g, num_layers)
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
        for key in g.ndata.keys():
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
        for key in g.edata.keys():
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    node_embed_names = [['h'], ['h1'], ['h']]
    edge_embed_names = [['h2'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=edge_embed_names)
    for i in range(nf.num_layers):
        assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
        for key in node_embed_names[i]:
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
        for key in edge_embed_names[i]:
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], [], []]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)
    for i in range(num_layers):
        nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h%d' % (i+1): nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i+1)]),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)
    nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
    for i in range(num_layers + 1):
        assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
                           F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    g.ndata['h1'] = F.clone(g.ndata['h'])
    g.ndata['h2'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], ['h1'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)

    def msg_func(edge, ind):
        assert 'h%d' % ind in edge.src.keys()
        return {'m': edge.src['h%d' % ind]}

    def reduce_func(node, ind):
        assert 'h%d' % (ind + 1) in node.data.keys()
        return {'h': F.sum(node.mailbox['m'], 1) + node.data['h%d' % (ind + 1)]}

    for i in range(num_layers):
        nf.block_compute(i, partial(msg_func, ind=i),
                         partial(reduce_func, ind=i))


def test_block_edges():
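    """Check block_edges against in_edges of the next layer, with and without
    remap_local and with negative block ids."""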
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        dest_nodes = utils.toindex(nf.layer_nid(i + 1))
        src1, dst1, eid1 = nf.in_edges(dest_nodes, 'all')

        src, dst, eid = nf.block_edges(i)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid1))

        src, dst, eid = nf.block_edges(i, remap_local=True)
        # should also work for negative block ids
        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i,
                                                            remap_local=True)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src_by_neg))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst_by_neg))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid_by_neg))

        src1 = nf._glb2lcl_nid(src1, i)
        dst1 = nf._glb2lcl_nid(dst1, i + 1)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))


def test_block_adj_matrix():
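    """Check block_adjacency_matrix against a scipy COO matrix built from
    block_edges, including negative block ids."""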
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        u, v, _ = nf.block_edges(i, remap_local=True)
        adj, _ = nf.block_adjacency_matrix(i, F.cpu())
        adj = F.sparse_to_numpy(adj)

        # should also work for negative block ids
        adj_by_neg, _ = nf.block_adjacency_matrix(-nf.num_blocks + i, F.cpu())
        adj_by_neg = F.sparse_to_numpy(adj_by_neg)

        data = np.ones((len(u)), dtype=np.float32)
        v = utils.toindex(v)
        u = utils.toindex(u)
        coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
                                   shape=adj.shape).todense()
        assert_array_equal(adj, coo)
        assert_array_equal(adj_by_neg, coo)


def test_block_incidence_matrix():
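    """Check block_incidence_matrix ('in' and 'out') against scipy COO
    matrices built from block_edges, including negative block ids."""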
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        typestrs = ["in", "out"]  # TODO: needs a fix for "both"
        adjs = []
        for typestr in typestrs:
            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
            adj = F.sparse_to_numpy(adj)
            adjs.append(adj)

        # should work for negative block ids
        adjs_by_neg = []
        for typestr in typestrs:
            adj_by_neg, _ = nf.block_incidence_matrix(
                -nf.num_blocks + i, typestr, F.cpu())
            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
            adjs_by_neg.append(adj_by_neg)

        u, v, e = nf.block_edges(i, remap_local=True)
        u = utils.toindex(u)
        v = utils.toindex(v)
        e = utils.toindex(e)

        expected = []
        data_in_and_out = np.ones((len(u)), dtype=np.float32)
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
                                 shape=adjs[0].shape).todense()
        )
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
                                 shape=adjs[1].shape).todense()
        )
        for i in range(len(typestrs)):
            assert_array_equal(adjs[i], expected[i])
            assert_array_equal(adjs_by_neg[i], expected[i])


if __name__ == '__main__':
    test_basic()
    test_block_adj_matrix()
    test_copy()
    test_apply_nodes()
    test_apply_edges()
    test_flow_compute()
    test_prop_flows()
    test_block_edges()
    test_block_incidence_matrix()