test_pickle.py 6.74 KB
Newer Older
1
import networkx as nx
2
import scipy.sparse as ssp
Gan Quan's avatar
Gan Quan committed
3
import dgl
4
import dgl.contrib as contrib
Gan Quan's avatar
Gan Quan committed
5
6
from dgl.graph_index import create_graph_index
from dgl.utils import toindex
7
8
import backend as F
import dgl.function as fn
Gan Quan's avatar
Gan Quan committed
9
10
import pickle
import io
11
12
import unittest, pytest
import test_utils
nv-dlasalle's avatar
nv-dlasalle committed
13
from test_utils import parametrize_idtype, get_cases
14
from utils import assert_is_identical, assert_is_identical_hetero
15

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def _assert_is_identical_nodeflow(nf1, nf2):
    """Assert that two NodeFlow objects agree in structure and stored features.

    Checks, in order: the read-only flag, the node count, the full edge
    list, and then for every layer and every block the size, the set of
    feature keys and the feature values (compared with ``F.allclose``).
    """
    def _same_features(lhs, rhs):
        # Same keys first, so the value loop below cannot miss an entry.
        assert lhs.keys() == rhs.keys()
        for key in lhs:
            assert F.allclose(lhs[key], rhs[key])

    assert nf1.is_readonly == nf2.is_readonly
    assert nf1.number_of_nodes() == nf2.number_of_nodes()

    src1, dst1 = nf1.all_edges()
    src2, dst2 = nf2.all_edges()
    assert F.array_equal(src1, src2)
    assert F.array_equal(dst1, dst2)

    # Per-layer comparison.
    assert nf1.num_layers == nf2.num_layers
    for layer_id in range(nf1.num_layers):
        assert nf1.layer_size(layer_id) == nf2.layer_size(layer_id)
        _same_features(nf1.layers[layer_id].data, nf2.layers[layer_id].data)

    # Per-block comparison.
    assert nf1.num_blocks == nf2.num_blocks
    for block_id in range(nf1.num_blocks):
        assert nf1.block_size(block_id) == nf2.block_size(block_id)
        _same_features(nf1.blocks[block_id].data, nf2.blocks[block_id].data)

def _assert_is_identical_batchedgraph(bg1, bg2):
    """Assert that two batched graphs are identical, including batch metadata.

    Delegates the graph comparison to ``assert_is_identical`` and then
    compares ``batch_size``, ``batch_num_nodes`` and ``batch_num_edges``.
    """
    assert_is_identical(bg1, bg2)
    assert bg1.batch_size == bg2.batch_size
    # NOTE(review): these are compared as plain attributes, whereas
    # _assert_is_identical_batchedhetero calls batch_num_nodes(...) as a
    # method -- confirm which API the graphs passed here expose.
    assert bg1.batch_num_nodes == bg2.batch_num_nodes
    assert bg1.batch_num_edges == bg2.batch_num_edges

43
def _assert_is_identical_batchedhetero(bg1, bg2):
    """Assert that two batched heterographs are identical, including batch metadata.

    Delegates the graph comparison to ``assert_is_identical_hetero`` and
    then compares the per-type batch information for every node type and
    every canonical edge type of ``bg1``.
    """
    assert_is_identical_hetero(bg1, bg2)
    # NOTE(review): if batch_num_nodes / batch_num_edges return backend
    # tensors, ``==`` may be elementwise -- confirm the assert is well-defined.
    for ntype in bg1.ntypes:
        assert bg1.batch_num_nodes(ntype) == bg2.batch_num_nodes(ntype)
    for canonical_etype in bg1.canonical_etypes:
        assert bg1.batch_num_edges(canonical_etype) == bg2.batch_num_edges(canonical_etype)

50
51
52
53
def _assert_is_identical_index(i1, i2):
    """Assert that two index objects have equal slice data and equal user tensors."""
    slices_match = i1.slice_data() == i2.slice_data()
    assert slices_match

    tensor1 = i1.tousertensor()
    tensor2 = i2.tousertensor()
    assert F.array_equal(tensor1, tensor2)

