# test_multi_send_recv.py
import numpy as np
import dgl
from dgl.graph import DGLGraph
from collections import defaultdict as ddict
import scipy.sparse as sp
6
import backend as F
7
8
9
10
11
12
13
14
15
16
17
18

D = 5

def message_func(edges):
    """Forward the source-node feature 'h' of each edge as message 'm'.

    Asserts that the batched source feature is 2-D and that its second
    dimension equals the module-level D.
    """
    src_h = edges.src['h']
    shape = src_h.shape
    assert len(shape) == 2
    assert shape[1] == D
    return {'m' : src_h}

def reduce_func(nodes):
    """Sum the mailbox messages 'm' along dimension 1 into field 'accum'.

    Asserts that the mailbox tensor is 3-D and that its last dimension
    equals the module-level D.
    """
    msgs = nodes.mailbox['m']
    assert len(msgs.shape) == 3
    assert msgs.shape[2] == D
    return {'accum' : F.sum(msgs, 1)}
20
21
22
23
24
25
26
27
28
29
30
31

def apply_node_func(nodes):
    """Return a new 'h' equal to the node's 'h' plus its 'accum' field."""
    feats = nodes.data
    updated = feats['h'] + feats['accum']
    return {'h' : updated}

def generate_graph(grad=False):
    """Build the 10-node, 16-edge fixture graph used by the tests.

    Node 0 has an edge to each of nodes 1..8, and each of nodes 1..8 has an
    edge to node 9.  Node feature 'h' and edge feature 'w' are filled with
    random tensors whose second dimension is the module-level D.

    Parameters
    ----------
    grad : bool, optional
        If True, both feature tensors are passed through ``F.attach_grad``
        before being stored on the graph.

    Returns
    -------
    DGLGraph
        The populated graph, with zero initializers set for node and edge
        features.
    """
    g = DGLGraph()
    g.add_nodes(10) # 10 nodes.
    # create a graph where 0 is the source and 9 is the sink
    # 16 edges
    for i in range(1, 9):
        g.add_edge(0, i)
        g.add_edge(i, 9)
    ncol = F.randn((10, D))
    ecol = F.randn((16, D))
    if grad:
        ncol = F.attach_grad(ncol)
        ecol = F.attach_grad(ecol)
    g.set_n_initializer(dgl.init.zero_initializer)
    g.set_e_initializer(dgl.init.zero_initializer)
    g.ndata['h'] = ncol
    g.edata['w'] = ecol
    return g

def test_multi_send():
    """Send three times and check which edges the message index marks.

    The three sends cover edges 0->{1..5} (twice, the second time with a
    single source id) and {1..5}->9.  Afterwards the graph's message index
    must be 1 on exactly those ten edges and 0 elsewhere.
    """
    g = generate_graph()
    def _fmsg(edges):
        # every send in this test involves exactly five edges
        assert edges.src['h'].shape == (5, D)
        return {'m' : edges.src['h']}
    g.register_message_func(_fmsg)
    # many-many send
    u = F.tensor([0, 0, 0, 0, 0])
    v = F.tensor([1, 2, 3, 4, 5])
    g.send((u, v))
    # duplicate send
    u = F.tensor([0])
    v = F.tensor([1, 2, 3, 4, 5])
    g.send((u, v))
    # send more
    u = F.tensor([1, 2, 3, 4, 5])
    v = F.tensor([9])
    g.send((u, v))

    # check if message indicator is as expected
    expected = F.zeros((g.number_of_edges(),), dtype=F.int64)
    eid = g.edge_ids([0, 0, 0, 0, 0, 1, 2, 3, 4, 5],
                     [1, 2, 3, 4, 5, 9, 9, 9, 9, 9])
    expected[eid] = 1
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)
68
69
70
71
72
73
74
75

