# test_multi_send_recv.py
import numpy as np
import dgl
from dgl.graph import DGLGraph
from collections import defaultdict as ddict
import scipy.sparse as sp
6
import backend as F
7
8
9
10
11
12
13
14
15
16
17
18

D = 5

def message_func(edges):
    """Message UDF: forward the source-node feature 'h' as message 'm'.

    Asserts that the source feature is 2-dimensional and that its second
    dimension equals the module-level ``D`` before returning it.
    """
    src_feat = edges.src['h']
    feat_shape = src_feat.shape
    assert len(feat_shape) == 2
    assert feat_shape[1] == D
    return {'m': src_feat}

def reduce_func(nodes):
    """Reduce UDF: sum the mailbox messages 'm' along axis 1 into 'accum'.

    Asserts that the mailbox tensor is 3-dimensional and that its last
    dimension equals the module-level ``D``.
    """
    msgs = nodes.mailbox['m']
    assert len(msgs.shape) == 3
    assert msgs.shape[2] == D
    return {'accum' : F.sum(msgs, 1)}
20
21
22
23
24
25
26
27
28
29
30
31

def apply_node_func(nodes):
    """Apply UDF: return a new 'h' equal to the current 'h' plus 'accum'."""
    node_feats = nodes.data
    updated_h = node_feats['h'] + node_feats['accum']
    return {'h': updated_h}

def generate_graph(grad=False):
    """Build the 10-node fixture graph used by the tests in this file.

    Node 0 has an edge to each of nodes 1..8, and each of nodes 1..8 has an
    edge to node 9 (16 edges in total). Random node features 'h' of shape
    (10, D) and random edge features 'w' of shape (16, D) are attached.

    Parameters
    ----------
    grad : bool
        When true, both feature tensors are passed through ``F.attach_grad``
        before being stored on the graph.

    Returns
    -------
    DGLGraph
        The populated graph.
    """
    g = DGLGraph()
    g.add_nodes(10) # 10 nodes.
    # create a graph where 0 is the source and 9 is the sink
    # 16 edges
    for i in range(1, 9):
        g.add_edge(0, i)
        g.add_edge(i, 9)
    ncol = F.randn((10, D))
    ecol = F.randn((16, D))
    if grad:
        ncol = F.attach_grad(ncol)
        ecol = F.attach_grad(ecol)
    g.set_n_initializer(dgl.init.zero_initializer)
    g.set_e_initializer(dgl.init.zero_initializer)
    g.ndata['h'] = ncol
    g.edata['w'] = ecol
    return g

def test_multi_send():
    """Send on overlapping edge sets several times and check the message index.

    After three ``send`` calls the graph's message index is expected to be 1
    exactly on the edges that were sent on and 0 everywhere else.
    """
    g = generate_graph()
    def _fmsg(edges):
        # every send below is expected to present five edges to the UDF
        assert edges.src['h'].shape == (5, D)
        return {'m' : edges.src['h']}
    g.register_message_func(_fmsg)
    # many-many send
    u = F.tensor([0, 0, 0, 0, 0])
    v = F.tensor([1, 2, 3, 4, 5])
    g.send((u, v))
    # duplicate send
    u = F.tensor([0])
    v = F.tensor([1, 2, 3, 4, 5])
    g.send((u, v))
    # send more
    u = F.tensor([1, 2, 3, 4, 5])
    v = F.tensor([9])
    g.send((u, v))

    # check if message indicator is as expected
    expected = F.copy_to(F.zeros((g.number_of_edges(),), dtype=F.int64), F.cpu())
    eid = g.edge_ids([0, 0, 0, 0, 0, 1, 2, 3, 4, 5],
                     [1, 2, 3, 4, 5, 9, 9, 9, 9, 9])
    # switch to numpy so the expected mask can be updated by fancy indexing
    expected = F.asnumpy(expected)
    eid = F.asnumpy(eid)
    expected[eid] = 1
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)
70
71
72
73
74
75
76
77

