test_specialization.py 7.04 KB
Newer Older
1
import torch as th
Minjie Wang's avatar
Minjie Wang committed
2
import numpy as np
3
4
import dgl
import dgl.function as fn
5

Minjie Wang's avatar
Minjie Wang committed
6
7
D = 5

8
def generate_graph():
9
    g = dgl.DGLGraph()
Minjie Wang's avatar
Minjie Wang committed
10
    g.add_nodes(10)
11
12
13
14
15
16
    # create a graph where 0 is the source and 9 is the sink
    for i in range(1, 9):
        g.add_edge(0, i)
        g.add_edge(i, 9)
    # add a back flow from 9 to 0
    g.add_edge(9, 0)
17
18
19
    g.set_n_repr({'f1' : th.randn(10,), 'f2' : th.randn(10, D)})
    weights = th.randn(17,)
    g.set_e_repr({'e1': weights, 'e2': th.unsqueeze(weights, 1)})
20
21
    return g

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def test_update_all():
    def _test(fld):
        def message_func(hu, edge):
            return hu[fld]

        def message_func_edge(hu, edge):
            if len(hu[fld].shape) == 1:
                return hu[fld] * edge['e1']
            else:
                return hu[fld] * edge['e2']

        def reduce_func(hv, msgs):
            return {fld : th.sum(msgs, 1)}

        def apply_func(hu):
            return {fld : 2 * hu[fld]}
        g = generate_graph()
        # update all
        v1 = g.get_n_repr()[fld]
Minjie Wang's avatar
Minjie Wang committed
41
        g.update_all(fn.copy_src(src=fld), fn.sum(out=fld), apply_func)
42
43
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
44
        g.update_all(message_func, reduce_func, apply_func)
45
46
47
48
49
        v3 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        # update all with edge weights
        v1 = g.get_n_repr()[fld]
        g.update_all(fn.src_mul_edge(src=fld, edge='e1'),
Minjie Wang's avatar
Minjie Wang committed
50
                fn.sum(out=fld), apply_func)
51
52
53
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
        g.update_all(fn.src_mul_edge(src=fld, edge='e2'),
Minjie Wang's avatar
Minjie Wang committed
54
                fn.sum(out=fld), apply_func)
55
56
        v3 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
Minjie Wang's avatar
Minjie Wang committed
57
        g.update_all(message_func_edge, reduce_func, apply_func)
58
59
60
61
62
63
64
65
66
        v4 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        assert th.allclose(v3, v4)
    # test 1d node features
    _test('f1')
    # test 2d node features
    _test('f2')

def test_send_and_recv():
Minjie Wang's avatar
Minjie Wang committed
67
68
    u = th.tensor([0, 0, 0, 3, 4, 9])
    v = th.tensor([1, 2, 3, 9, 9, 0])
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
    def _test(fld):
        def message_func(hu, edge):
            return hu[fld]

        def message_func_edge(hu, edge):
            if len(hu[fld].shape) == 1:
                return hu[fld] * edge['e1']
            else:
                return hu[fld] * edge['e2']

        def reduce_func(hv, msgs):
            return {fld : th.sum(msgs, 1)}

        def apply_func(hu):
            return {fld : 2 * hu[fld]}
        g = generate_graph()
        # send and recv
        v1 = g.get_n_repr()[fld]
        g.send_and_recv(u, v, fn.copy_src(src=fld),
Minjie Wang's avatar
Minjie Wang committed
88
                fn.sum(out=fld), apply_func)
89
90
91
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
        g.send_and_recv(u, v, message_func,
Minjie Wang's avatar
Minjie Wang committed
92
                reduce_func, apply_func)
93
94
95
96
97
        v3 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        # send and recv with edge weights
        v1 = g.get_n_repr()[fld]
        g.send_and_recv(u, v, fn.src_mul_edge(src=fld, edge='e1'),
Minjie Wang's avatar
Minjie Wang committed
98
                fn.sum(out=fld), apply_func)
99
100
101
        v2 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
        g.send_and_recv(u, v, fn.src_mul_edge(src=fld, edge='e2'),
Minjie Wang's avatar
Minjie Wang committed
102
                fn.sum(out=fld), apply_func)
103
104
105
        v3 = g.get_n_repr()[fld]
        g.set_n_repr({fld : v1})
        g.send_and_recv(u, v, message_func_edge,
Minjie Wang's avatar
Minjie Wang committed
106
                reduce_func, apply_func)
107
108
109
110
111
112
113
        v4 = g.get_n_repr()[fld]
        assert th.allclose(v2, v3)
        assert th.allclose(v3, v4)
    # test 1d node features
    _test('f1')
    # test 2d node features
    _test('f2')