def test_multi_recv():
    """Compare two send/recv rounds against one send followed by two recvs.

    After every send and recv the graph's message index is compared with
    an expected 0/1 vector: set to 1 on the sent edges, and back to 0 on
    the edges whose destination nodes were received.  Both schedules must
    end with the same node feature 'h'.
    """
    # basic recv test
    g = generate_graph()
    h = g.ndata['h']
    g.register_message_func(message_func)
    g.register_reduce_func(reduce_func)
    g.register_apply_node_func(apply_node_func)
    expected = F.zeros((g.number_of_edges(),), dtype=F.int64)
    # two separate round of send and recv
    u = [4, 5, 6]
    v = [9]
    g.send((u, v))
    eid = g.edge_ids(u, v)
    expected[eid] = 1
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)
    g.recv(v)
    expected[eid] = 0
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)

    u = [0]
    v = [1, 2, 3]
    g.send((u, v))
    eid = g.edge_ids(u, v)
    expected[eid] = 1
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)
    g.recv(v)
    expected[eid] = 0
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)

    h1 = g.ndata['h']

    # one send, two recv
    # restore the feature captured before the first schedule
    g.ndata['h'] = h
    u = F.tensor([0, 0, 0, 4, 5, 6])
    v = F.tensor([1, 2, 3, 9, 9, 9])
    g.send((u, v))
    eid = g.edge_ids(u, v)
    expected[eid] = 1
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)
    u = [4, 5, 6]
    v = [9]
    g.recv(v)
    eid = g.edge_ids(u, v)
    expected[eid] = 0
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)
    u = [0]
    v = [1, 2, 3]
    g.recv(v)
    eid = g.edge_ids(u, v)
    expected[eid] = 0
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)

    h2 = g.ndata['h']
    assert F.allclose(h1, h2)
123
124
125
126
127
128
129
130
131
132
133

def test_multi_recv_0deg():
    """Call recv on a two-node graph (single edge 0 -> 1) and check 'h'.

    Node 0 has no incoming edge.  The node initializer fills with 2 and the
    apply function doubles 'h'; the assertions pin the values of 'h' after
    the first recv on both nodes and after a further recv on each node.
    """
    # test recv with 0deg nodes;
    g = DGLGraph()
    def _message(edges):
        return {'m' : edges.src['h']}
    def _reduce(nodes):
        return {'h' : nodes.data['h'] + nodes.mailbox['m'].sum(1)}
    def _apply(nodes):
        return {'h' : nodes.data['h'] * 2}
    def _init2(shape, dtype, ctx, ids):
        # constant-2 initializer; `ids` is unused
        return 2 + F.zeros(shape, dtype=dtype, ctx=ctx)
    g.register_message_func(_message)
    g.register_reduce_func(_reduce)
    g.register_apply_node_func(_apply)
    g.set_n_initializer(_init2)
    g.add_nodes(2)
    g.add_edge(0, 1)
    # recv both 0deg and non-0deg nodes
    old = F.randn((2, 5))
    g.ndata['h'] = old
    g.send((0, 1))
    g.recv([0, 1])
    new = g.ndata['h']
    # 0deg check: initialized with the func and got applied
    assert F.allclose(new[0], F.full((5,), 4, F.float32))
    # non-0deg check
    assert F.allclose(new[1], F.sum(old, 0) * 2)

    # recv again on zero degree node
    g.recv([0])
    assert F.allclose(g.nodes[0].data['h'], F.full((5,), 8, F.float32))

    # recv again on node with no incoming message
    g.recv([1])
    assert F.allclose(g.nodes[1].data['h'], F.sum(old, 0) * 4)
159
160
161
162
163
164

def test_send_twice_different_shape():
    """Send twice on all edges with message 'h' of two different widths.

    The second message concatenates the source feature 'h' with the edge
    feature 'w' along dim 1.  There are no assertions; the test only
    requires that both sends complete without raising.
    """
    g = generate_graph()
    def _message_1(edges):
        return {'h': edges.src['h']}
    def _message_2(edges):
        return {'h': F.cat((edges.src['h'], edges.data['w']), dim=1)}
    g.send(message_func=_message_1)
    g.send(message_func=_message_2)

