test_nodeflow.py 14 KB
Newer Older
Da Zheng's avatar
Da Zheng committed
1
2
3
4
import backend as F
import numpy as np
import scipy as sp
import dgl
5
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
Da Zheng's avatar
Da Zheng committed
6
7
8
from dgl import utils
import dgl.function as fn
from functools import partial
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import itertools


def generate_rand_graph(n, connect_more=False, complete=False):
    if complete:
        cord = [(i,j) for i, j in itertools.product(range(n), range(n)) if i != j]
        row = [t[0] for t in cord]
        col = [t[1] for t in cord]
        data = np.ones((len(row),))
        arr = sp.sparse.coo_matrix((data, (row, col)), shape=(n, n))
    else:
        arr = (sp.sparse.random(n, n, density=0.1, format='coo') != 0).astype(np.int64)
        # having one node to connect to all other nodes.
        if connect_more:
            arr[0] = 1
            arr[:,0] = 1
Da Zheng's avatar
Da Zheng committed
25
26
27
28
29
30
    g = dgl.DGLGraph(arr, readonly=True)
    g.ndata['h1'] = F.randn((g.number_of_nodes(), 10))
    g.edata['h2'] = F.randn((g.number_of_edges(), 3))
    return g


31
32
33
34
35
36
37
def test_self_loop():
    n = 100
    num_hops = 2
    g = generate_rand_graph(n, complete=True)
    nf = create_mini_batch(g, num_hops, add_self_loop=True)
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
38
        deg = F.copy_to(F.ones(in_deg.shape, dtype=F.int64), F.cpu()) * n
39
40
41
        assert F.array_equal(in_deg, deg)

def create_mini_batch(g, num_hops, add_self_loop=False):
Da Zheng's avatar
Da Zheng committed
42
    seed_ids = np.array([0, 1, 2, 3])
43
44
45
46
47
    sampler = NeighborSampler(g, batch_size=4, expand_factor=g.number_of_nodes(),
            num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop)
    nfs = list(sampler)
    assert len(nfs) == 1
    return nfs[0]
Da Zheng's avatar
Da Zheng committed
48
49
50
51
52
53
54
55
56
57
58
59

def check_basic(g, nf):
    num_nodes = 0
    for i in range(nf.num_layers):
        num_nodes += nf.layer_size(i)
    assert nf.number_of_nodes() == num_nodes
    num_edges = 0
    for i in range(nf.num_blocks):
        num_edges += nf.block_size(i)
    assert nf.number_of_edges() == num_edges

    deg = nf.layer_in_degree(0)
60
    assert F.array_equal(deg, F.copy_to(F.zeros((nf.layer_size(0)), F.int64), F.cpu()))
Da Zheng's avatar
Da Zheng committed
61
    deg = nf.layer_out_degree(-1)
62
    assert F.array_equal(deg, F.copy_to(F.zeros((nf.layer_size(-1)), F.int64), F.cpu()))
Da Zheng's avatar
Da Zheng committed
63
64
65
66
67
68
69
70
71
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))


def test_basic():
    num_layers = 2
    g = generate_rand_graph(100, connect_more=True)
72
    nf = create_full_nodeflow(g, num_layers)
Da Zheng's avatar
Da Zheng committed
73
74
75
76
77
78
79
    assert nf.number_of_nodes() == g.number_of_nodes() * (num_layers + 1)
    assert nf.number_of_edges() == g.number_of_edges() * num_layers
    assert nf.num_layers == num_layers + 1
    assert nf.layer_size(0) == g.number_of_nodes()
    assert nf.layer_size(1) == g.number_of_nodes()
    check_basic(g, nf)

80
    parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
81
    nids = nf.map_from_parent_nid(0, parent_nids)
Da Zheng's avatar
Da Zheng committed
82
83
    assert F.array_equal(nids, parent_nids)

84
85
86
87
88
89
    # should also work for negative layer ids
    for l in range(-1, -num_layers, -1):
        nids1 = nf.map_from_parent_nid(l, parent_nids)
        nids2 = nf.map_from_parent_nid(l + num_layers, parent_nids)
        assert F.array_equal(nids1, nids2)

Da Zheng's avatar
Da Zheng committed
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)


def check_apply_nodes(create_node_flow):
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.layer_size(i), 5))
        def update_func(nodes):
            return {'h1' : new_feats}
        nf.apply_layer(i, update_func)
        assert F.array_equal(nf.layers[i].data['h1'], new_feats)

        new_feats = F.randn((4, 5))
        def update_func1(nodes):
            return {'h1' : new_feats}
        nf.apply_layer(i, update_func1, v=nf.layer_nid(i)[0:4])
        assert F.array_equal(nf.layers[i].data['h1'][0:4], new_feats)


