test_specialization.py 7.41 KB
Newer Older
1
import torch as th
Minjie Wang's avatar
Minjie Wang committed
2
import numpy as np
3
4
import dgl
import dgl.function as fn
5

Minjie Wang's avatar
Minjie Wang committed
6
7
D = 5

8
def generate_graph():
9
    g = dgl.DGLGraph()
Minjie Wang's avatar
Minjie Wang committed
10
    g.add_nodes(10)
11
12
13
14
15
16
    # create a graph where 0 is the source and 9 is the sink
    for i in range(1, 9):
        g.add_edge(0, i)
        g.add_edge(i, 9)
    # add a back flow from 9 to 0
    g.add_edge(9, 0)
17
18
19
    g.set_n_repr({'f1' : th.randn(10,), 'f2' : th.randn(10, D)})
    weights = th.randn(17,)
    g.set_e_repr({'e1': weights, 'e2': th.unsqueeze(weights, 1)})
20
21
    return g

22
23
24
def test_update_all():
    def _test(fld):
        def message_func(hu, edge):
Minjie Wang's avatar
Minjie Wang committed
25
            return {'m' : hu[fld]}
26
27
28

        def message_func_edge(hu, edge):
            if len(hu[fld].shape) == 1:
Minjie Wang's avatar
Minjie Wang committed
29
                return {'m' : hu[fld] * edge['e1']}
30
            else:
Minjie Wang's avatar
Minjie Wang committed
31
                return {'m' : hu[fld] * edge['e2']}
32
33

        def reduce_func(hv, msgs):
Minjie Wang's avatar
Minjie Wang committed
34
            return {fld : th.sum(msgs['m'], 1)}
35
36
37
38
39
40

        def apply_func(hu):
            return {fld : 2 * hu[fld]}
        g = generate_graph()
        # update all
        v1 = g.get_n_repr()[fld]
Minjie Wang's avatar
Minjie Wang committed
41
        g.update_all(fn.copy_src(src=fld, out='m'), fn.sum(msg='m', out=fld), apply_func)
42
43
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
44
        g.update_all(message_func, reduce_func, apply_func)
45
46
47
48
        v3 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        # update all with edge weights
        v1 = g.get_n_repr()[fld]
Minjie Wang's avatar
Minjie Wang committed
49
50
        g.update_all(fn.src_mul_edge(src=fld, edge='e1', out='m'),
                fn.sum(msg='m', out=fld), apply_func)
51
52
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
53
54
        g.update_all(fn.src_mul_edge(src=fld, edge='e2', out='m'),
                fn.sum(msg='m', out=fld), apply_func)
55
56
        v3 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
57
        g.update_all(message_func_edge, reduce_func, apply_func)
58
59
60
61
62
63
64
65
66
        v4 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        assert th.allclose(v3, v4)
    # test 1d node features
    _test('f1')
    # test 2d node features
    _test('f2')

def test_send_and_recv():
Minjie Wang's avatar
Minjie Wang committed
67
68
    u = th.tensor([0, 0, 0, 3, 4, 9])
    v = th.tensor([1, 2, 3, 9, 9, 0])
69
70
    def _test(fld):
        def message_func(hu, edge):
Minjie Wang's avatar
Minjie Wang committed
71
            return {'m' : hu[fld]}
72
73
74

        def message_func_edge(hu, edge):
            if len(hu[fld].shape) == 1:
Minjie Wang's avatar
Minjie Wang committed
75
                return {'m' : hu[fld] * edge['e1']}
76
            else:
Minjie Wang's avatar
Minjie Wang committed
77
                return {'m' : hu[fld] * edge['e2']}
78
79

        def reduce_func(hv, msgs):
Minjie Wang's avatar
Minjie Wang committed
80
            return {fld : th.sum(msgs['m'], 1)}
81
82
83
84
85
86

        def apply_func(hu):
            return {fld : 2 * hu[fld]}
        g = generate_graph()
        # send and recv
        v1 = g.get_n_repr()[fld]
Minjie Wang's avatar
Minjie Wang committed
87
88
        g.send_and_recv(u, v, fn.copy_src(src=fld, out='m'),
                fn.sum(msg='m', out=fld), apply_func)
89
90
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
91
        g.send_and_recv(u, v, message_func, reduce_func, apply_func)
92
93
94
95
        v3 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        # send and recv with edge weights
        v1 = g.get_n_repr()[fld]
Minjie Wang's avatar
Minjie Wang committed
96
97
        g.send_and_recv(u, v, fn.src_mul_edge(src=fld, edge='e1', out='m'),
                fn.sum(msg='m', out=fld), apply_func)
98
99
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
100
101
        g.send_and_recv(u, v, fn.src_mul_edge(src=fld, edge='e2', out='m'),
                fn.sum(msg='m', out=fld), apply_func)
102
103
        v3 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
104
        g.send_and_recv(u, v, message_func_edge, reduce_func, apply_func)
105
106
107
108
109
110
111
        v4 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        assert th.allclose(v3, v4)
    # test 1d node features
    _test('f1')
    # test 2d node features
    _test('f2')