def test_send_twice_different_msg():
    """Send two different messages under the same key before one recv.

    Case 1 sends twice on edge 0 -> 1; the asserted result for node 1 is
    the value produced by the second message function.  Case 2 sends on
    two different edges into node 1; the asserted result is the
    element-wise max of the two messages.
    """
    g = DGLGraph()
    g.set_n_initializer(dgl.init.zero_initializer)
    g.add_nodes(3)
    g.add_edge(0, 1)
    g.add_edge(2, 1)
    def _message_a(edges):
        return {'a': edges.src['a']}
    def _message_b(edges):
        return {'a': edges.src['a'] * 3}
    def _reduce(nodes):
        return {'a': F.max(nodes.mailbox['a'], 1)}

    old_repr = F.randn((3, 5))
    g.ndata['a'] = old_repr
    g.send((0, 1), _message_a)
    g.send((0, 1), _message_b)
    g.recv(1, _reduce)
    new_repr = g.ndata['a']
    assert F.allclose(new_repr[1], old_repr[0] * 3)

    # reset the feature before the second case
    g.ndata['a'] = old_repr
    g.send((0, 1), _message_a)
    g.send((2, 1), _message_b)
    g.recv(1, _reduce)
    new_repr = g.ndata['a']
    assert F.allclose(new_repr[1], F.max(F.stack([old_repr[0], old_repr[2] * 3], 0), 0))
196
197
198
199
200
201
202
203
204
205
206

def test_send_twice_different_field():
    """Send fields 'a' and 'b' in two separate sends on the same edge.

    A single recv on node 1 reduces both mailbox fields by summation; the
    assertions require node 1 to end up with node 0's original 'a' and 'b'.
    """
    g = DGLGraph()
    g.set_n_initializer(dgl.init.zero_initializer)
    g.add_nodes(2)
    g.add_edge(0, 1)
    def _message_a(edges):
        return {'a': edges.src['a']}
    def _message_b(edges):
        return {'b': edges.src['b']}
    def _reduce(nodes):
        return {'a': F.sum(nodes.mailbox['a'], 1), 'b': F.sum(nodes.mailbox['b'], 1)}
    old_a = F.randn((2, 5))
    old_b = F.randn((2, 5))
    g.set_n_repr({'a': old_a, 'b': old_b})
    g.send((0, 1), _message_a)
    g.send((0, 1), _message_b)
    g.recv([1], _reduce)
    new_repr = g.get_n_repr()
    assert F.allclose(new_repr['a'][1], old_a[0])
    assert F.allclose(new_repr['b'][1], old_b[0])
217
218
219
220
221
222
223

def test_dynamic_addition():
    """Interleave adding nodes/edges with send and recv calls.

    After the incremental phase the node field 'h' is popped, a full
    send/recv round over the whole graph is run, and the recomputed 'h'
    must match the popped one.
    """
    N = 3
    D = 1  # local value; shadows the module-level D inside this test

    g = DGLGraph()
    def _init(shape, dtype, ctx, ids):
        return F.copy_to(F.astype(F.randn(shape), dtype), ctx)
    g.set_n_initializer(_init)
    g.set_e_initializer(_init)

    def _message(edges):
        return {'m' : edges.src['h1'] + edges.dst['h2'] + edges.data['h1'] +
                edges.data['h2']}
    def _reduce(nodes):
        return {'h' : F.sum(nodes.mailbox['m'], 1)}
    def _apply(nodes):
        return {'h' : nodes.data['h']}

    g.register_message_func(_message)
    g.register_reduce_func(_reduce)
    g.register_apply_node_func(_apply)
    # NOTE(review): these calls replace the `_init` initializers set above
    # before any node or edge has been added.
    g.set_n_initializer(dgl.init.zero_initializer)
    g.set_e_initializer(dgl.init.zero_initializer)

    # add nodes and edges
    g.add_nodes(N)
    g.ndata.update({'h1': F.randn((N, D)),
                    'h2': F.randn((N, D))})
    g.add_nodes(3)
    g.add_edge(0, 1)
    g.add_edge(1, 0)
    g.edata.update({'h1': F.randn((2, D)),
                    'h2': F.randn((2, D))})
    g.send()
    expected = F.ones((g.number_of_edges(),), dtype=F.int64)
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)

    # add more edges
    g.add_edges([0, 2], [2, 0], {'h1': F.randn((2, D))})
    g.send(([0, 2], [2, 0]))
    g.recv(0)

    g.add_edge(1, 2)
    g.edges[4].data['h1'] = F.randn((1, D))
    g.send((1, 2))
    g.recv([1, 2])

    h = g.ndata.pop('h')

    # a complete round of send and recv
    g.send()
    g.recv()
    assert F.allclose(h, g.ndata['h'])
