test_nodeflow.py 23.1 KB
Newer Older
Da Zheng's avatar
Da Zheng committed
1
2
import backend as F
import numpy as np
3
from numpy.testing import assert_array_equal, assert_allclose
Da Zheng's avatar
Da Zheng committed
4
import scipy as sp
5
import operator
Da Zheng's avatar
Da Zheng committed
6
import dgl
7
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
Da Zheng's avatar
Da Zheng committed
8
9
10
from dgl import utils
import dgl.function as fn
from functools import partial
11
12
import itertools

Da Zheng's avatar
Da Zheng committed
13
def generate_rand_graph(n, connect_more=False, complete=False, add_self_loop=False):
    """Build a readonly random DGLGraph on ``n`` nodes with random features.

    complete       -- every ordered pair (i, j), i != j, is an edge.
    connect_more   -- node 0 is connected to/from all nodes (random graph only).
    add_self_loop  -- add a self loop on every node before freezing the graph.
    Node feature 'h1' is (n, 10); edge feature 'h2' is (num_edges, 3).
    """
    if complete:
        # All off-diagonal entries, enumerated in row-major order — the same
        # order itertools.product(range(n), range(n)) would yield.
        off_diag = ~np.eye(n, dtype=bool)
        src, dst = np.nonzero(off_diag)
        weights = np.ones((src.shape[0],))
        arr = sp.sparse.coo_matrix((weights, (src, dst)), shape=(n, n))
    else:
        arr = (sp.sparse.random(n, n, density=0.1, format='coo') != 0).astype(np.int64)
        # having one node to connect to all other nodes.
        if connect_more:
            arr[0] = 1
            arr[:,0] = 1

    if add_self_loop:
        # Must be mutable to add edges; freeze afterwards.
        g = dgl.DGLGraph(arr, readonly=False)
        loop_nodes = np.arange(g.number_of_nodes())
        g.add_edges(loop_nodes, loop_nodes)
        g.readonly()
    else:
        g = dgl.DGLGraph(arr, readonly=True)

    g.ndata['h1'] = F.randn((g.number_of_nodes(), 10))
    g.edata['h2'] = F.randn((g.number_of_edges(), 3))
    return g


38
39
40
41
42
43
44
def test_self_loop():
    """Test NeighborSampler's add_self_loop option.

    On a complete graph, sampling with self loops must give every sampled
    node an in-degree of n (n-1 neighbors plus the loop).  When the parent
    graph already has self loops, each block's parent eids must include the
    parent's loop edges for its destination nodes.
    """
    n = 100
    num_hops = 2
    g = generate_rand_graph(n, complete=True)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        # Expected degree: n-1 incoming edges from the complete graph + 1 loop.
        deg = F.copy_to(F.ones(in_deg.shape, dtype=F.int64), F.cpu()) * n
        assert_array_equal(F.asnumpy(in_deg), F.asnumpy(deg))

    # Parent graph already carries self loops; deduplicate parallel edges.
    g = generate_rand_graph(n, complete=True, add_self_loop=True)
    g = dgl.to_simple_graph(g)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(nf.num_blocks):
        parent_eid = F.asnumpy(nf.block_parent_eid(i))
        parent_nid = F.asnumpy(nf.layer_parent_nid(i + 1))
        # The loop eid in the parent graph must exist in the block parent eid.
        parent_loop_eid = F.asnumpy(g.edge_ids(parent_nid, parent_nid))
        assert len(parent_loop_eid) == len(parent_nid)
        for eid in parent_loop_eid:
            assert eid in parent_eid