112

113
114
115
116
117
118
119
120
121
122
123
def test_update_all_multi_fn():
    def message_func(hu, edge):
        return {'m2': hu['f2']}

    def message_func_edge(hu, edge):
        return {'m2': hu['f2'] * edge['e2']}

    def reduce_func(hv, msgs):
        return {'v2': th.sum(msgs['m2'], 1)}

    g = generate_graph()
Minjie Wang's avatar
Minjie Wang committed
124
    g.set_n_repr({'v1' : th.zeros((10,)), 'v2' : th.zeros((10,))})
125
126
127
    fld = 'f2'
    # update all, mix of builtin and UDF
    g.update_all([fn.copy_src(src=fld, out='m1'), message_func],
Minjie Wang's avatar
Minjie Wang committed
128
                 [fn.sum(msg='m1', out='v1'), reduce_func],
Minjie Wang's avatar
Minjie Wang committed
129
                 None)
130
131
132
133
134
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

    # run builtin with single message and reduce
Minjie Wang's avatar
Minjie Wang committed
135
    g.update_all(fn.copy_src(src=fld, out='m'), fn.sum(msg='m', out='v1'), None)
136
137
138
    v1 = g.get_n_repr()['v1']
    assert th.allclose(v1, v2)

Minjie Wang's avatar
Minjie Wang committed
139
140
    # 1 message, 2 reduces
    g.update_all(fn.copy_src(src=fld, out='m'), [fn.sum(msg='m', out='v2'), fn.sum(msg='m', out='v3')], None)
141
142
143
144
145
146
147
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # update all with edge weights, 2 message, 3 reduces
    g.update_all([fn.src_mul_edge(src=fld, edge='e1', out='m1'), fn.src_mul_edge(src=fld, edge='e2', out='m2')],
Minjie Wang's avatar
Minjie Wang committed
148
                 [fn.sum(msg='m1', out='v1'), fn.sum(msg='m2', out='v2'), fn.sum(msg='m1', out='v3')],
Minjie Wang's avatar
Minjie Wang committed
149
                 None)
150
151
152
153
154
155
156
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # run UDF with single message and reduce
Minjie Wang's avatar
Minjie Wang committed
157
    g.update_all(message_func_edge, reduce_func, None)
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

def test_send_and_recv_multi_fn():
    u = th.tensor([0, 0, 0, 3, 4, 9])
    v = th.tensor([1, 2, 3, 9, 9, 0])

    def message_func(hu, edge):
        return {'m2': hu['f2']}

    def message_func_edge(hu, edge):
        return {'m2': hu['f2'] * edge['e2']}

    def reduce_func(hv, msgs):
        return {'v2' : th.sum(msgs['m2'], 1)}

    g = generate_graph()
Minjie Wang's avatar
Minjie Wang committed
175
176
    g.set_n_repr({'v1' : th.zeros((10, D)), 'v2' : th.zeros((10, D)),
        'v3' : th.zeros((10, D))})
177
178
179
180
181
    fld = 'f2'

    # send and recv, mix of builtin and UDF
    g.send_and_recv(u, v,
                    [fn.copy_src(src=fld, out='m1'), message_func],
Minjie Wang's avatar
Minjie Wang committed
182
                    [fn.sum(msg='m1', out='v1'), reduce_func],
Minjie Wang's avatar
Minjie Wang committed
183
                    None)
184
185
186
187
188
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

    # run builtin with single message and reduce
Minjie Wang's avatar
Minjie Wang committed
189
    g.send_and_recv(u, v, fn.copy_src(src=fld, out='m'), fn.sum(msg='m', out='v1'),
Minjie Wang's avatar
Minjie Wang committed
190
                    None)
191
192
193
    v1 = g.get_n_repr()['v1']
    assert th.allclose(v1, v2)

Minjie Wang's avatar
Minjie Wang committed
194
195
196
197
198
    # 1 message, 2 reduces
    g.send_and_recv(u, v,
            fn.copy_src(src=fld, out='m'),
            [fn.sum(msg='m', out='v2'), fn.sum(msg='m', out='v3')],
            None)
199
200
201
202
203
204
205
206
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # send and recv with edge weights, 2 message, 3 reduces
    g.send_and_recv(u, v,
                    [fn.src_mul_edge(src=fld, edge='e1', out='m1'), fn.src_mul_edge(src=fld, edge='e2', out='m2')],
Minjie Wang's avatar
Minjie Wang committed
207
                    [fn.sum(msg='m1', out='v1'), fn.sum(msg='m2', out='v2'), fn.sum(msg='m1', out='v3')],
Minjie Wang's avatar
Minjie Wang committed
208
                    None)
209
210
211
212
213
214
215
216
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # run UDF with single message and reduce
    g.send_and_recv(u, v, message_func_edge,
Minjie Wang's avatar
Minjie Wang committed
217
            reduce_func, None)
218
219
220
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

221
if __name__ == '__main__':
222
    test_update_all()
223
    test_send_and_recv()
224
225
    test_update_all_multi_fn()
    test_send_and_recv_multi_fn()