def test_multi_recv():
    """Check message-index bookkeeping across interleaved send/recv calls.

    Runs two separate send+recv rounds, then resets 'h' and runs one combined
    send followed by two recvs; the node feature 'h' after both variants is
    asserted to be close.
    """
    # basic recv test
    g = generate_graph()
    h = g.ndata['h']
    g.register_message_func(message_func)
    g.register_reduce_func(reduce_func)
    g.register_apply_node_func(apply_node_func)
    expected = F.copy_to(F.zeros((g.number_of_edges(),), dtype=F.int64), F.cpu())
    # two separate round of send and recv
    u = [4, 5, 6]
    v = [9]
    g.send((u, v))
    eid = g.edge_ids(u, v)
    # switch to numpy so the expected mask can be updated by fancy indexing
    expected = F.asnumpy(expected)
    eid = F.asnumpy(eid)
    expected[eid] = 1
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)
    g.recv(v)
    expected[eid] = 0
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)

    u = [0]
    v = [1, 2, 3]
    g.send((u, v))
    eid = g.edge_ids(u, v)
    eid = F.asnumpy(eid)
    expected[eid] = 1
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)
    g.recv(v)
    expected[eid] = 0
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)

    h1 = g.ndata['h']

    # one send, two recv
    g.ndata['h'] = h
    u = F.tensor([0, 0, 0, 4, 5, 6])
    v = F.tensor([1, 2, 3, 9, 9, 9])
    g.send((u, v))
    eid = g.edge_ids(u, v)
    eid = F.asnumpy(eid)
    expected[eid] = 1
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)
    u = [4, 5, 6]
    v = [9]
    g.recv(v)
    eid = g.edge_ids(u, v)
    eid = F.asnumpy(eid)
    expected[eid] = 0
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)
    u = [0]
    v = [1, 2, 3]
    g.recv(v)
    eid = g.edge_ids(u, v)
    eid = F.asnumpy(eid)
    expected[eid] = 0
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)

    h2 = g.ndata['h']
    assert F.allclose(h1, h2)
131
132
133
134
135
136
137

def test_multi_recv_0deg():
    """Check recv on a two-node graph where node 0 has no incoming edge."""
    # test recv with 0deg nodes;
    g = DGLGraph()
    def _message(edges):
        return {'m' : edges.src['h']}
    def _reduce(nodes):
        return {'h' : nodes.data['h'] + F.sum(nodes.mailbox['m'], 1)}
    def _apply(nodes):
        return {'h' : nodes.data['h'] * 2}
    def _init2(shape, dtype, ctx, ids):
        # initializer that fills new features with the constant 2
        return 2 + F.zeros(shape, dtype=dtype, ctx=ctx)
    g.register_message_func(_message)
    g.register_reduce_func(_reduce)
    g.register_apply_node_func(_apply)
    g.set_n_initializer(_init2)
    g.add_nodes(2)
    g.add_edge(0, 1)
    # recv both 0deg and non-0deg nodes
    old = F.randn((2, 5))
    g.ndata['h'] = old
    g.send((0, 1))
    g.recv([0, 1])
    new = g.ndata['h']
    # 0deg check: initialized with the func and got applied
    assert F.allclose(new[0], F.full((5,), 4, F.float32))
    # non-0deg check
    assert F.allclose(new[1], F.sum(old, 0) * 2)

    # recv again on zero degree node
    g.recv([0])
    assert F.allclose(g.nodes[0].data['h'], F.full((5,), 8, F.float32))

    # recv again on node with no incoming message
    g.recv([1])
    assert F.allclose(g.nodes[1].data['h'], F.sum(old, 0) * 4)
167
168
169
170
171
172

def test_send_twice_different_shape():
    """Send twice under the same message key 'h' with differently shaped payloads.

    The second message concatenates the source feature with the edge feature
    along dim 1. The test makes no assertions; it only requires that both
    sends complete without raising.
    """
    g = generate_graph()
    def _message_1(edges):
        return {'h': edges.src['h']}
    def _message_2(edges):
        return {'h': F.cat((edges.src['h'], edges.data['w']), dim=1)}
    g.send(message_func=_message_1)
    g.send(message_func=_message_2)