60
def create_mini_batch(g, num_hops, add_self_loop=False):
    """Sample one NodeFlow from ``g`` rooted at a fixed set of seed nodes.

    The expand factor covers the whole graph, so every in-neighbor of each
    seed is included at every hop.  Returns the single sampled NodeFlow.
    """
    seed_ids = np.array([1, 2, 0, 3])
    sampler = NeighborSampler(g, batch_size=4, expand_factor=g.number_of_nodes(),
            num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop)
    flows = [flow for flow in sampler]
    # All four seeds fit in one batch, so exactly one NodeFlow is produced.
    assert len(flows) == 1
    nf = flows[0]
    # The last (output) layer must be exactly the seed nodes, in order.
    assert_array_equal(F.asnumpy(nf.layer_parent_nid(-1)), seed_ids)
    return nf
Da Zheng's avatar
Da Zheng committed
68
69
70
71
72
73
74
75
76
77

def check_basic(g, nf):
    """Check structural invariants of a NodeFlow against its parent graph.

    Verifies node/edge counts, node/edge membership queries, boundary layer
    degrees, id mapping round-trips, and that copy_from_parent gathers the
    parent's 'h1'/'h2' features correctly, for both positive and negative
    layer ids.
    """
    # Total node count is the sum of the per-layer sizes.
    num_nodes = 0
    for i in range(nf.num_layers):
        num_nodes += nf.layer_size(i)
    assert nf.number_of_nodes() == num_nodes
    # Total edge count is the sum of the per-block sizes.
    num_edges = 0
    for i in range(nf.num_blocks):
        num_edges += nf.block_size(i)
    assert nf.number_of_edges() == num_edges
    assert len(nf) == num_nodes
    assert nf.is_readonly

    # Node ids [0, num_nodes) exist; [num_nodes, 2*num_nodes) do not.
    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes)))))
    for i in range(num_nodes):
        assert nf.has_node(i)
    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes, 2 * num_nodes)))) == 0)
    for i in range(num_nodes, 2 * num_nodes):
        assert not nf.has_node(i)

    # Every edge reported by block_edges must be queryable.
    for block_id in range(nf.num_blocks):
        u, v, eid = nf.block_edges(block_id)
        assert np.all(F.asnumpy(nf.has_edges_between(u, v)))

    # The input layer has no incoming edges; the output layer no outgoing.
    deg = nf.layer_in_degree(0)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(0)), np.int64))
    deg = nf.layer_out_degree(-1)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(-1)), np.int64))

    nf.copy_from_parent()
    for i in range(1, nf.num_layers):
        # Every edge of block i-1 contributes one in-degree at layer i and
        # one out-degree at layer i-1, so the totals must match.
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))

        # Round trip: nodeflow id -> parent id -> nodeflow id is identity.
        nids = nf.layer_nid(i)
        parent_nids = nf.map_to_parent_nid(nids)
        nids1 = nf.map_from_parent_nid(i, parent_nids)
        assert_array_equal(F.asnumpy(nids), F.asnumpy(nids1))

        # copy_from_parent gathered 'h1' by parent node id.
        data = nf.layers[i].data['h1']
        data1 = g.nodes[nf.layer_parent_nid(i)].data['h1']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    # copy_from_parent gathered 'h2' by parent edge id.
    for i in range(nf.num_blocks):
        data = nf.blocks[i].data['h2']
        data1 = g.edges[nf.block_parent_eid(i)].data['h2']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    # negative layer Ids.
    for i in range(-1, -nf.num_layers, -1):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))

Da Zheng's avatar
Da Zheng committed
124
125
126
127

