test_specialization.py 13 KB
Newer Older
1
import torch as th
2
3
import dgl
import dgl.function as fn
4
import utils as U
5

Minjie Wang's avatar
Minjie Wang committed
6
7
# Width of the 2-D feature tensors used throughout these tests
# (e.g. node feature 'f2' and edge feature 'w2' have shape (num, D)).
D = 5

8
def generate_graph():
    """Build the shared 10-node test graph with random node/edge features.

    Topology: node 0 fans out to nodes 1..8, each of which feeds node 9, and a
    back edge 9 -> 0 closes the loop, so every node has at least one in-edge.

    Node features:
      * 'f1' -- shape (N,)
      * 'f2' -- shape (N, D)
    Edge features (the same random weights in two layouts):
      * 'e1' -- shape (E,)
      * 'e2' -- shape (E, 1), i.e. 'e1' with a trailing unit axis

    Returns:
        The populated ``dgl.DGLGraph``.
    """
    g = dgl.DGLGraph()
    g.add_nodes(10)
    # create a graph where 0 is the source and 9 is the sink
    for i in range(1, 9):
        g.add_edge(0, i)
        g.add_edge(i, 9)
    # add a back flow from 9 to 0
    g.add_edge(9, 0)
    # Size the features from the graph itself rather than hard-coding
    # 10 nodes / 17 edges, so they stay in sync with the topology above.
    num_nodes = g.number_of_nodes()
    num_edges = g.number_of_edges()
    g.set_n_repr({'f1' : th.randn(num_nodes,), 'f2' : th.randn(num_nodes, D)})
    weights = th.randn(num_edges,)
    g.set_e_repr({'e1': weights, 'e2': th.unsqueeze(weights, 1)})
    return g

def test_v2v_update_all():
    """update_all with builtin functions must agree with hand-written UDFs.

    The check runs once on the 1-D node feature 'f1' and once on the 2-D
    node feature 'f2'.
    """
    def _check(feat):
        def copy_udf(edges):
            # UDF equivalent of fn.copy_src(src=feat, out='m').
            return {'m' : edges.src[feat]}

        def weighted_udf(edges):
            # UDF equivalent of fn.src_mul_edge: 1-D features are scaled by
            # 'e1', anything else by 'e2'.
            src_feat = edges.src[feat]
            if len(src_feat.shape) != 1:
                return {'m' : src_feat * edges.data['e2']}
            return {'m' : src_feat * edges.data['e1']}

        def sum_udf(nodes):
            # UDF equivalent of fn.sum(msg='m', out=feat).
            return {feat : th.sum(nodes.mailbox['m'], 1)}

        def double(nodes):
            return {feat : 2 * nodes.data[feat]}

        graph = generate_graph()

        # Plain copy: builtin pipeline, then the UDF pipeline from the same start.
        start = graph.ndata[feat]
        graph.update_all(fn.copy_src(src=feat, out='m'),
                         fn.sum(msg='m', out=feat), double)
        out_builtin = graph.ndata[feat]
        graph.set_n_repr({feat : start})
        graph.update_all(copy_udf, sum_udf, double)
        out_udf = graph.ndata[feat]
        assert U.allclose(out_builtin, out_udf)

        # Edge-weighted: builtin with 'e1', builtin with 'e2', then the UDF,
        # each run starting from the same node state.
        start = graph.ndata[feat]
        graph.update_all(fn.src_mul_edge(src=feat, edge='e1', out='m'),
                         fn.sum(msg='m', out=feat), double)
        out_e1 = graph.ndata[feat]
        graph.set_n_repr({feat : start})
        graph.update_all(fn.src_mul_edge(src=feat, edge='e2', out='m'),
                         fn.sum(msg='m', out=feat), double)
        out_e2 = graph.ndata[feat]
        graph.set_n_repr({feat : start})
        graph.update_all(weighted_udf, sum_udf, double)
        out_weighted_udf = graph.ndata[feat]

        assert U.allclose(out_e1, out_e2)
        assert U.allclose(out_e2, out_weighted_udf)

    # 1-D node features first, then 2-D node features.
    for feat in ('f1', 'f2'):
        _check(feat)