271
272
273
274
275
276
277
278
279

def test_recv_no_send():
    """Call recv without a prior send, then send/recv after ``clear()``.

    The first recv only has to complete without raising.  After the graph
    is cleared and rebuilt with edges 0 -> 1 and 1 -> 2, the message index
    must be 1 only on the second edge after sending on (1, 2), and all
    zeros again after node 2 receives.
    """
    g = generate_graph()
    g.recv(1, reduce_func)
    # test recv after clear
    g.clear()
    g.add_nodes(3)
    g.add_edges([0, 1], [1, 2])
    g.set_n_initializer(dgl.init.zero_initializer)
    g.ndata['h'] = F.randn((3, D))
    g.send((1, 2), message_func)
    expected = F.zeros((2,), dtype=F.int64)
    expected[1] = 1
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)
    g.recv(2, reduce_func)
    expected[1] = 0
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306

def test_send_recv_after_conversion():
    """Run the same send/recv schedule on a graph and two converted copies.

    ``g1`` is loaded from a networkx export of ``g`` and ``g2`` from a
    scipy COO matrix built from ``g``'s edge list.  Both are given some
    unrelated nodes and edges before the conversion call.  After an
    identical send and two recvs on each graph, all three must hold the
    same node feature 'h'.
    """
    # test send and recv after converting from a graph with edges

    g = generate_graph()

    # nx graph
    nxg = g.to_networkx(node_attrs=['h'])
    g1 = DGLGraph()
    # some random node and edges
    g1.add_nodes(4)
    g1.add_edges([1, 2], [2, 3])
    g1.set_n_initializer(dgl.init.zero_initializer)
    g1.from_networkx(nxg, node_attrs=['h'])

    # sparse matrix
    row, col= g.all_edges()
    # the stored values are just 0..num_edges-1
    data = range(len(row))
    n = g.number_of_nodes()
    a = sp.coo_matrix(
            (data, (F.zerocopy_to_numpy(row), F.zerocopy_to_numpy(col))),
            shape=(n, n))
    g2 = DGLGraph()
    # some random node and edges
    g2.add_nodes(5)
    g2.add_edges([1, 2, 4], [2, 3, 0])
    g2.set_n_initializer(dgl.init.zero_initializer)
    g2.from_scipy_sparse_matrix(a)
    g2.ndata['h'] = g.ndata['h']

    # on dgl graph
    g.send(message_func=message_func)
    g.recv([0, 1, 3, 5], reduce_func=reduce_func,
           apply_node_func=apply_node_func)
    g.recv([0, 2, 4, 8], reduce_func=reduce_func,
           apply_node_func=apply_node_func)

    # nx
    g1.send(message_func=message_func)
    g1.recv([0, 1, 3, 5], reduce_func=reduce_func,
            apply_node_func=apply_node_func)
    g1.recv([0, 2, 4, 8], reduce_func=reduce_func,
            apply_node_func=apply_node_func)

    # sparse matrix
    g2.send(message_func=message_func)
    g2.recv([0, 1, 3, 5], reduce_func=reduce_func,
            apply_node_func=apply_node_func)
    g2.recv([0, 2, 4, 8], reduce_func=reduce_func,
            apply_node_func=apply_node_func)

    assert F.allclose(g.ndata['h'], g1.ndata['h'])
    assert F.allclose(g.ndata['h'], g2.ndata['h'])
341
342
343
344
345
346
347
348
349
350
351
352


if __name__ == '__main__':
    # Script entry point: run each test once, in a fixed order.
    for _test in (
        test_multi_send,
        test_multi_recv,
        test_multi_recv_0deg,
        test_dynamic_addition,
        test_send_twice_different_shape,
        test_send_twice_different_msg,
        test_send_twice_different_field,
        test_recv_no_send,
        test_send_recv_after_conversion,
    ):
        _test()