def test_basic():
    """Test NodeFlow construction and invariants for both flow types.

    A full nodeflow over a graph replicates every node in every layer and
    every edge in every block; mini-batch flows are checked via check_basic.
    """
    num_layers = 2
    g = generate_rand_graph(100, connect_more=True)
    nf = create_full_nodeflow(g, num_layers)
    # A full nodeflow has one copy of the graph per layer/block.
    assert nf.number_of_nodes() == g.number_of_nodes() * (num_layers + 1)
    assert nf.number_of_edges() == g.number_of_edges() * num_layers
    assert nf.num_layers == num_layers + 1
    assert nf.layer_size(0) == g.number_of_nodes()
    assert nf.layer_size(1) == g.number_of_nodes()
    check_basic(g, nf)

    # In a full nodeflow, local ids in layer 0 equal the parent ids.
    parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
    nids = nf.map_from_parent_nid(0, parent_nids, remap_local=True)
    assert_array_equal(F.asnumpy(nids), F.asnumpy(parent_nids))

    # should also work for negative layer ids
    # NOTE(review): this compares layer l with layer l + num_layers, not
    # l + nf.num_layers; the layers only coincide here because every layer
    # of a full nodeflow contains all nodes — confirm intent.
    for l in range(-1, -num_layers, -1):
        nids1 = nf.map_from_parent_nid(l, parent_nids, remap_local=True)
        nids2 = nf.map_from_parent_nid(l + num_layers, parent_nids, remap_local=True)
        assert_array_equal(F.asnumpy(nids1), F.asnumpy(nids2))

    # Mini-batch nodeflow on a random graph.
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)

    # Mini-batch nodeflow with self loops added during sampling.
    g = generate_rand_graph(100, add_self_loop=True)
    nf = create_mini_batch(g, num_layers, add_self_loop=True)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)

Da Zheng's avatar
Da Zheng committed
156

157
def check_apply_nodes(create_node_flow, use_negative_block_id):
    """Check NodeFlow.apply_layer, on a whole layer and on a node subset."""
    num_layers = 2
    for hop in range(num_layers):
        layer = hop - num_layers if use_negative_block_id else hop
        g = generate_rand_graph(100)
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()

        # Replace 'h1' for every node in the layer.
        feats_full = F.randn((nf.layer_size(layer), 5))
        nf.apply_layer(layer, lambda nodes: {'h1': feats_full})
        assert_array_equal(F.asnumpy(nf.layers[layer].data['h1']),
                           F.asnumpy(feats_full))

        # Replace 'h1' for only the first four nodes of the layer.
        feats_part = F.randn((4, 5))
        nf.apply_layer(layer, lambda nodes: {'h1': feats_part},
                       v=nf.layer_nid(layer)[0:4])
        assert_array_equal(F.asnumpy(nf.layers[layer].data['h1'][0:4]),
                           F.asnumpy(feats_part))
Da Zheng's avatar
Da Zheng committed
175
176
177


def test_apply_nodes():
    """Run apply_layer checks for both flow types, with positive then negative layer ids."""
    for negative_ids in (False, True):
        check_apply_nodes(create_full_nodeflow, use_negative_block_id=negative_ids)
        check_apply_nodes(create_mini_batch, use_negative_block_id=negative_ids)
Da Zheng's avatar
Da Zheng committed
182
183
184
185
186
187


def check_apply_edges(create_node_flow):
    """Check NodeFlow.apply_block with an explicit edge UDF.

    The UDF overwrites an existing edge field ('h2') and creates a new one
    ('f2') from the endpoint node features; 'f2' is verified against the
    parent graph's node data.
    """
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.apply_block(i, update_func)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.apply_block(-num_layers + i, update_func)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # For each block edge, 'f2' must equal the sum of the parent
        # endpoints' 'f' features.
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))
207

Da Zheng's avatar
Da Zheng committed
208

209
210
211
212
213
214
215
216
217
218
219
220
221
222
def check_apply_edges1(create_node_flow):
    """Same as check_apply_edges, but via registered edge UDFs.

    The UDF is attached with register_apply_edge_func and apply_block is
    then called without an explicit function argument.
    """
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.register_apply_edge_func(update_func, i)
        nf.apply_block(i)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.register_apply_edge_func(update_func, -num_layers + i)
        nf.apply_block(-num_layers + i)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # Verify 'f2' against the parent graph's endpoint features.
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))
235
236


Da Zheng's avatar
Da Zheng committed
237
def test_apply_edges():
    """Run apply_block checks; the registered-UDF variant only on mini-batch flows."""
    for make_flow in (create_full_nodeflow, create_mini_batch):
        check_apply_edges(make_flow)
    check_apply_edges1(create_mini_batch)