114

115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def test_update_all_multi_fn():
    def message_func(hu, edge):
        return {'m2': hu['f2']}

    def message_func_edge(hu, edge):
        return {'m2': hu['f2'] * edge['e2']}

    def reduce_func(hv, msgs):
        return {'v2': th.sum(msgs['m2'], 1)}

    g = generate_graph()
    fld = 'f2'
    # update all, mix of builtin and UDF
    g.update_all([fn.copy_src(src=fld, out='m1'), message_func],
                 [fn.sum(msgs='m1', out='v1'), reduce_func],
Minjie Wang's avatar
Minjie Wang committed
130
                 None)
131
132
133
134
135
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

    # run builtin with single message and reduce
Minjie Wang's avatar
Minjie Wang committed
136
    g.update_all(fn.copy_src(src=fld), fn.sum(out='v1'), None)
137
138
139
140
    v1 = g.get_n_repr()['v1']
    assert th.allclose(v1, v2)

    # 1 message, 2 reduces, using anonymous repr
Minjie Wang's avatar
Minjie Wang committed
141
    g.update_all(fn.copy_src(src=fld), [fn.sum(out='v2'), fn.sum(out='v3')], None)
142
143
144
145
146
147
148
149
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # update all with edge weights, 2 message, 3 reduces
    g.update_all([fn.src_mul_edge(src=fld, edge='e1', out='m1'), fn.src_mul_edge(src=fld, edge='e2', out='m2')],
                 [fn.sum(msgs='m1', out='v1'), fn.sum(msgs='m2', out='v2'), fn.sum(msgs='m1', out='v3')],
Minjie Wang's avatar
Minjie Wang committed
150
                 None)
151
152
153
154
155
156
157
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # run UDF with single message and reduce
Minjie Wang's avatar
Minjie Wang committed
158
    g.update_all(message_func_edge, reduce_func, None)
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

def test_send_and_recv_multi_fn():
    u = th.tensor([0, 0, 0, 3, 4, 9])
    v = th.tensor([1, 2, 3, 9, 9, 0])

    def message_func(hu, edge):
        return {'m2': hu['f2']}

    def message_func_edge(hu, edge):
        return {'m2': hu['f2'] * edge['e2']}

    def reduce_func(hv, msgs):
        return {'v2' : th.sum(msgs['m2'], 1)}

    g = generate_graph()
    fld = 'f2'

    # send and recv, mix of builtin and UDF
    g.send_and_recv(u, v,
                    [fn.copy_src(src=fld, out='m1'), message_func],
                    [fn.sum(msgs='m1', out='v1'), reduce_func],
Minjie Wang's avatar
Minjie Wang committed
182
                    None)
183
184
185
186
187
188
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

    # run builtin with single message and reduce
    g.send_and_recv(u, v, fn.copy_src(src=fld), fn.sum(out='v1'),
Minjie Wang's avatar
Minjie Wang committed
189
                    None)
190
191
192
193
    v1 = g.get_n_repr()['v1']
    assert th.allclose(v1, v2)

    # 1 message, 2 reduces, using anonymous repr
Minjie Wang's avatar
Minjie Wang committed
194
    g.send_and_recv(u, v, fn.copy_src(src=fld), [fn.sum(out='v2'), fn.sum(out='v3')], None)
195
196
197
198
199
200
201
202
203
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # send and recv with edge weights, 2 message, 3 reduces
    g.send_and_recv(u, v,
                    [fn.src_mul_edge(src=fld, edge='e1', out='m1'), fn.src_mul_edge(src=fld, edge='e2', out='m2')],
                    [fn.sum(msgs='m1', out='v1'), fn.sum(msgs='m2', out='v2'), fn.sum(msgs='m1', out='v3')],
Minjie Wang's avatar
Minjie Wang committed
204
                    None)
205
206
207
208
209
210
211
212
    v1 = g.get_n_repr()['v1']
    v2 = g.get_n_repr()['v2']
    v3 = g.get_n_repr()['v3']
    assert th.allclose(v1, v2)
    assert th.allclose(v1, v3)

    # run UDF with single message and reduce
    g.send_and_recv(u, v, message_func_edge,
Minjie Wang's avatar
Minjie Wang committed
213
            reduce_func, None)
214
215
216
    v2 = g.get_n_repr()['v2']
    assert th.allclose(v1, v2)

217
if __name__ == '__main__':
218
    test_update_all()
219
    test_send_and_recv()
220
221
    test_update_all_multi_fn()
    test_send_and_recv_multi_fn()