Gan Quan's avatar
Gan Quan committed
54
55
56
57
58
59
60
61
62
63
def _reconstruct_pickle(obj):
    f = io.BytesIO()
    pickle.dump(obj, f)
    f.seek(0)
    obj = pickle.load(f)
    f.close()

    return obj

def test_pickling_index():
    """Check that index objects survive a pickle round trip.

    Covers an index built from a list (with both its user tensor and its
    DGL tensor requested before pickling) and an index built from a slice.
    """
    # normal index
    i = toindex([1, 2, 3])
    i.tousertensor()
    i.todgltensor() # construct a dgl tensor which is unpicklable
    i2 = _reconstruct_pickle(i)
    _assert_is_identical_index(i, i2)

    # slice index
    i = toindex(slice(5, 10))
    i2 = _reconstruct_pickle(i)
    _assert_is_identical_index(i, i2)
Gan Quan's avatar
Gan Quan committed
75
76

def test_pickling_graph_index():
    """Check that a graph index keeps its nodes and edges across a pickle round trip.

    Builds a 3-node graph index with edges 0->1 and 0->2, pickles and
    unpickles it, and compares the node count and the edge endpoints.
    """
    gi = create_graph_index(None, False)
    gi.add_nodes(3)
    src_idx = toindex([0, 0])
    dst_idx = toindex([1, 2])
    gi.add_edges(src_idx, dst_idx)

    gi2 = _reconstruct_pickle(gi)

    assert gi2.number_of_nodes() == gi.number_of_nodes()
    # edges() yields three values; the third is not needed here.
    src_idx2, dst_idx2, _ = gi2.edges()
    assert F.array_equal(src_idx.tousertensor(), src_idx2.tousertensor())
    assert F.array_equal(dst_idx.tousertensor(), dst_idx2.tousertensor())
Gan Quan's avatar
Gan Quan committed
89
90
91
92
93


def _global_message_func(nodes):
    return {'x': nodes.data['x']}

94
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_idtype
@pytest.mark.parametrize('g', get_cases(exclude=['dglgraph', 'two_hetero_batch']))
def test_pickling_graph(g, idtype):
    """Check that each parametrized graph case equals its pickled copy, features included.

    The graph is first cast to ``idtype``; the round-tripped graph is then
    compared with ``test_utils.check_graph_equal(..., check_feature=True)``.
    """
    g = g.astype(idtype)
    new_g = _reconstruct_pickle(g)
    test_utils.check_graph_equal(g, new_g, check_feature=True)
101

102
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
def test_pickling_batched_heterograph():
    """Check that a batch of two heterographs equals its pickled copy.

    Two heterographs with the same edge lists but independently drawn
    random features are batched with ``dgl.batch``; the batch is pickled,
    unpickled and compared with ``test_utils.check_graph_equal``.
    """
    # copied from test_heterograph.create_test_heterograph()
    g = dgl.heterograph({
        ('user', 'follows', 'user'): ([0, 1], [1, 2]),
        ('user', 'plays', 'game'): ([0, 1, 2, 1], [0, 0, 1, 1]),
        ('user', 'wishes', 'game'): ([0, 2], [1, 0]),
        ('developer', 'develops', 'game'): ([0, 1], [0, 1])
    })
    g2 = dgl.heterograph({
        ('user', 'follows', 'user'): ([0, 1], [1, 2]),
        ('user', 'plays', 'game'): ([0, 1, 2, 1], [0, 0, 1, 1]),
        ('user', 'wishes', 'game'): ([0, 2], [1, 0]),
        ('developer', 'develops', 'game'): ([0, 1], [0, 1])
    })

    g.nodes['user'].data['u_h'] = F.randn((3, 4))
    g.nodes['game'].data['g_h'] = F.randn((2, 5))
    g.edges['plays'].data['p_h'] = F.randn((4, 6))
    g2.nodes['user'].data['u_h'] = F.randn((3, 4))
    g2.nodes['game'].data['g_h'] = F.randn((2, 5))
    g2.edges['plays'].data['p_h'] = F.randn((4, 6))

    bg = dgl.batch([g, g2])
    new_bg = _reconstruct_pickle(bg)
    test_utils.check_graph_equal(bg, new_bg)