Da Zheng's avatar
Da Zheng committed
241
242


243
def check_flow_compute(create_node_flow, use_negative_block_id=False):
    """Check block_compute against update_all on the parent graph.

    Each block performs copy-src / sum / +1 message passing; the result at
    layer i+1 must match the parent graph after i+1 rounds of update_all.
    Also checks computation restricted to a subset of destination nodes.
    """
    num_layers = 2
    g = generate_rand_graph(100)
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    # Test the computation on a layer at a time.
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h' : nodes.data['t'] + 1})
        # Reference: the same round of message passing on the parent graph.
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the computation when only a few nodes are active in a layer.
    g.ndata['h'] = g.ndata['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        # Only the first four destination nodes of the layer are computed.
        vs = nf.layer_nid(i+1)[0:4]
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                        lambda nodes: {'h' : nodes.data['t'] + 1}, v=vs)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        data1 = nf.layers[i + 1].data['h'][0:4]
        data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
        assert_allclose(F.asnumpy(data1), F.asnumpy(data2), rtol=1e-4, atol=1e-4)
Da Zheng's avatar
Da Zheng committed
273

274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def check_flow_compute1(create_node_flow, use_negative_block_id=False):
    """Check block_compute driven by registered (per-block and global) UDFs."""
    num_layers = 2
    g = generate_rand_graph(100)

    # test the case that we register UDFs per block.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.register_message_func(fn.copy_src(src='h', out='m'), l)
        nf.register_reduce_func(fn.sum(msg='m', out='t'), l)
        nf.register_apply_node_func(lambda nodes: {'h' : nodes.data['t'] + 1}, l)
        # No explicit UDFs: block_compute uses the registered ones.
        nf.block_compute(l)
        # Reference computation on the parent graph.
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # test the case that we register UDFs in all blocks.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    # Registering without a block id applies to every block.
    nf.register_message_func(fn.copy_src(src='h', out='m'))
    nf.register_reduce_func(fn.sum(msg='m', out='t'))
    nf.register_apply_node_func(lambda nodes: {'h' : nodes.data['t'] + 1})
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

class SrcMulEdgeMessageFunction(object):
    """UDF form of ``fn.src_mul_edge``: multiply a source-node field by an edge field."""

    def __init__(self, src_field, edge_field, out_field):
        self.mul_op = operator.mul
        self.src_field = src_field
        self.edge_field = edge_field
        self.out_field = out_field

    def __call__(self, edges):
        src_vals = edges.src[self.src_field]
        edge_vals = edges.data[self.edge_field]
        # Due to the different broadcasting semantics of different backends,
        # pad both operands with trailing singleton axes until their ranks match.
        target_rank = max(F.ndim(src_vals), F.ndim(edge_vals))
        src_vals = F.reshape(
            src_vals, F.shape(src_vals) + (1,) * (target_rank - F.ndim(src_vals)))
        edge_vals = F.reshape(
            edge_vals, F.shape(edge_vals) + (1,) * (target_rank - F.ndim(edge_vals)))
        return {self.out_field: self.mul_op(src_vals, edge_vals)}

def check_flow_compute2(create_node_flow, use_negative_block_id=False):
    """Check block_compute with edge-involving message functions.

    Compares the custom SrcMulEdgeMessageFunction UDF against the builtin
    fn.src_mul_edge, and both against update_all on the parent graph; then
    checks fn.u_mul_v the same way.
    """
    num_layers = 2
    g = generate_rand_graph(100)
    # All-ones edge features so src_mul_edge broadcasts but keeps values.
    g.edata['h'] = F.ones((g.number_of_edges(), 10))
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        # Custom UDF writes 'h1'; builtin writes 'h' — results must agree.
        nf.block_compute(i, SrcMulEdgeMessageFunction('h', 'h', 't'), fn.sum('t', 'h1'))
        nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
                        F.asnumpy(nf.layers[i + 1].data['h']),
                        rtol=1e-4, atol=1e-4)
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # u_mul_v: message is the product of source and destination features.
    nf = create_node_flow(g, num_layers)
    g.ndata['h'] = g.ndata['h1']
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        nf.layers[i].data['h'] = nf.layers[i].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['s']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['s']),
                        rtol=1e-4, atol=1e-4)
