import backend as F
import numpy as np
from numpy.testing import assert_array_equal, assert_allclose
import scipy as sp
import scipy.sparse  # ensure sp.sparse is loaded; older SciPy does not auto-import submodules
import operator
import dgl
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
from dgl import utils
import dgl.function as fn
from functools import partial
import itertools

def generate_rand_graph(n, connect_more=False, complete=False):
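    """Build a readonly DGLGraph (random, optionally complete) with random node/edge features."""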
    if complete:
        cord = [(i,j) for i, j in itertools.product(range(n), range(n)) if i != j]
        row = [t[0] for t in cord]
        col = [t[1] for t in cord]
        data = np.ones((len(row),))
        arr = sp.sparse.coo_matrix((data, (row, col)), shape=(n, n))
    else:
        arr = (sp.sparse.random(n, n, density=0.1, format='coo') != 0).astype(np.int64)
        # Optionally connect one node to and from all other nodes.
        if connect_more:
            arr[0] = 1
            arr[:,0] = 1
    g = dgl.DGLGraph(arr, readonly=True)
    g.ndata['h1'] = F.randn((g.number_of_nodes(), 10))
    g.edata['h2'] = F.randn((g.number_of_edges(), 3))
    return g


def test_self_loop():
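    """With add_self_loop on a complete graph, every non-input layer node has in-degree n."""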
    n = 100
    num_hops = 2
    g = generate_rand_graph(n, complete=True)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        deg = F.copy_to(F.ones(in_deg.shape, dtype=F.int64), F.cpu()) * n
        assert_array_equal(F.asnumpy(in_deg), F.asnumpy(deg))

def create_mini_batch(g, num_hops, add_self_loop=False):
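    """Sample a single NodeFlow whose seed (last) layer consists of the given seed nodes."""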
    seed_ids = np.array([1, 2, 0, 3])
    sampler = NeighborSampler(g, batch_size=4, expand_factor=g.number_of_nodes(),
            num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop)
    nfs = list(sampler)
    assert len(nfs) == 1
    assert_array_equal(F.asnumpy(nfs[0].layer_parent_nid(-1)), seed_ids)
    return nfs[0]

def check_basic(g, nf):
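    """Check NodeFlow invariants: sizes, node/edge membership, degrees, and parent mappings."""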
    num_nodes = 0
    for i in range(nf.num_layers):
        num_nodes += nf.layer_size(i)
    assert nf.number_of_nodes() == num_nodes
    num_edges = 0
    for i in range(nf.num_blocks):
        num_edges += nf.block_size(i)
    assert nf.number_of_edges() == num_edges
    assert len(nf) == num_nodes
    assert nf.is_readonly
    assert not nf.is_multigraph

    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes)))))
    for i in range(num_nodes):
        assert nf.has_node(i)
    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes, 2 * num_nodes)))) == 0)
    for i in range(num_nodes, 2 * num_nodes):
        assert not nf.has_node(i)

    for block_id in range(nf.num_blocks):
        u, v, eid = nf.block_edges(block_id)
        assert np.all(F.asnumpy(nf.has_edges_between(u, v)))

    deg = nf.layer_in_degree(0)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(0)), np.int64))
    deg = nf.layer_out_degree(-1)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(-1)), np.int64))

    nf.copy_from_parent()
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))

        nids = nf.layer_nid(i)
        parent_nids = nf.map_to_parent_nid(nids)
        nids1 = nf.map_from_parent_nid(i, parent_nids)
        assert_array_equal(F.asnumpy(nids), F.asnumpy(nids1))

        data = nf.layers[i].data['h1']
        data1 = g.nodes[nf.layer_parent_nid(i)].data['h1']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    for i in range(nf.num_blocks):
        data = nf.blocks[i].data['h2']
        data1 = g.edges[nf.block_parent_eid(i)].data['h2']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))


    # Negative layer IDs should behave like their positive counterparts.
    for i in range(-1, -nf.num_layers, -1):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))


def test_basic():
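    """Run the structural checks on a full NodeFlow and on a sampled mini-batch."""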
    num_layers = 2
    g = generate_rand_graph(100, connect_more=True)
    nf = create_full_nodeflow(g, num_layers)
    assert nf.number_of_nodes() == g.number_of_nodes() * (num_layers + 1)
    assert nf.number_of_edges() == g.number_of_edges() * num_layers
    assert nf.num_layers == num_layers + 1
    assert nf.layer_size(0) == g.number_of_nodes()
    assert nf.layer_size(1) == g.number_of_nodes()
    check_basic(g, nf)

    parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
    nids = nf.map_from_parent_nid(0, parent_nids, remap_local=True)
    assert_array_equal(F.asnumpy(nids), F.asnumpy(parent_nids))

    # should also work for negative layer ids
    for l in range(-1, -num_layers, -1):
        nids1 = nf.map_from_parent_nid(l, parent_nids, remap_local=True)
        nids2 = nf.map_from_parent_nid(l + num_layers, parent_nids, remap_local=True)
        assert_array_equal(F.asnumpy(nids1), F.asnumpy(nids2))

    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)


def check_apply_nodes(create_node_flow, use_negative_block_id):
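    """apply_layer should overwrite node features, for a whole layer or for a subset of nodes."""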
    num_layers = 2
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        g = generate_rand_graph(100)
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.layer_size(l), 5))
        def update_func(nodes):
            return {'h1' : new_feats}
        nf.apply_layer(l, update_func)
        assert_array_equal(F.asnumpy(nf.layers[l].data['h1']), F.asnumpy(new_feats))

        new_feats = F.randn((4, 5))
        def update_func1(nodes):
            return {'h1' : new_feats}
        nf.apply_layer(l, update_func1, v=nf.layer_nid(l)[0:4])
        assert_array_equal(F.asnumpy(nf.layers[l].data['h1'][0:4]), F.asnumpy(new_feats))


def test_apply_nodes():
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=False)
    check_apply_nodes(create_mini_batch, use_negative_block_id=False)
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=True)
    check_apply_nodes(create_mini_batch, use_negative_block_id=True)


def check_apply_edges(create_node_flow):
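    """apply_block should overwrite edge features and compute new ones from both endpoints' data."""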
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.apply_block(i, update_func)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.apply_block(-num_layers + i, update_func)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))


def check_apply_edges1(create_node_flow):
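    """Like check_apply_edges, but registers the edge UDF first and then calls apply_block."""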
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

        nf.register_apply_edge_func(update_func, i)
        nf.apply_block(i)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.register_apply_edge_func(update_func, -num_layers + i)
        nf.apply_block(-num_layers + i)
        assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))


def test_apply_edges():
    check_apply_edges(create_full_nodeflow)
    check_apply_edges(create_mini_batch)
    check_apply_edges1(create_mini_batch)


def check_flow_compute(create_node_flow, use_negative_block_id=False):
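    """block_compute on each block should match update_all on the parent graph."""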
    num_layers = 2
    g = generate_rand_graph(100)
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    # Test the computation one layer at a time.
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h' : nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the computation when only a few nodes are active in a layer.
    g.ndata['h'] = g.ndata['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        vs = nf.layer_nid(i+1)[0:4]
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                        lambda nodes: {'h' : nodes.data['t'] + 1}, v=vs)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        data1 = nf.layers[i + 1].data['h'][0:4]
        data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
        assert_allclose(F.asnumpy(data1), F.asnumpy(data2), rtol=1e-4, atol=1e-4)

def check_flow_compute1(create_node_flow, use_negative_block_id=False):
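    """Same as check_flow_compute, but with UDFs registered per block or for all blocks."""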
    num_layers = 2
    g = generate_rand_graph(100)

    # Test the case where UDFs are registered per block.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.register_message_func(fn.copy_src(src='h', out='m'), l)
        nf.register_reduce_func(fn.sum(msg='m', out='t'), l)
        nf.register_apply_node_func(lambda nodes: {'h' : nodes.data['t'] + 1}, l)
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the case where UDFs are registered for all blocks at once.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    nf.register_message_func(fn.copy_src(src='h', out='m'))
    nf.register_reduce_func(fn.sum(msg='m', out='t'))
    nf.register_apply_node_func(lambda nodes: {'h' : nodes.data['t'] + 1})
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

class SrcMulEdgeMessageFunction(object):
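    """Message UDF that multiplies a source-node field by an edge field, broadcasting ranks."""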
    def __init__(self, src_field, edge_field, out_field):
        self.mul_op = operator.mul
        self.src_field = src_field
        self.edge_field = edge_field
        self.out_field = out_field

    def __call__(self, edges):
        sdata = edges.src[self.src_field]
        edata = edges.data[self.edge_field]
        # Due to the different broadcasting semantics of different backends,
        # we need to broadcast the sdata and edata to be of the same rank.
        rank = max(F.ndim(sdata), F.ndim(edata))
        sshape = F.shape(sdata)
        eshape = F.shape(edata)
        sdata = F.reshape(sdata, sshape + (1,) * (rank - F.ndim(sdata)))
        edata = F.reshape(edata, eshape + (1,) * (rank - F.ndim(edata)))
        ret = self.mul_op(sdata, edata)
        return {self.out_field : ret}

def check_flow_compute2(create_node_flow):
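    """Check that UDF and builtin messages (src_mul_edge, u_mul_v) agree with update_all."""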
    num_layers = 2
    g = generate_rand_graph(100)
    g.edata['h'] = F.ones((g.number_of_edges(), 10))

    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, SrcMulEdgeMessageFunction('h', 'h', 't'), fn.sum('t', 'h1'))
        nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
                        F.asnumpy(nf.layers[i + 1].data['h']),
                        rtol=1e-4, atol=1e-4)
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    nf = create_node_flow(g, num_layers)
    g.ndata['h'] = g.ndata['h1']
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        nf.layers[i].data['h'] = nf.layers[i].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['s']),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['s']),
                        rtol=1e-4, atol=1e-4)

def test_flow_compute():
    check_flow_compute(create_full_nodeflow)
    check_flow_compute(create_mini_batch)
    check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
    check_flow_compute(create_mini_batch, use_negative_block_id=True)
    check_flow_compute1(create_mini_batch)
    check_flow_compute1(create_mini_batch, use_negative_block_id=True)
    check_flow_compute2(create_mini_batch)


def check_prop_flows(create_node_flow):
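    """prop_flow over all blocks should match repeated update_all on the parent graph."""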
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    nf2.copy_from_parent()
    # Compute the expected result on the parent graph, one update_all per layer.
    for i in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})

    # Test the computation on all layers at once.
    nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                  lambda nodes: {'h' : nodes.data['t'] + 1})
    assert_allclose(F.asnumpy(nf2.layers[-1].data['h']),
                    F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h']),
                    rtol=1e-4, atol=1e-4)


def test_prop_flows():
    check_prop_flows(create_full_nodeflow)
    check_prop_flows(create_mini_batch)


def test_copy():
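    """Check copy_from_parent/copy_to_parent, with and without explicit embedding names."""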
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf = create_mini_batch(g, num_layers)
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
        for key in g.ndata.keys():
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
        for key in g.edata.keys():
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    node_embed_names = [['h'], ['h1'], ['h']]
    edge_embed_names = [['h2'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=edge_embed_names)
    for i in range(nf.num_layers):
        assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
        for key in node_embed_names[i]:
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
        for key in edge_embed_names[i]:
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], [], []]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=None)
    for i in range(num_layers):
        nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h%d' % (i+1) : nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i+1)]),
                        F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)
    nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
    for i in range(num_layers + 1):
        assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
                           F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    g.ndata['h1'] = F.clone(g.ndata['h'])
    g.ndata['h2'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], ['h1'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=None)

    def msg_func(edge, ind):
        assert 'h%d' % ind in edge.src.keys()
        return {'m' : edge.src['h%d' % ind]}
    def reduce_func(node, ind):
        assert 'h%d' % (ind + 1) in node.data.keys()
        return {'h' : F.sum(node.mailbox['m'], 1) + node.data['h%d' % (ind + 1)]}

    for i in range(num_layers):
        nf.block_compute(i, partial(msg_func, ind=i), partial(reduce_func, ind=i))


def test_block_edges():
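    """block_edges should agree with in_edges, local remapping, and negative block IDs."""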
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        dest_nodes = utils.toindex(nf.layer_nid(i + 1))
        src1, dst1, eid1 = nf.in_edges(dest_nodes, 'all')

        src, dst, eid = nf.block_edges(i)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid1))

        src, dst, eid = nf.block_edges(i, remap_local=True)
        # should also work for negative block ids
        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i,
                                                            remap_local=True)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src_by_neg))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst_by_neg))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid_by_neg))

        src1 = nf._glb2lcl_nid(src1, i)
        dst1 = nf._glb2lcl_nid(dst1, i + 1)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))


def test_block_adj_matrix():
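    """block_adjacency_matrix should match a dense matrix built from block_edges."""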
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        u, v, _ = nf.block_edges(i, remap_local=True)
        adj, _ = nf.block_adjacency_matrix(i, F.cpu())
        adj = F.sparse_to_numpy(adj)

        # should also work for negative block ids
        adj_by_neg, _ = nf.block_adjacency_matrix(-nf.num_blocks + i, F.cpu())
        adj_by_neg = F.sparse_to_numpy(adj_by_neg)

        data = np.ones((len(u)), dtype=np.float32)
        v = utils.toindex(v)
        u = utils.toindex(u)
        coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
                                   shape=adj.shape).todense()
        assert_array_equal(adj, coo)
        assert_array_equal(adj_by_neg, coo)


def test_block_incidence_matrix():
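    """block_incidence_matrix ("in"/"out") should match dense matrices built from block_edges."""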
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        typestrs = ["in", "out"] # todo need fix for "both"
        adjs = []
        for typestr in typestrs:
            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
            adj = F.sparse_to_numpy(adj)
            adjs.append(adj)

        # should work for negative block ids
        adjs_by_neg = []
        for typestr in typestrs:
            adj_by_neg, _ = nf.block_incidence_matrix(-nf.num_blocks + i, typestr, F.cpu())
            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
            adjs_by_neg.append(adj_by_neg)

        u, v, e = nf.block_edges(i, remap_local=True)
        u = utils.toindex(u)
        v = utils.toindex(v)
        e = utils.toindex(e)

        expected = []
        data_in_and_out = np.ones((len(u)), dtype=np.float32)
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
                                 shape=adjs[0].shape).todense()
        )
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
                                 shape=adjs[1].shape).todense()
        )
        for j in range(len(typestrs)):
            assert_array_equal(adjs[j], expected[j])
            assert_array_equal(adjs_by_neg[j], expected[j])


if __name__ == '__main__':
    test_basic()
    test_block_adj_matrix()
    test_copy()
    test_apply_nodes()
    test_apply_edges()
    test_flow_compute()
    test_prop_flows()
    test_self_loop()
    test_block_edges()
    test_block_incidence_matrix()