def test_apply_nodes():
116
    check_apply_nodes(create_full_nodeflow)
Da Zheng's avatar
Da Zheng committed
117
118
119
120
121
122
123
    check_apply_nodes(create_mini_batch)


def check_apply_edges(create_node_flow):
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
124
        g.ndata["f"] = F.randn((100, 10))
Da Zheng's avatar
Da Zheng committed
125
126
127
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))
128
129
130
131

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}

Da Zheng's avatar
Da Zheng committed
132
133
134
        nf.apply_block(i, update_func)
        assert F.array_equal(nf.blocks[i].data['h2'], new_feats)

135
136
137
138
        # should also work for negative block ids
        nf.apply_block(-num_layers + i, update_func)
        assert F.array_equal(nf.blocks[i].data['h2'], new_feats)

139
140
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
141
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
142
143
        assert F.array_equal(nf.blocks[i].data['f2'], expected_f_sum)

Da Zheng's avatar
Da Zheng committed
144
145

def test_apply_edges():
146
    check_apply_edges(create_full_nodeflow)
Da Zheng's avatar
Da Zheng committed
147
148
149
    check_apply_edges(create_mini_batch)


150
def check_flow_compute(create_node_flow, use_negative_block_id=False):
Da Zheng's avatar
Da Zheng committed
151
152
153
154
155
156
157
158
    num_layers = 2
    g = generate_rand_graph(100)
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    # Test the computation on a layer at a time.
    for i in range(num_layers):
159
160
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
Da Zheng's avatar
Da Zheng committed
161
162
163
                         lambda nodes: {'h' : nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
164
        assert F.allclose(nf.layers[i + 1].data['h'], g.nodes[nf.layer_parent_nid(i + 1)].data['h'])
Da Zheng's avatar
Da Zheng committed
165
166
167
168

    # Test the computation when only a few nodes are active in a layer.
    g.ndata['h'] = g.ndata['h1']
    for i in range(num_layers):
169
        l = -num_layers + i if use_negative_block_id else i
Da Zheng's avatar
Da Zheng committed
170
        vs = nf.layer_nid(i+1)[0:4]
171
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
Da Zheng's avatar
Da Zheng committed
172
173
174
175
                        lambda nodes: {'h' : nodes.data['t'] + 1}, v=vs)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
        data1 = nf.layers[i + 1].data['h'][0:4]
176
177
        data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
        assert F.allclose(data1, data2)
Da Zheng's avatar
Da Zheng committed
178
179
180


def test_flow_compute():
181
    check_flow_compute(create_full_nodeflow)
Da Zheng's avatar
Da Zheng committed
182
    check_flow_compute(create_mini_batch)
183
184
    check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
    check_flow_compute(create_mini_batch, use_negative_block_id=True)
Da Zheng's avatar
Da Zheng committed
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200


def check_prop_flows(create_node_flow):
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    nf2.copy_from_parent()
    # Test the computation on a layer at a time.
    for i in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})

    # Test the computation on all layers.
    nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                  lambda nodes: {'h' : nodes.data['t'] + 1})
201
    assert F.allclose(nf2.layers[-1].data['h'], g.nodes[nf2.layer_parent_nid(-1)].data['h'])
Da Zheng's avatar
Da Zheng committed
202
203
204


def test_prop_flows():
205
    check_prop_flows(create_full_nodeflow)
Da Zheng's avatar
Da Zheng committed
206
207
208
209
210
211
212
213
214
215
216
217
218
    check_prop_flows(create_mini_batch)


def test_copy():
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf = create_mini_batch(g, num_layers)
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
        for key in g.ndata.keys():
            assert key in nf.layers[i].data.keys()
219
            assert F.array_equal(nf.layers[i].data[key], g.nodes[nf.layer_parent_nid(i)].data[key])
Da Zheng's avatar
Da Zheng committed
220
221
222
223
    for i in range(nf.num_blocks):
        assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
        for key in g.edata.keys():
            assert key in nf.blocks[i].data.keys()
224
            assert F.array_equal(nf.blocks[i].data[key], g.edges[nf.block_parent_eid(i)].data[key])
Da Zheng's avatar
Da Zheng committed
225
226
227
228
229
230
231
232
233

    nf = create_mini_batch(g, num_layers)
    node_embed_names = [['h'], ['h1'], ['h']]
    edge_embed_names = [['h2'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=edge_embed_names)
    for i in range(nf.num_layers):
        assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
        for key in node_embed_names[i]:
            assert key in nf.layers[i].data.keys()
234
            assert F.array_equal(nf.layers[i].data[key], g.nodes[nf.layer_parent_nid(i)].data[key])
Da Zheng's avatar
Da Zheng committed
235
236
237
238
    for i in range(nf.num_blocks):
        assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
        for key in edge_embed_names[i]:
            assert key in nf.blocks[i].data.keys()
239
            assert F.array_equal(nf.blocks[i].data[key], g.edges[nf.block_parent_eid(i)].data[key])
Da Zheng's avatar
Da Zheng committed
240
241
242
243
244
245
246
247
248
249

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], [], []]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=None)
    for i in range(num_layers):
        nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h%d' % (i+1) : nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h' : nodes.data['t'] + 1})