def test_v2v_snr():
    """send_and_recv with builtin functions must agree with hand-written UDFs.

    Messages travel only along the listed (source, destination) pairs. The
    check runs on the 1-D node feature 'f1' and on the 2-D feature 'f2'.
    """
    src_ids = th.tensor([0, 0, 0, 3, 4, 9])
    dst_ids = th.tensor([1, 2, 3, 9, 9, 0])
    pairs = (src_ids, dst_ids)

    def _check(feat):
        def copy_udf(edges):
            # UDF equivalent of fn.copy_src(src=feat, out='m').
            return {'m' : edges.src[feat]}

        def weighted_udf(edges):
            # UDF equivalent of fn.src_mul_edge: 1-D features are scaled by
            # 'e1', anything else by 'e2'.
            src_feat = edges.src[feat]
            if len(src_feat.shape) != 1:
                return {'m' : src_feat * edges.data['e2']}
            return {'m' : src_feat * edges.data['e1']}

        def sum_udf(nodes):
            # UDF equivalent of fn.sum(msg='m', out=feat).
            return {feat : th.sum(nodes.mailbox['m'], 1)}

        def double(nodes):
            return {feat : 2 * nodes.data[feat]}

        graph = generate_graph()

        # Plain copy along the selected edges: builtin vs. UDF.
        start = graph.ndata[feat]
        graph.send_and_recv(pairs, fn.copy_src(src=feat, out='m'),
                            fn.sum(msg='m', out=feat), double)
        out_builtin = graph.ndata[feat]
        graph.set_n_repr({feat : start})
        graph.send_and_recv(pairs, copy_udf, sum_udf, double)
        out_udf = graph.ndata[feat]
        assert U.allclose(out_builtin, out_udf)

        # Edge-weighted along the selected edges: 'e1' builtin, 'e2' builtin,
        # then the UDF, each from the same starting node state.
        start = graph.ndata[feat]
        graph.send_and_recv(pairs, fn.src_mul_edge(src=feat, edge='e1', out='m'),
                            fn.sum(msg='m', out=feat), double)
        out_e1 = graph.ndata[feat]
        graph.set_n_repr({feat : start})
        graph.send_and_recv(pairs, fn.src_mul_edge(src=feat, edge='e2', out='m'),
                            fn.sum(msg='m', out=feat), double)
        out_e2 = graph.ndata[feat]
        graph.set_n_repr({feat : start})
        graph.send_and_recv(pairs, weighted_udf, sum_udf, double)
        out_weighted_udf = graph.ndata[feat]

        assert U.allclose(out_e1, out_e2)
        assert U.allclose(out_e2, out_weighted_udf)

    # 1-D node features first, then 2-D node features.
    for feat in ('f1', 'f2'):
        _check(feat)

def test_v2v_update_all_multi_fn():
    """update_all with lists of builtins must match single builtins and UDFs.

    Exercises one message function feeding two reducers, two message
    functions feeding three reducers, and finally a UDF pipeline whose output
    is compared against the equivalent builtin result.
    """
    def message_func(edges):
        return {'m2': edges.src['f2']}

    def message_func_edge(edges):
        return {'m2': edges.src['f2'] * edges.data['e2']}

    def reduce_func(nodes):
        # NOTE: writes 'v1' -- results of this UDF must be read from 'v1'.
        return {'v1': th.sum(nodes.mailbox['m2'], 1)}

    g = generate_graph()
    # Zero-initialise the outputs with the (N, D) shape of the 'f2' reductions.
    g.set_n_repr({'v1' : th.zeros((10, D)), 'v2' : th.zeros((10, D))})
    fld = 'f2'

    g.update_all(message_func, reduce_func)
    v1 = g.ndata['v1']

    # 1 message, 2 reduces
    g.update_all(fn.copy_src(src=fld, out='m'), [fn.sum(msg='m', out='v2'), fn.sum(msg='m', out='v3')])
    v2 = g.ndata['v2']
    v3 = g.ndata['v3']
    assert U.allclose(v1, v2)
    assert U.allclose(v1, v3)

    # update all with edge weights, 2 message, 3 reduces
    g.update_all([fn.src_mul_edge(src=fld, edge='e1', out='m1'), fn.src_mul_edge(src=fld, edge='e2', out='m2')],
                 [fn.sum(msg='m1', out='v1'), fn.sum(msg='m2', out='v2'), fn.sum(msg='m1', out='v3')],
                 None)
    v1 = g.ndata['v1']
    v2 = g.ndata['v2']
    v3 = g.ndata['v3']
    assert U.allclose(v1, v2)
    assert U.allclose(v1, v3)

    # run UDF with single message and reduce.
    # reduce_func writes 'v1', so the UDF result must be read back from 'v1'
    # and compared with the builtin 'e2' result held in v2. Reading 'v2' here
    # would only re-check the builtin output and never look at the UDF.
    g.update_all(message_func_edge, reduce_func, None)
    v_udf = g.ndata['v1']
    assert U.allclose(v2, v_udf)

