"""Tests for the ``inplace=True`` option of DGL message passing and apply calls."""
import numpy as np
import scipy.sparse as sp
import dgl
import dgl.function as fn
import backend as F
import unittest

# Width of the random node and edge features attached by generate_graph().
D = 5

def generate_graph():
    """Build the 10-node graph shared by the in-place update tests.

    Node 0 acts as the source and node 9 as the sink: for every ``i`` in
    1..8 an edge ``0 -> i`` and an edge ``i -> 9`` are added (in that
    order), followed by a single back edge ``9 -> 0``.  That gives 17 edges.

    Returns
    -------
    The graph, with random node features ``'f'`` of shape ``(10, D)`` and
    random edge features ``'e'`` of shape ``(17, D)``.
    """
    g = dgl.DGLGraph()
    g.add_nodes(10)
    # create a graph where 0 is the source and 9 is the sink
    for i in range(1, 9):
        g.add_edge(0, i)
        g.add_edge(i, 9)
    # add a back flow from 9 to 0
    g.add_edge(9, 0)
    g.ndata['f'] = F.randn((10, D))
    g.edata['e'] = F.randn((17, D))
    return g

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
def test_inplace_snr():
    """Check ``send_and_recv(..., inplace=True)`` against an out-of-place run.

    A reference result is computed once without ``inplace``.  Each in-place
    run then starts from a fresh clone of the initial features and must
    (a) produce the reference result and (b) leave that result in the very
    tensor that was installed as ``g.ndata['f']``.
    """
    u = F.tensor([0, 0, 0, 3, 4, 9])
    v = F.tensor([1, 2, 3, 9, 9, 0])

    def message_func(edges):
        return {'m' : edges.src['f']}

    def reduce_func(nodes):
        return {'f' : F.sum(nodes.mailbox['m'], 1)}

    def apply_func(nodes):
        return {'f' : 2 * nodes.data['f']}

    def _test(apply_func):
        g = generate_graph()
        f = g.ndata['f']

        # an out place run to get result
        g.send_and_recv((u, v), fn.copy_src(src='f', out='m'),
                        fn.sum(msg='m', out='f'), apply_func)
        result = g.ndata['f']

        # (label, message function, reduce function) for each in-place run
        inplace_cases = (
            ('inplace deg bucket', message_func, reduce_func),
            ('inplace v2v spmv',
             fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f')),
            ('inplace e2v spmv', message_func, fn.sum(msg='m', out='f')),
        )
        for label, mfunc, rfunc in inplace_cases:
            # restart from the initial features for every case
            v1 = F.clone(f)
            g.ndata['f'] = v1
            g.send_and_recv((u, v), mfunc, rfunc, apply_func, inplace=True)
            r1 = g.ndata['f']
            # check result
            assert F.allclose(r1, result), label
            # check inplace
            assert F.allclose(v1, r1), label

    # test send_and_recv with apply_func
    _test(apply_func)
    # test send_and_recv without apply_func
    _test(None)

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
def test_inplace_push():
    """Check ``push(..., inplace=True)`` against an out-of-place run.

    A reference result is computed once without ``inplace``.  Each in-place
    run then starts from a fresh clone of the initial features and must
    (a) produce the reference result and (b) leave that result in the very
    tensor that was installed as ``g.ndata['f']``.
    """
    nodes = F.tensor([0, 3, 4, 9])

    def message_func(edges):
        return {'m' : edges.src['f']}

    def reduce_func(nodes):
        return {'f' : F.sum(nodes.mailbox['m'], 1)}

    def apply_func(nodes):
        return {'f' : 2 * nodes.data['f']}

    def _test(apply_func):
        g = generate_graph()
        f = g.ndata['f']

        # an out place run to get result
        g.push(nodes,
               fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f'), apply_func)
        result = g.ndata['f']

        # (label, message function, reduce function) for each in-place run
        inplace_cases = (
            ('inplace deg bucket', message_func, reduce_func),
            ('inplace v2v spmv',
             fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f')),
            ('inplace e2v spmv', message_func, fn.sum(msg='m', out='f')),
        )
        for label, mfunc, rfunc in inplace_cases:
            # restart from the initial features for every case
            v1 = F.clone(f)
            g.ndata['f'] = v1
            g.push(nodes, mfunc, rfunc, apply_func, inplace=True)
            r1 = g.ndata['f']
            # check result
            assert F.allclose(r1, result), label
            # check inplace
            assert F.allclose(v1, r1), label

    # test push with apply_func
    _test(apply_func)
    # test push without apply_func
    _test(None)

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
def test_inplace_pull():
    """Check ``pull(..., inplace=True)`` against an out-of-place run.

    A reference result is computed once without ``inplace``.  Each in-place
    run then starts from a fresh clone of the initial features and must
    (a) produce the reference result and (b) leave that result in the very
    tensor that was installed as ``g.ndata['f']``.
    """
    nodes = F.tensor([1, 2, 3, 9])

    def message_func(edges):
        return {'m' : edges.src['f']}

    def reduce_func(nodes):
        return {'f' : F.sum(nodes.mailbox['m'], 1)}

    def apply_func(nodes):
        return {'f' : 2 * nodes.data['f']}

    def _test(apply_func):
        g = generate_graph()
        f = g.ndata['f']

        # an out place run to get result
        g.pull(nodes,
               fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f'), apply_func)
        result = g.ndata['f']

        # (label, message function, reduce function) for each in-place run
        inplace_cases = (
            ('inplace deg bucket', message_func, reduce_func),
            ('inplace v2v spmv',
             fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f')),
            ('inplace e2v spmv', message_func, fn.sum(msg='m', out='f')),
        )
        for label, mfunc, rfunc in inplace_cases:
            # restart from the initial features for every case
            v1 = F.clone(f)
            g.ndata['f'] = v1
            g.pull(nodes, mfunc, rfunc, apply_func, inplace=True)
            r1 = g.ndata['f']
            # check result
            assert F.allclose(r1, result), label
            # check inplace
            assert F.allclose(v1, r1), label

    # test pull with apply_func
    _test(apply_func)
    # test pull without apply_func
    _test(None)

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support inplace update")
def test_inplace_apply():
    """Check ``apply_nodes`` / ``apply_edges`` with ``inplace=True``.

    For a subset of nodes (resp. edges) the in-place call is expected to
    write the doubled features into the tensor that was installed on the
    graph, so that tensor ends up matching the out-of-place result.  When
    the call covers all nodes (resp. edges) the test expects the installed
    tensor NOT to be modified in place.
    """
    def apply_node_func(nodes):
        return {'f': nodes.data['f'] * 2}

    def apply_edge_func(edges):
        return {'e': edges.data['e'] * 2}

    g = generate_graph()
    nodes = [1, 2, 3, 9]
    nf = g.ndata['f']
    # out place run
    g.apply_nodes(apply_node_func, nodes)
    new_nf = g.ndata['f']
    # in place run
    g.ndata['f'] = nf
    g.apply_nodes(apply_node_func, nodes, inplace=True)
    # check results correct and in place
    assert F.allclose(nf, new_nf)
    # test apply all nodes, should not be done in place
    g.ndata['f'] = nf
    g.apply_nodes(apply_node_func, inplace=True)
    assert not F.allclose(nf, g.ndata['f'])

    edges = [3, 5, 7, 10]
    ef = g.edata['e']
    # out place run
    g.apply_edges(apply_edge_func, edges)
    new_ef = g.edata['e']
    # in place run
    g.edata['e'] = ef
    g.apply_edges(apply_edge_func, edges, inplace=True)
    # check results correct and in place
    assert F.allclose(ef, new_ef)
    # test apply all edges, should not be done in place
    # (this reset was previously written as the no-op comparison
    # ``g.edata['e'] == ef``; it mirrors the node case above)
    g.edata['e'] = ef
    g.apply_edges(apply_edge_func, inplace=True)
    assert not F.allclose(ef, g.edata['e'])

if __name__ == '__main__':
    # Run every test in declaration order when executed as a script.
    for run_case in (
        test_inplace_snr,
        test_inplace_push,
        test_inplace_pull,
        test_inplace_apply,
    ):
        run_case()