363

Da Zheng's avatar
Da Zheng committed
364
def test_flow_compute():
    """Run flow-compute checks over both flow types and both layer-id signs."""
    for negative_ids in (False, True):
        check_flow_compute(create_full_nodeflow, use_negative_block_id=negative_ids)
        check_flow_compute(create_mini_batch, use_negative_block_id=negative_ids)
    check_flow_compute1(create_mini_batch)
    check_flow_compute1(create_mini_batch, use_negative_block_id=True)
    check_flow_compute2(create_mini_batch)
Da Zheng's avatar
Da Zheng committed
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387


def check_prop_flows(create_node_flow):
    """prop_flow over all blocks must equal num_layers rounds of update_all on the parent."""
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    nf2.copy_from_parent()
    # Reference: run the same message passing once per layer on the parent.
    for _ in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})

    # Propagate through every block of the NodeFlow in a single call.
    nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                  lambda nodes: {'h': nodes.data['t'] + 1})
    assert_allclose(F.asnumpy(nf2.layers[-1].data['h']),
                    F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h']),
                    rtol=1e-4, atol=1e-4)
Da Zheng's avatar
Da Zheng committed
391
392
393


def test_prop_flows():
    """Run prop_flow checks for both flow types."""
    for make_flow in (create_full_nodeflow, create_mini_batch):
        check_prop_flows(make_flow)


def test_copy():
    """Test feature copying between a parent graph and a NodeFlow.

    Covers: full copy_from_parent, selective copies via node_embed_names /
    edge_embed_names, copy_to_parent, and UDFs that rely on per-layer field
    names copied from the parent.
    """
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf = create_mini_batch(g, num_layers)
    # Full copy: every parent ndata/edata field must appear in every
    # layer/block, gathered by the parent ids.
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
        for key in g.ndata.keys():
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
        for key in g.edata.keys():
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    # Selective copy: each layer/block receives only the named fields.
    nf = create_mini_batch(g, num_layers)
    node_embed_names = [['h'], ['h1'], ['h']]
    edge_embed_names = [['h2'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=edge_embed_names)
    for i in range(nf.num_layers):
        assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
        for key in node_embed_names[i]:
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
        for key in edge_embed_names[i]:
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    # Copy only layer 0's input field, compute 'h1' on layer 1 and 'h2' on
    # layer 2, then push the per-layer results back to the parent graph.
    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], [], []]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=None)
    for i in range(num_layers):
        # The lambda closes over `i`, but block_compute invokes it within
        # this same iteration, so late binding is not an issue here.
        nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h%d' % (i+1) : nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i+1)]),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)
    nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
    for i in range(num_layers + 1):
        assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
                           F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))

    # UDFs can depend on the per-layer field names copied from the parent.
    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    g.ndata['h1'] = F.clone(g.ndata['h'])
    g.ndata['h2'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], ['h1'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=None)

    def msg_func(edge, ind):
        # Source layer of block `ind` must carry its own copied field.
        assert 'h%d' % ind in edge.src.keys()
        return {'m' : edge.src['h%d' % ind]}
    def reduce_func(node, ind):
        # Destination layer of block `ind` must carry the next layer's field.
        assert 'h%d' % (ind + 1) in node.data.keys()
        return {'h' : F.sum(node.mailbox['m'], 1) + node.data['h%d' % (ind + 1)]}

    for i in range(num_layers):
        nf.block_compute(i, partial(msg_func, ind=i), partial(reduce_func, ind=i))