128

129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU edge_subgraph w/ relabeling not implemented")
def test_pickling_subgraph():
    """Check that pickled subgraphs are far smaller than the pickled parent graph.

    A random graph with node and edge features is pickled once. A node
    subgraph and an edge subgraph are then pickled after reading their
    ``ndata['x']`` / ``edata['x']``, and each pickle must be more than 50
    times smaller than the parent's.
    """
    full_buf = io.BytesIO()
    sub_buf = io.BytesIO()

    def _check_much_smaller(subgraph):
        # Empty the buffer first so tell() reflects only this pickle.
        sub_buf.seek(0)
        sub_buf.truncate()
        pickle.dump(subgraph, sub_buf)
        # TODO(BarclayII): How should I test that the size of the subgraph pickle file should not
        # be as large as the size of the original pickle file?
        assert full_buf.tell() > sub_buf.tell() * 50

    parent = dgl.rand_graph(10000, 100000)
    parent.ndata['x'] = F.randn((10000, 4))
    parent.edata['x'] = F.randn((100000, 5))
    pickle.dump(parent, full_buf)

    # Node-induced subgraph: read node features, then edge features.
    node_sg = parent.subgraph([0, 1])
    node_sg.ndata['x']  # materialize
    _check_much_smaller(node_sg)
    node_sg.edata['x']  # materialize
    _check_much_smaller(node_sg)

    # Edge-induced subgraph: read edge features, then node features.
    edge_sg = parent.edge_subgraph([0])
    edge_sg.edata['x']  # materialize
    _check_much_smaller(edge_sg)
    edge_sg.ndata['x']  # materialize
    _check_much_smaller(edge_sg)

    full_buf.close()
    sub_buf.close()

166
167
@unittest.skipIf(F._default_context_str != 'gpu', reason="Need GPU for pin")
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TensorFlow create graph on gpu when unpickle")
@parametrize_idtype
def test_pickling_is_pinned(idtype):
    """Check that pickled and deep-copied copies of a pinned graph report as pinned.

    Runs for a random homogeneous graph and a heterograph, both created on
    the CPU device. Every copy and the original are unpinned again before
    moving on.
    """
    from copy import deepcopy
    g = dgl.rand_graph(10, 20, idtype=idtype, device=F.cpu())
    hg = dgl.heterograph({
        ('user', 'follows', 'user'): ([0, 1], [1, 2]),
        ('user', 'plays', 'game'): ([0, 1, 2, 1], [0, 0, 1, 1]),
        ('user', 'wishes', 'game'): ([0, 2], [1, 0]),
        ('developer', 'develops', 'game'): ([0, 1], [0, 1])
    }, idtype=idtype, device=F.cpu())
    for graph in [g, hg]:
        assert not graph.is_pinned()
        graph.pin_memory_()
        assert graph.is_pinned()
        # Pickle round trip of the pinned graph.
        pg = _reconstruct_pickle(graph)
        assert pg.is_pinned()
        pg.unpin_memory_()
        # Deep copy of the pinned graph.
        dg = deepcopy(graph)
        assert dg.is_pinned()
        dg.unpin_memory_()
        graph.unpin_memory_()


Gan Quan's avatar
Gan Quan committed
191
192
193
194
195
if __name__ == '__main__':
    # Hand the module to pytest instead of calling the tests one by one:
    # several tests take parametrized arguments (g, idtype) that a bare
    # call cannot supply, and the previous hand-written list named test
    # functions that are not defined in this module.
    raise SystemExit(pytest.main([__file__]))