def test_v2v_snr_multi_fn():
    """send_and_recv with lists of builtins must match single builtins and UDFs.

    Messages travel only along the edges (u[i], v[i]). Exercises one message
    feeding two reducers, two messages feeding three reducers, and finally a
    UDF pipeline whose output is compared against the builtin result.
    """
    u = th.tensor([0, 0, 0, 3, 4, 9])
    v = th.tensor([1, 2, 3, 9, 9, 0])

    def message_func(edges):
        return {'m2': edges.src['f2']}

    def message_func_edge(edges):
        return {'m2': edges.src['f2'] * edges.data['e2']}

    def reduce_func(nodes):
        # NOTE: writes 'v1' -- results of this UDF must be read from 'v1'.
        return {'v1' : th.sum(nodes.mailbox['m2'], 1)}

    g = generate_graph()
    # Zero-initialise the outputs with the (N, D) shape of the 'f2' reductions.
    g.set_n_repr({'v1' : th.zeros((10, D)), 'v2' : th.zeros((10, D)),
        'v3' : th.zeros((10, D))})
    fld = 'f2'

    g.send_and_recv((u, v), message_func, reduce_func)
    v1 = g.ndata['v1']

    # 1 message, 2 reduces
    g.send_and_recv((u, v),
            fn.copy_src(src=fld, out='m'),
            [fn.sum(msg='m', out='v2'), fn.sum(msg='m', out='v3')],
            None)
    v2 = g.ndata['v2']
    v3 = g.ndata['v3']
    assert U.allclose(v1, v2)
    assert U.allclose(v1, v3)

    # send and recv with edge weights, 2 message, 3 reduces
    g.send_and_recv((u, v),
                    [fn.src_mul_edge(src=fld, edge='e1', out='m1'), fn.src_mul_edge(src=fld, edge='e2', out='m2')],
                    [fn.sum(msg='m1', out='v1'), fn.sum(msg='m2', out='v2'), fn.sum(msg='m1', out='v3')],
                    None)
    v1 = g.ndata['v1']
    v2 = g.ndata['v2']
    v3 = g.ndata['v3']
    assert U.allclose(v1, v2)
    assert U.allclose(v1, v3)

    # run UDF with single message and reduce.
    # reduce_func writes 'v1', so the UDF result must be read back from 'v1'
    # and compared with the builtin 'e2' result held in v2. Reading 'v2' here
    # would only re-check the builtin output and never look at the UDF.
    g.send_and_recv((u, v), message_func_edge,
            reduce_func, None)
    v_udf = g.ndata['v1']
    assert U.allclose(v2, v_udf)

def test_e2v_update_all_multi_fn():
    """A UDF reducer split into two builtin sums must give the same result.

    The message UDF emits 'm1' (src + dst) and 'm2' (src * dst). The reference
    path sums both in a single UDF reducer. The specialised path reduces them
    separately with fn.sum into 'r1'/'r2' and combines them in the apply
    function. Runs on 'f1' (1-D) and 'f2' (2-D) node features.
    """
    def _test(fld):
        def message_func(edges):
            return {'m1' : edges.src[fld] + edges.dst[fld],
                    'm2' : edges.src[fld] * edges.dst[fld]}

        def reduce_func(nodes):
            return {fld : th.sum(nodes.mailbox['m1'] + nodes.mailbox['m2'], 1)}

        def apply_func(nodes):
            return {fld : 2 * nodes.data[fld]}

        def apply_func_2(nodes):
            return {fld : 2 * nodes.data['r1'] + 2 * nodes.data['r2']}

        g = generate_graph()
        # update all
        v1 = g.ndata[fld]
        # no specialization
        g.update_all(message_func, reduce_func, apply_func)
        v2 = g.ndata[fld]

        # user break reduce func into 2 builtin
        g.set_n_repr({fld : v1})
        g.update_all(message_func,
                     [fn.sum(msg='m1', out='r1'), fn.sum(msg='m2', out='r2')],
                     apply_func_2)
        v3 = g.ndata[fld]

        # Compare with the shared U.allclose helper, as the other tests in
        # this file do, rather than th.allclose with its tight defaults
        # (rtol=1e-5, atol=1e-8): the two paths add the same float32 terms in
        # a different order.
        assert U.allclose(v2, v3)

    # test 1d node features
    _test('f1')
    # test 2d node features
    _test('f2')