469
def test_block_edges():
    """Test block_edges against in_edges and the global->local id mapping.

    For each block, block_edges must equal the in-edges of the next layer's
    nodes; remap_local results must match for positive and negative block
    ids and agree with _glb2lcl_nid applied to the global ids.
    """
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        # Block i's edges are exactly the in-edges of layer i+1's nodes.
        dest_nodes = utils.toindex(nf.layer_nid(i + 1))
        src1, dst1, eid1 = nf.in_edges(dest_nodes, 'all')

        src, dst, eid = nf.block_edges(i)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid1))

        src, dst, eid = nf.block_edges(i, remap_local=True)
        # should also work for negative block ids
        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i,
                                                            remap_local=True)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src_by_neg))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst_by_neg))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid_by_neg))

        # Remapped ids must equal the global ids converted layer-locally.
        src1 = nf._glb2lcl_nid(src1, i)
        dst1 = nf._glb2lcl_nid(dst1, i + 1)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
495

496
497
498
499
500
501
502

def test_block_adj_matrix():
    """block_adjacency_matrix must match a dense matrix built from block_edges."""
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for block_id in range(nf.num_blocks):
        u, v, _ = nf.block_edges(block_id, remap_local=True)
        adj, _ = nf.block_adjacency_matrix(block_id, F.cpu())
        adj = F.sparse_to_numpy(adj)

        # should also work for negative block ids
        adj_by_neg, _ = nf.block_adjacency_matrix(block_id - nf.num_blocks, F.cpu())
        adj_by_neg = F.sparse_to_numpy(adj_by_neg)

        # Expected dense adjacency: rows are destinations, columns sources.
        weights = np.ones((len(u)), dtype=np.float32)
        v = utils.toindex(v)
        u = utils.toindex(u)
        expected = sp.sparse.coo_matrix((weights, (v.tonumpy(), u.tonumpy())),
                                        shape=adj.shape).todense()
        assert_array_equal(adj, expected)
        assert_array_equal(adj_by_neg, expected)
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539


def test_block_incidence_matrix():
    """Test block_incidence_matrix for 'in' and 'out' orientations.

    For each block, the incidence matrices (for both positive and negative
    block ids) are compared with dense matrices built from block_edges:
    'in' has a 1 at (dst, edge), 'out' a 1 at (src, edge).

    Fix: the inner comparison loop used `i` as its index, shadowing the
    outer block index `i`; it now uses `j`.  The loop-invariant `typestrs`
    list is also hoisted out of the block loop.
    """
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    typestrs = ["in", "out"] # todo need fix for "both"
    for i in range(nf.num_blocks):
        adjs = []
        for typestr in typestrs:
            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
            adj = F.sparse_to_numpy(adj)
            adjs.append(adj)

        # should work for negative block ids
        adjs_by_neg = []
        for typestr in typestrs:
            adj_by_neg, _ = nf.block_incidence_matrix(-nf.num_blocks + i, typestr, F.cpu())
            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
            adjs_by_neg.append(adj_by_neg)

        u, v, e = nf.block_edges(i, remap_local=True)
        u = utils.toindex(u)
        v = utils.toindex(v)
        e = utils.toindex(e)

        # Expected matrices: 'in' marks (dst, edge); 'out' marks (src, edge).
        expected = []
        data_in_and_out = np.ones((len(u)), dtype=np.float32)
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
                                 shape=adjs[0].shape).todense()
        )
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
                                 shape=adjs[1].shape).todense()
        )
        for j in range(len(typestrs)):
            assert_array_equal(adjs[j], expected[j])
            assert_array_equal(adjs_by_neg[j], expected[j])
558
559


Da Zheng's avatar
Da Zheng committed
560
561
if __name__ == '__main__':
    # Run every test in the same order as before.
    for test_case in (
            test_basic,
            test_block_adj_matrix,
            test_copy,
            test_apply_nodes,
            test_apply_edges,
            test_flow_compute,
            test_prop_flows,
            test_self_loop,
            test_block_edges,
            test_block_incidence_matrix,
    ):
        test_case()