250
251
        assert F.allclose(nf.layers[i + 1].data['h%d' % (i+1)],
                          g.nodes[nf.layer_parent_nid(i + 1)].data['h'])
Da Zheng's avatar
Da Zheng committed
252
253
254
    nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
    for i in range(num_layers + 1):
        assert F.array_equal(nf.layers[i].data['h%d' % i],
255
                             g.nodes[nf.layer_parent_nid(i)].data['h%d' % i])
Da Zheng's avatar
Da Zheng committed
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    g.ndata['h1'] = F.clone(g.ndata['h'])
    g.ndata['h2'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], ['h1'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names, edge_embed_names=None)

    def msg_func(edge, ind):
        assert 'h%d' % ind in edge.src.keys()
        return {'m' : edge.src['h%d' % ind]}
    def reduce_func(node, ind):
        assert 'h%d' % (ind + 1) in node.data.keys()
        return {'h' : F.sum(node.mailbox['m'], 1) + node.data['h%d' % (ind + 1)]}

    for i in range(num_layers):
        nf.block_compute(i, partial(msg_func, ind=i), partial(reduce_func, ind=i))


275
def test_block_edges():
276
277
278
279
280
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
281
        src, dst, eid = nf.block_edges(i, remap=True)
282
283

        # should also work for negative block ids
284
        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i, remap=True)
285
286
287
288
        assert F.array_equal(src, src_by_neg)
        assert F.array_equal(dst, dst_by_neg)
        assert F.array_equal(eid, eid_by_neg)

289
290
291
292
293
294
295
        dest_nodes = utils.toindex(nf.layer_nid(i + 1))
        u, v, _ = nf._graph.in_edges(dest_nodes)
        u = nf._glb2lcl_nid(u.tousertensor(), i)
        v = nf._glb2lcl_nid(v.tousertensor(), i + 1)
        assert F.array_equal(src, u)
        assert F.array_equal(dst, v)

296
297
298
299
300
301
302

def test_block_adj_matrix():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
303
        u, v, _ = nf.block_edges(i, remap=True)
304
305
        adj, _ = nf.block_adjacency_matrix(i, F.cpu())
        adj = F.sparse_to_numpy(adj)
306
307
308
309
310

        # should also work for negative block ids
        adj_by_neg, _ = nf.block_adjacency_matrix(-nf.num_blocks + i, F.cpu())
        adj_by_neg = F.sparse_to_numpy(adj_by_neg)

311
312
313
314
315
316
        data = np.ones((len(u)), dtype=np.float32)
        v = utils.toindex(v)
        u = utils.toindex(u)
        coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
                                   shape=adj.shape).todense()
        assert np.array_equal(adj, coo)
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
        assert np.array_equal(adj_by_neg, coo)


def test_block_incidence_matrix():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        typestrs = ["in", "out"] # todo need fix for "both"
        adjs = []
        for typestr in typestrs:
            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
            adj = F.sparse_to_numpy(adj)
            adjs.append(adj)

        # should work for negative block ids
        adjs_by_neg = []
        for typestr in typestrs:
            adj_by_neg, _ = nf.block_incidence_matrix(-nf.num_blocks + i, typestr, F.cpu())
            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
            adjs_by_neg.append(adj_by_neg)

340
        u, v, e = nf.block_edges(i, remap=True)
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
        u = utils.toindex(u)
        v = utils.toindex(v)
        e = utils.toindex(e)

        expected = []
        data_in_and_out = np.ones((len(u)), dtype=np.float32)
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
                                 shape=adjs[0].shape).todense()
        )
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
                                 shape=adjs[1].shape).todense()
        )
        for i in range(len(typestrs)):
            assert np.array_equal(adjs[i], expected[i])
            assert np.array_equal(adjs_by_neg[i], expected[i])
358
359


Da Zheng's avatar
Da Zheng committed
360
361
if __name__ == '__main__':
    test_basic()
362
    test_block_adj_matrix()
Da Zheng's avatar
Da Zheng committed
363
364
365
366
367
    test_copy()
    test_apply_nodes()
    test_apply_edges()
    test_flow_compute()
    test_prop_flows()
368
    test_self_loop()
369
    test_block_edges()
370
    test_block_incidence_matrix()