def test_e2v_snr_multi_fn():
    """send_and_recv variant: a UDF reducer split into two builtin sums.

    Messages travel only along the edges (u[i], v[i]). The reference path
    sums 'm1' + 'm2' in one UDF reducer. The specialised path reduces them
    with fn.sum into 'r1'/'r2' and combines them in the apply function.
    Runs on 'f1' (1-D) and 'f2' (2-D) node features.
    """
    u = th.tensor([0, 0, 0, 3, 4, 9])
    v = th.tensor([1, 2, 3, 9, 9, 0])
    def _test(fld):
        def message_func(edges):
            return {'m1' : edges.src[fld] + edges.dst[fld],
                    'm2' : edges.src[fld] * edges.dst[fld]}

        def reduce_func(nodes):
            return {fld : th.sum(nodes.mailbox['m1'] + nodes.mailbox['m2'], 1)}

        def apply_func(nodes):
            return {fld : 2 * nodes.data[fld]}

        def apply_func_2(nodes):
            return {fld : 2 * nodes.data['r1'] + 2 * nodes.data['r2']}

        g = generate_graph()
        # send_and_recv
        v1 = g.ndata[fld]
        # no specialization
        g.send_and_recv((u, v), message_func, reduce_func, apply_func)
        v2 = g.ndata[fld]

        # user break reduce func into 2 builtin
        g.set_n_repr({fld : v1})
        g.send_and_recv((u, v), message_func,
                        [fn.sum(msg='m1', out='r1'), fn.sum(msg='m2', out='r2')],
                        apply_func_2)
        v3 = g.ndata[fld]

        # Compare with the shared U.allclose helper, as the other tests in
        # this file do, rather than th.allclose with its tight defaults
        # (rtol=1e-5, atol=1e-8): the two paths add the same float32 terms in
        # a different order.
        assert U.allclose(v2, v3)

    # test 1d node features
    _test('f1')
    # test 2d node features
    _test('f2')

def test_e2v_recv_multi_fn():
    """Separate send/recv variant: a UDF reducer split into two builtin sums.

    Messages are sent along the edges (u[i], v[i]) and received on nodes
    [0, 1, 2, 3, 9] (the destinations in v). The reference path sums
    'm1' + 'm2' in one UDF reducer. The specialised path reduces them with
    fn.sum into 'r1'/'r2'. Runs on 'f1' (1-D) and 'f2' (2-D) node features.
    """
    u = th.tensor([0, 0, 0, 3, 4, 9])
    v = th.tensor([1, 2, 3, 9, 9, 0])
    def _test(fld):
        def message_func(edges):
            return {'m1' : edges.src[fld] + edges.dst[fld],
                    'm2' : edges.src[fld] * edges.dst[fld]}

        def reduce_func(nodes):
            return {fld : th.sum(nodes.mailbox['m1'] + nodes.mailbox['m2'], 1)}

        def apply_func(nodes):
            return {fld : 2 * nodes.data[fld]}

        def apply_func_2(nodes):
            return {fld : 2 * nodes.data['r1'] + 2 * nodes.data['r2']}

        g = generate_graph()
        # recv
        v1 = g.ndata[fld]
        # no specialization
        g.send((u, v), message_func)
        g.recv([0,1,2,3,9], reduce_func, apply_func)
        v2 = g.ndata[fld]

        # user break reduce func into 2 builtin
        g.set_n_repr({fld : v1})
        g.send((u, v), message_func)
        g.recv([0,1,2,3,9],
               [fn.sum(msg='m1', out='r1'), fn.sum(msg='m2', out='r2')],
               apply_func_2)
        v3 = g.ndata[fld]

        # Compare with the shared U.allclose helper, as the other tests in
        # this file do, rather than th.allclose with its tight defaults
        # (rtol=1e-5, atol=1e-8): the two paths add the same float32 terms in
        # a different order.
        assert U.allclose(v2, v3)

    # test 1d node features
    _test('f1')
    # test 2d node features
    _test('f2')