def test_send_twice_different_msg():
    """Send two different messages under the same key before a single recv.

    First case: both sends target edge (0, 1); the result is asserted to equal
    the second message. Second case: the sends target different edges into
    node 1; the result is asserted to equal the element-wise max of the two.
    """
    g = DGLGraph()
    g.set_n_initializer(dgl.init.zero_initializer)
    g.add_nodes(3)
    g.add_edge(0, 1)
    g.add_edge(2, 1)
    def _message_a(edges):
        return {'a': edges.src['a']}
    def _message_b(edges):
        return {'a': edges.src['a'] * 3}
    def _reduce(nodes):
        return {'a': F.max(nodes.mailbox['a'], 1)}

    old_repr = F.randn((3, 5))
    g.ndata['a'] = old_repr
    g.send((0, 1), _message_a)
    g.send((0, 1), _message_b)
    g.recv(1, _reduce)
    new_repr = g.ndata['a']
    assert F.allclose(new_repr[1], old_repr[0] * 3)

    g.ndata['a'] = old_repr
    g.send((0, 1), _message_a)
    g.send((2, 1), _message_b)
    g.recv(1, _reduce)
    new_repr = g.ndata['a']
    assert F.allclose(new_repr[1], F.max(F.stack([old_repr[0], old_repr[2] * 3], 0), 0))
204
205
206
207
208
209
210
211
212
213
214

def test_send_twice_different_field():
    """Send two messages with different keys on one edge, then reduce both.

    After recv on node 1, fields 'a' and 'b' of node 1 are asserted to equal
    the corresponding original features of node 0.
    """
    g = DGLGraph()
    g.set_n_initializer(dgl.init.zero_initializer)
    g.add_nodes(2)
    g.add_edge(0, 1)
    def _message_a(edges):
        return {'a': edges.src['a']}
    def _message_b(edges):
        return {'b': edges.src['b']}
    def _reduce(nodes):
        return {'a': F.sum(nodes.mailbox['a'], 1), 'b': F.sum(nodes.mailbox['b'], 1)}
    old_a = F.randn((2, 5))
    old_b = F.randn((2, 5))
    g.set_n_repr({'a': old_a, 'b': old_b})
    g.send((0, 1), _message_a)
    g.send((0, 1), _message_b)
    g.recv([1], _reduce)
    new_repr = g.get_n_repr()
    assert F.allclose(new_repr['a'][1], old_a[0])
    assert F.allclose(new_repr['b'][1], old_b[0])
225
226
227
228
229
230
231

def test_dynamic_addition():
    """Interleave node/edge additions with send/recv and compare the results.

    The node feature 'h' produced by the incremental send/recv sequence is
    popped and compared against 'h' produced by one full send + recv round.
    """
    N = 3
    D = 1  # local feature width; shadows the module-level D inside this test

    g = DGLGraph()
    def _init(shape, dtype, ctx, ids):
        return F.copy_to(F.astype(F.randn(shape), dtype), ctx)
    g.set_n_initializer(_init)
    g.set_e_initializer(_init)

    def _message(edges):
        return {'m' : edges.src['h1'] + edges.dst['h2'] + edges.data['h1'] +
                edges.data['h2']}
    def _reduce(nodes):
        return {'h' : F.sum(nodes.mailbox['m'], 1)}
    def _apply(nodes):
        return {'h' : nodes.data['h']}

    g.register_message_func(_message)
    g.register_reduce_func(_reduce)
    g.register_apply_node_func(_apply)
    # NOTE(review): the initializers are set a second time here, presumably
    # overriding the random `_init` installed above -- confirm this is intended.
    g.set_n_initializer(dgl.init.zero_initializer)
    g.set_e_initializer(dgl.init.zero_initializer)

    # add nodes and edges
    g.add_nodes(N)
    g.ndata.update({'h1': F.randn((N, D)),
                    'h2': F.randn((N, D))})
    g.add_nodes(3)
    g.add_edge(0, 1)
    g.add_edge(1, 0)
    g.edata.update({'h1': F.randn((2, D)),
                    'h2': F.randn((2, D))})
    g.send()
    expected = F.copy_to(F.ones((g.number_of_edges(),), dtype=F.int64), F.cpu())
    assert F.array_equal(g._get_msg_index().tousertensor(), expected)

    # add more edges
    g.add_edges([0, 2], [2, 0], {'h1': F.randn((2, D))})
    g.send(([0, 2], [2, 0]))
    g.recv(0)

    g.add_edge(1, 2)
    g.edges[4].data['h1'] = F.randn((1, D))
    g.send((1, 2))
    g.recv([1, 2])

    h = g.ndata.pop('h')

    # a complete round of send and recv
    g.send()
    g.recv()
    assert F.allclose(h, g.ndata['h'])
