test_inplace_update.py 8.47 KB
Newer Older
Lingfan Yu's avatar
Lingfan Yu committed
1
2
3
4
import numpy as np
import scipy.sparse as sp
import dgl
import dgl.function as fn
5
import backend as F
VoVAllen's avatar
VoVAllen committed
6
import unittest
Lingfan Yu's avatar
Lingfan Yu committed
7
8
9
10
11
12
13
14
15
16
17
18

D = 5

def generate_graph():
    g = dgl.DGLGraph()
    g.add_nodes(10)
    # create a graph where 0 is the source and 9 is the sink
    for i in range(1, 9):
        g.add_edge(0, i)
        g.add_edge(i, 9)
    # add a back flow from 9 to 0
    g.add_edge(9, 0)
19
20
    g.ndata['f'] = F.randn((10, D))
    g.edata['e'] = F.randn((17, D))
Lingfan Yu's avatar
Lingfan Yu committed
21
22
    return g

VoVAllen's avatar
VoVAllen committed
23
24

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
Lingfan Yu's avatar
Lingfan Yu committed
25
def test_inplace_recv():
26
27
    u = F.tensor([0, 0, 0, 3, 4, 9])
    v = F.tensor([1, 2, 3, 9, 9, 0])
Lingfan Yu's avatar
Lingfan Yu committed
28
29
30
31
32

    def message_func(edges):
        return {'m' : edges.src['f'] + edges.dst['f']}

    def reduce_func(nodes):
33
        return {'f' : F.sum(nodes.mailbox['m'], 1)}
Lingfan Yu's avatar
Lingfan Yu committed
34
35
36
37
38
39
40
41
42
43
44
45
46
47

    def apply_func(nodes):
        return {'f' : 2 * nodes.data['f']}

    def _test(apply_func):
        g = generate_graph()
        f = g.ndata['f']

        # one out place run to get result
        g.send((u, v), message_func)
        g.recv([0,1,2,3,9], reduce_func, apply_func)
        result = g.get_n_repr()['f']

        # inplace deg bucket run
48
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
49
50
51
52
53
        g.ndata['f'] = v1
        g.send((u, v), message_func)
        g.recv([0,1,2,3,9], reduce_func, apply_func, inplace=True)
        r1 = g.get_n_repr()['f']
        # check result
54
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
55
        # check inplace
56
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
57
58

        # inplace e2v