def test_multi_fn_fallback():
    """Builtin update_all results must match UDF ground truth on every path.

    Covers the v2v spmv path, the fallback to e2v (vector edge weight 'w2'),
    the fallback to degree bucketing (max reducer), and lists of builtins
    that mix these paths. Node 0 has no in-edges in this graph.
    """
    # create a graph with zero in degree nodes
    graph = dgl.DGLGraph()
    graph.add_nodes(10)
    for mid in range(1, 9):
        graph.add_edge(0, mid)
        graph.add_edge(mid, 9)
    graph.ndata['h'] = th.randn(10, D)
    graph.edata['w1'] = th.randn(16,)
    graph.edata['w2'] = th.randn(16, D)

    def _msg_h_w1(edges):
        return {'m1' : edges.src['h'] * th.unsqueeze(edges.data['w1'], 1)}

    def _msg_h_w2(edges):
        return {'m2' : edges.src['h'] * edges.data['w2']}

    def _sum_m1(nodes):
        return {'o1' : th.sum(nodes.mailbox['m1'], 1)}

    def _sum_m2(nodes):
        return {'o2' : th.sum(nodes.mailbox['m2'], 1)}

    def _max_m1(nodes):
        return {'o3' : th.max(nodes.mailbox['m1'], 1)[0]}

    def _double_outputs(nodes):
        # Double every output field (names starting with 'o').
        return {key: 2 * val for key, val in nodes.data.items()
                if key.startswith('o')}

    def _reference(mfunc, rfunc, out_name):
        # Run a UDF pipeline and detach its output field from the graph.
        graph.update_all(mfunc, rfunc, _double_outputs)
        return graph.ndata.pop(out_name)

    def _expect(mfuncs, rfuncs, expected):
        # Run builtins, then pop and check each (field, ground truth) pair in order.
        graph.update_all(mfuncs, rfuncs, _double_outputs)
        for out_name, want in expected:
            assert U.allclose(want, graph.ndata.pop(out_name))

    # compute ground truth
    o1 = _reference(_msg_h_w1, _sum_m1, 'o1')
    o2 = _reference(_msg_h_w2, _sum_m2, 'o2')
    o3 = _reference(_msg_h_w1, _max_m1, 'o3')

    # v2v spmv
    _expect(fn.src_mul_edge(src='h', edge='w1', out='m1'),
            fn.sum(msg='m1', out='o1'),
            [('o1', o1)])
    # v2v fallback to e2v
    _expect(fn.src_mul_edge(src='h', edge='w2', out='m2'),
            fn.sum(msg='m2', out='o2'),
            [('o2', o2)])
    # v2v fallback to degree bucketing
    _expect(fn.src_mul_edge(src='h', edge='w1', out='m1'),
            fn.max(msg='m1', out='o3'),
            [('o3', o3)])
    # multi builtins, both v2v spmv (both messages use 'w1', so both match o1)
    _expect([fn.src_mul_edge(src='h', edge='w1', out='m1'),
             fn.src_mul_edge(src='h', edge='w1', out='m2')],
            [fn.sum(msg='m1', out='o1'), fn.sum(msg='m2', out='o2')],
            [('o1', o1), ('o2', o1)])
    # multi builtins, one v2v spmv, one fallback to e2v
    _expect([fn.src_mul_edge(src='h', edge='w1', out='m1'),
             fn.src_mul_edge(src='h', edge='w2', out='m2')],
            [fn.sum(msg='m1', out='o1'), fn.sum(msg='m2', out='o2')],
            [('o1', o1), ('o2', o2)])
    # multi builtins, one v2v spmv, one fallback to e2v, one fallback to degree-bucketing
    _expect([fn.src_mul_edge(src='h', edge='w1', out='m1'),
             fn.src_mul_edge(src='h', edge='w2', out='m2'),
             fn.src_mul_edge(src='h', edge='w1', out='m3')],
            [fn.sum(msg='m1', out='o1'),
             fn.sum(msg='m2', out='o2'),
             fn.max(msg='m3', out='o3')],
            [('o1', o1), ('o2', o2), ('o3', o3)])

if __name__ == '__main__':
    # Run every test in file order when executed as a script.
    for _test_fn in (test_v2v_update_all,
                     test_v2v_snr,
                     test_v2v_update_all_multi_fn,
                     test_v2v_snr_multi_fn,
                     test_e2v_update_all_multi_fn,
                     test_e2v_snr_multi_fn,
                     test_e2v_recv_multi_fn,
                     test_multi_fn_fallback):
        _test_fn()