279
280
281
282
283
284
285
286
287

def test_recv_no_send():
    """Call recv without a preceding send, then check recv after ``clear``.

    After clearing and rebuilding a 3-node, 2-edge graph, a send on edge
    (1, 2) is expected to set index 1 of the message index, and the following
    recv on node 2 is expected to reset it.
    """
    g = generate_graph()
    g.recv(1, reduce_func)
    # test recv after clear
    g.clear()
    g.add_nodes(3)
    g.add_edges([0, 1], [1, 2])
    g.set_n_initializer(dgl.init.zero_initializer)
    g.ndata['h'] = F.randn((3, D))
    g.send((1, 2), message_func)
    expected = F.copy_to(F.zeros(2, dtype=F.int64), F.cpu())
    # switch to numpy so the expected mask can be updated in place
    expected = F.asnumpy(expected)
    expected[1] = 1
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)
    g.recv(2, reduce_func)
    expected[1] = 0
    assert np.array_equal(g._get_msg_index().tonumpy(), expected)
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315

def test_send_recv_after_conversion():
    """Check send/recv give matching results on graphs rebuilt by conversion.

    The fixture graph is converted to a networkx graph and to a scipy COO
    matrix; each is loaded into a DGLGraph that already held unrelated nodes
    and edges. The same send/recv sequence is run on all three graphs and the
    resulting 'h' features are asserted to be close.
    """
    # test send and recv after converting from a graph with edges

    g = generate_graph()

    # nx graph
    nxg = g.to_networkx(node_attrs=['h'])
    g1 = DGLGraph()
    # some random node and edges
    g1.add_nodes(4)
    g1.add_edges([1, 2], [2, 3])
    g1.set_n_initializer(dgl.init.zero_initializer)
    g1.from_networkx(nxg, node_attrs=['h'])

    # sparse matrix
    row, col = g.all_edges()
    data = range(len(row))
    n = g.number_of_nodes()
    a = sp.coo_matrix(
            (data, (F.zerocopy_to_numpy(row), F.zerocopy_to_numpy(col))),
            shape=(n, n))
    g2 = DGLGraph()
    # some random node and edges
    g2.add_nodes(5)
    g2.add_edges([1, 2, 4], [2, 3, 0])
    g2.set_n_initializer(dgl.init.zero_initializer)
    g2.from_scipy_sparse_matrix(a)
    g2.ndata['h'] = g.ndata['h']

    # on dgl graph
    g.send(message_func=message_func)
    g.recv([0, 1, 3, 5], reduce_func=reduce_func,
           apply_node_func=apply_node_func)
    g.recv([0, 2, 4, 8], reduce_func=reduce_func,
           apply_node_func=apply_node_func)

    # nx
    g1.send(message_func=message_func)
    g1.recv([0, 1, 3, 5], reduce_func=reduce_func,
            apply_node_func=apply_node_func)
    g1.recv([0, 2, 4, 8], reduce_func=reduce_func,
            apply_node_func=apply_node_func)

    # sparse matrix
    g2.send(message_func=message_func)
    g2.recv([0, 1, 3, 5], reduce_func=reduce_func,
            apply_node_func=apply_node_func)
    g2.recv([0, 2, 4, 8], reduce_func=reduce_func,
            apply_node_func=apply_node_func)

    assert F.allclose(g.ndata['h'], g1.ndata['h'])
    assert F.allclose(g.ndata['h'], g2.ndata['h'])
350
351
352
353
354
355
356
357
358
359
360
361


if __name__ == '__main__':
    # Script entry point: run every test in this file once, in a fixed order.
    for _run_test in (
        test_multi_send,
        test_multi_recv,
        test_multi_recv_0deg,
        test_dynamic_addition,
        test_send_twice_different_shape,
        test_send_twice_different_msg,
        test_send_twice_different_field,
        test_recv_no_send,
        test_send_recv_after_conversion,
    ):
        _run_test()