59
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
60
61
62
63
64
        g.ndata['f'] = v1
        g.send((u, v), message_func)
        g.recv([0,1,2,3,9], fn.sum(msg='m', out='f'), apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
65
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
66
        # check inplace
67
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
68
69
70
71
72
73

    # test send_and_recv with apply_func
    _test(apply_func)
    # test send_and_recv without apply_func
    _test(None)

VoVAllen's avatar
VoVAllen committed
74
75

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
Lingfan Yu's avatar
Lingfan Yu committed
76
def test_inplace_snr():
77
78
    u = F.tensor([0, 0, 0, 3, 4, 9])
    v = F.tensor([1, 2, 3, 9, 9, 0])
Lingfan Yu's avatar
Lingfan Yu committed
79
80
81
82
83

    def message_func(edges):
        return {'m' : edges.src['f']}

    def reduce_func(nodes):
84
        return {'f' : F.sum(nodes.mailbox['m'], 1)}
Lingfan Yu's avatar
Lingfan Yu committed
85
86
87
88
89
90
91
92
93
94
95
96
97
98

    def apply_func(nodes):
        return {'f' : 2 * nodes.data['f']}

    def _test(apply_func):
        g = generate_graph()
        f = g.ndata['f']

        # an out place run to get result
        g.send_and_recv((u, v), fn.copy_src(src='f', out='m'),
                fn.sum(msg='m', out='f'), apply_func)
        result = g.ndata['f']

        # inplace deg bucket
99
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
100
101
102
103
        g.ndata['f'] = v1
        g.send_and_recv((u, v), message_func, reduce_func, apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
104
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
105
        # check inplace
106
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
107
108

        # inplace v2v spmv
109
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
110
111
112
113
114
        g.ndata['f'] = v1
        g.send_and_recv((u, v), fn.copy_src(src='f', out='m'),
                        fn.sum(msg='m', out='f'), apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
115
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
116
        # check inplace
117
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
118
119

        # inplace e2v spmv
120
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
121
122
123
124
125
        g.ndata['f'] = v1
        g.send_and_recv((u, v), message_func,
                        fn.sum(msg='m', out='f'), apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
126
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
127
        # check inplace
128
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
129
130
131
132
133
134

    # test send_and_recv with apply_func
    _test(apply_func)
    # test send_and_recv without apply_func
    _test(None)

VoVAllen's avatar
VoVAllen committed
135
136

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
Lingfan Yu's avatar
Lingfan Yu committed
137
def test_inplace_push():
138
    nodes = F.tensor([0, 3, 4, 9])
Lingfan Yu's avatar
Lingfan Yu committed
139
140
141
142
143

    def message_func(edges):
        return {'m' : edges.src['f']}

    def reduce_func(nodes):
144
        return {'f' : F.sum(nodes.mailbox['m'], 1)}
Lingfan Yu's avatar
Lingfan Yu committed
145
146
147
148
149
150
151
152
153
154
155
156
157
158

    def apply_func(nodes):
        return {'f' : 2 * nodes.data['f']}

    def _test(apply_func):
        g = generate_graph()
        f = g.ndata['f']

        # an out place run to get result
        g.push(nodes,
               fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f'), apply_func)
        result = g.ndata['f']

        # inplace deg bucket
159
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
160
161
162
163
        g.ndata['f'] = v1
        g.push(nodes, message_func, reduce_func, apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
164
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
165
        # check inplace
166
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
167
168

        # inplace v2v spmv
169
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
170
171
172
173
174
        g.ndata['f'] = v1
        g.push(nodes, fn.copy_src(src='f', out='m'),
               fn.sum(msg='m', out='f'), apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
175
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
176
        # check inplace
177
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
178
179

        # inplace e2v spmv
180
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
181
182
183
184
185
        g.ndata['f'] = v1
        g.push(nodes,
               message_func, fn.sum(msg='m', out='f'), apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
186
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
187
        # check inplace
188
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
189
190
191
192
193
194

    # test send_and_recv with apply_func
    _test(apply_func)
    # test send_and_recv without apply_func
    _test(None)

VoVAllen's avatar
VoVAllen committed
195
196

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
Lingfan Yu's avatar
Lingfan Yu committed
197
def test_inplace_pull():
198
    nodes = F.tensor([1, 2, 3, 9])
Lingfan Yu's avatar
Lingfan Yu committed
199
200
201
202
203

    def message_func(edges):
        return {'m' : edges.src['f']}

    def reduce_func(nodes):
204
        return {'f' : F.sum(nodes.mailbox['m'], 1)}
Lingfan Yu's avatar
Lingfan Yu committed
205
206
207
208
209
210
211
212
213
214
215
216
217
218

    def apply_func(nodes):
        return {'f' : 2 * nodes.data['f']}

    def _test(apply_func):
        g = generate_graph()
        f = g.ndata['f']

        # an out place run to get result
        g.pull(nodes,
               fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f'), apply_func)
        result = g.ndata['f']

        # inplace deg bucket
219
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
220
221
222
223
        g.ndata['f'] = v1
        g.pull(nodes, message_func, reduce_func, apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
224
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
225
        # check inplace
226
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
227
228

        # inplace v2v spmv
229
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
230
231
232
233
234
        g.ndata['f'] = v1
        g.pull(nodes, fn.copy_src(src='f', out='m'),
               fn.sum(msg='m', out='f'), apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
235
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
236
        # check inplace
237
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
238
239

        # inplace e2v spmv
240
        v1 = F.clone(f)
Lingfan Yu's avatar
Lingfan Yu committed
241
242
243
244
245
        g.ndata['f'] = v1
        g.pull(nodes,
               message_func, fn.sum(msg='m', out='f'), apply_func, inplace=True)
        r1 = g.ndata['f']
        # check result
246
        assert F.allclose(r1, result)
Lingfan Yu's avatar
Lingfan Yu committed
247
        # check inplace
248
        assert F.allclose(v1, r1)
Lingfan Yu's avatar
Lingfan Yu committed
249
250
251
252
253
254

    # test send_and_recv with apply_func
    _test(apply_func)
    # test send_and_recv without apply_func
    _test(None)

VoVAllen's avatar
VoVAllen committed
255
256

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
Lingfan Yu's avatar
Lingfan Yu committed
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def test_inplace_apply():
    def apply_node_func(nodes):
        return {'f': nodes.data['f'] * 2}

    def apply_edge_func(edges):
        return {'e': edges.data['e'] * 2}

    g = generate_graph()
    nodes = [1, 2, 3, 9]
    nf = g.ndata['f']
    # out place run
    g.apply_nodes(apply_node_func, nodes)
    new_nf = g.ndata['f']
    # in place run
    g.ndata['f'] = nf
    g.apply_nodes(apply_node_func, nodes, inplace=True)
    # check results correct and in place
274
    assert F.allclose(nf, new_nf)
Lingfan Yu's avatar
Lingfan Yu committed
275
276
277
    # test apply all nodes, should not be done in place
    g.ndata['f'] = nf
    g.apply_nodes(apply_node_func, inplace=True)
278
    assert F.allclose(nf, g.ndata['f']) == False
Lingfan Yu's avatar
Lingfan Yu committed
279
280
281
282
283
284
285
286
287
288

    edges = [3, 5, 7, 10]
    ef = g.edata['e']
    # out place run
    g.apply_edges(apply_edge_func, edges)
    new_ef = g.edata['e']
    # in place run
    g.edata['e'] = ef
    g.apply_edges(apply_edge_func, edges, inplace=True)
    g.edata['e'] = ef
289
    assert F.allclose(ef, new_ef)
Lingfan Yu's avatar
Lingfan Yu committed
290
291
292
    # test apply all edges, should not be done in place
    g.edata['e'] == ef
    g.apply_edges(apply_edge_func, inplace=True)
293
    assert F.allclose(ef, g.edata['e']) == False
Lingfan Yu's avatar
Lingfan Yu committed
294
295
296
297
298
299
300

if __name__ == '__main__':
    test_inplace_recv()
    test_inplace_snr()
    test_inplace_push()
    test_inplace_pull()
    test_inplace_apply()