"src/vscode:/vscode.git/clone" did not exist on "10db5d0b88e9dd8a1a0a1ea38b6dbbaca912c861"
test_dist_graph_store.py 12.2 KB
Newer Older
1
2
3
4
5
6
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
7
import socket
8
9
10
11
12
13
14
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
from dgl.graph_index import create_graph_index
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
15
from dgl.distributed import partition_graph, load_partition, load_partition_book, node_split, edge_split
16
17
from dgl.distributed import SparseAdagrad, SparseNodeEmbedding
from numpy.testing import assert_almost_equal
18
import backend as F
19
import math
20
21
22
import unittest
import pickle

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# fcntl is a Unix-only module, so it is only imported when not on Windows ('nt').
# NOTE(review): neither fcntl nor struct is referenced in the visible part of
# this file -- confirm they are still needed.
if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get a local usable IP address and a currently free TCP port.

    Returns
    -------
    str
        IP address and port separated by a single space,
        e.g., '192.168.8.12 50051'.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # doesn't even have to be reachable
            sock.connect(('10.255.255.255', 1))
            ip_addr = sock.getsockname()[0]
        except (OSError, ValueError):
            # Socket-level failures (e.g. no route to the address) are raised
            # as OSError; fall back to the loopback address in that case.
            ip_addr = '127.0.0.1'

    # Bind to port 0 so the OS picks a free port, then read it back.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        sock.listen(1)
        port = sock.getsockname()[1]

    return ip_addr + ' ' + str(port)

52
53
54
55
56
def create_random_graph(n):
    """Build a read-only DGLGraph from a random n-by-n sparse matrix.

    The matrix is drawn with density 0.001 in COO format and every stored
    non-zero entry is turned into an int64 1.
    """
    random_coo = spsp.random(n, n, density=0.001, format='coo')
    adjacency = (random_coo != 0).astype(np.int64)
    graph_index = create_graph_index(adjacency, readonly=True)
    return dgl.DGLGraph(graph_index)

57
def run_server(graph_name, server_id, num_clients, shared_mem):
    """Create a DistGraphServer for ``graph_name`` and start it.

    Parameters
    ----------
    graph_name : str
        Name of the partitioned graph; its partition config is read from
        '/tmp/dist_graph/<graph_name>.json'.
    server_id : int
        ID passed to DistGraphServer.
    num_clients : int
        Number of clients passed to DistGraphServer.
    shared_mem : bool
        Whether shared memory is enabled; forwarded inverted as
        ``disable_shared_mem``.
    """
    part_config = '/tmp/dist_graph/{}.json'.format(graph_name)
    server = DistGraphServer(server_id, "kv_ip_config.txt", num_clients,
                             part_config,
                             disable_shared_mem=not shared_mem)
    print('start server', server_id)
    server.start()

64
65
66
def emb_init(shape, dtype):
    """Return an all-zero CPU tensor of the given shape and dtype.

    Passed to SparseNodeEmbedding as its initializer in run_client.
    """
    device = F.cpu()
    return F.zeros(shape, dtype, device)

67
68
def run_client(graph_name, part_id, num_nodes, num_edges):
    """Client process body: connect to the servers and exercise DistGraph.

    Parameters
    ----------
    graph_name : str
        Name of the partitioned graph; its partition config is read from
        '/tmp/dist_graph/<graph_name>.json'.
    part_id : int
        Partition ID passed to load_partition_book.
    num_nodes : int
        Expected number of nodes in the distributed graph.
    num_edges : int
        Expected number of edges in the distributed graph.
    """
    # Presumably gives the server process time to come up before connecting.
    time.sleep(5)
    gpb, graph_name = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
                                          part_id, None)
    g = DistGraph("kv_ip_config.txt", graph_name, gpb=gpb)

    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data
    # The 'features' column was filled with the node IDs by the parent process.
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
    g.init_ndata('test1', new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # Test init edge data
    new_shape = (g.number_of_edges(), 2)
    g.init_edata('test1', new_shape, F.int32)
    feats = g.edata['test1'][eids]
    assert np.all(F.asnumpy(feats) == 0)

    # Test sparse emb
    try:
        new_shape = (g.number_of_nodes(), 1)
        emb = SparseNodeEmbedding(g, 'emb1', new_shape, emb_init)
        lr = 0.001
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats = emb(nids)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        # Only the rows that were read should have moved, each by -lr.
        feats = emb(nids)
        assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        # NOTE(review): 'node:emb1_sum' is presumably the optimizer's
        # accumulated gradient state for 'emb1' -- confirm against SparseAdagrad.
        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        grad_sum = dgl.distributed.DistTensor(g, 'node:emb1_sum', policy)
        assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)))
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))

        # Read every row in nids twice within one step; the expected update
        # then becomes sqrt(2) * -lr.
        emb = SparseNodeEmbedding(g, 'emb2', new_shape, emb_init)
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats1 = emb(nids)
            feats2 = emb(nids)
            feats = F.cat([feats1, feats2], 0)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids) * 2, 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * math.sqrt(2) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))
    except NotImplementedError:
        # Best effort: keep going when this part is not implemented, but
        # say so instead of skipping silently.
        print('sparse embedding test skipped: not implemented')

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    # Test node split
    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    nodes = node_split(selected_nodes, g.get_partition_book())
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes())
    assert np.all(np.isin(nodes, local_nids))

    # clean up
    dgl.distributed.shutdown_servers()
    dgl.distributed.finalize_client()
    print('end')

170
def check_server_client(shared_mem):
    """Partition a random graph, then run one server and one client on it.

    Parameters
    ----------
    shared_mem : bool
        Forwarded to run_server to enable or disable shared memory.

    Raises
    ------
    AssertionError
        If a client process exits with a non-zero exit code.
    """
    prepare_dist()
    g = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_2'
    # Each feature row holds its own node/edge ID so the client can verify reads.
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(1):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id, 1, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    for cli_id in range(1):
        print('start client', cli_id)
        p = ctx.Process(target=run_client, args=(graph_name, cli_id, g.number_of_nodes(),
                                                 g.number_of_edges()))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    # A failed assertion inside a spawned client only shows up as a non-zero
    # exit code, so it has to be checked explicitly here.
    failed_clients = [p for p in cli_ps if p.exitcode != 0]
    if failed_clients:
        # run_client asks the servers to shut down only as its last step, so
        # after a client failure the servers may never exit on their own.
        for p in serv_ps:
            p.terminate()
            p.join()
        raise AssertionError(
            '{} client process(es) exited abnormally'.format(len(failed_clients)))

    for p in serv_ps:
        p.join()

    print('clients have terminated')

206
207
208
209
210
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
    """Run the server/client check with shared memory on, then off."""
    for shared_mem in (True, False):
        check_server_client(shared_mem)

211
def test_split():
    """Check node_split/edge_split against each partition's inner nodes/edges.

    For every partition, the masked IDs returned by the split must equal the
    masked IDs among that partition's inner nodes/edges, both with one client
    per partition and with two clients per partition.
    """
    prepare_dist()
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]
    for i in range(num_parts):
        # The previous iteration left the client count at num_parts * 2.
        dgl.distributed.set_num_client(num_parts)
        part_g, _, _, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)

        # Global IDs of the nodes flagged 'inner_node' in this partition.
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
        nodes2 = node_split(node_mask, gpb, i)
        # array_equal also fails cleanly when the two results differ in length.
        assert np.array_equal(np.sort(nodes1), np.sort(F.asnumpy(nodes2)))
        local_nids = F.asnumpy(local_nids)
        assert np.all(np.isin(nodes1, local_nids))

        # With two clients per partition, clients 2*i and 2*i+1 together must
        # cover the same nodes.
        dgl.distributed.set_num_client(num_parts * 2)
        nodes3 = node_split(node_mask, gpb, i * 2)
        nodes4 = node_split(node_mask, gpb, i * 2 + 1)
        nodes5 = F.cat([nodes3, nodes4], 0)
        assert np.array_equal(np.sort(nodes1), np.sort(F.asnumpy(nodes5)))

        dgl.distributed.set_num_client(num_parts)
        # Global IDs of the edges flagged 'inner_edge' in this partition.
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
        edges2 = edge_split(edge_mask, gpb, i)
        assert np.array_equal(np.sort(edges1), np.sort(F.asnumpy(edges2)))
        local_eids = F.asnumpy(local_eids)
        assert np.all(np.isin(edges1, local_eids))

        dgl.distributed.set_num_client(num_parts * 2)
        edges3 = edge_split(edge_mask, gpb, i * 2)
        edges4 = edge_split(edge_mask, gpb, i * 2 + 1)
        edges5 = F.cat([edges3, edges4], 0)
        assert np.array_equal(np.sort(edges1), np.sort(F.asnumpy(edges5)))

def test_split_even():
    """Check node_split/edge_split with ``force_even=True``.

    The per-client results, concatenated in client order, must reproduce the
    full list of masked node/edge IDs, both with one client per partition and
    with two clients per partition. Overlap with each partition's inner
    nodes/edges is only printed, not asserted.
    """
    prepare_dist()
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    all_nodes1 = []
    all_nodes2 = []
    all_edges1 = []
    all_edges2 = []
    for i in range(num_parts):
        # The previous iteration left the client count at num_parts * 2.
        dgl.distributed.set_num_client(num_parts)
        part_g, _, _, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes = node_split(node_mask, gpb, i, force_even=True)
        all_nodes1.append(nodes)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} get {} nodes and {} are in the partition'.format(i, len(nodes), len(subset)))

        dgl.distributed.set_num_client(num_parts * 2)
        nodes1 = node_split(node_mask, gpb, i * 2, force_even=True)
        nodes2 = node_split(node_mask, gpb, i * 2 + 1, force_even=True)
        nodes3 = F.cat([nodes1, nodes2], 0)
        all_nodes2.append(nodes3)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(nodes3))
        print('intersection has', len(subset))

        dgl.distributed.set_num_client(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges = edge_split(edge_mask, gpb, i, force_even=True)
        all_edges1.append(edges)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
        print('part {} get {} edges and {} are in the partition'.format(i, len(edges), len(subset)))

        dgl.distributed.set_num_client(num_parts * 2)
        edges1 = edge_split(edge_mask, gpb, i * 2, force_even=True)
        edges2 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=True)
        edges3 = F.cat([edges1, edges2], 0)
        all_edges2.append(edges3)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(edges3))
        print('intersection has', len(subset))
    all_nodes1 = F.cat(all_nodes1, 0)
    all_edges1 = F.cat(all_edges1, 0)
    all_nodes2 = F.cat(all_nodes2, 0)
    all_edges2 = F.cat(all_edges2, 0)
    all_nodes = np.nonzero(node_mask)[0]
    all_edges = np.nonzero(edge_mask)[0]
    # array_equal also fails cleanly when the two results differ in length.
    assert np.array_equal(all_nodes, F.asnumpy(all_nodes1))
    assert np.array_equal(all_edges, F.asnumpy(all_edges1))
    assert np.array_equal(all_nodes, F.asnumpy(all_nodes2))
    assert np.array_equal(all_edges, F.asnumpy(all_edges2))

315
316
def prepare_dist():
    """Write the IP config file 'kv_ip_config.txt' into the working directory.

    The file holds a single line: the string returned by
    get_local_usable_addr() followed by ' 1'.
    """
    # Resolve the address first so a failure there does not leave behind a
    # truncated config file.
    ip_addr = get_local_usable_addr()
    # NOTE(review): the trailing 1 is presumably the server count -- confirm
    # against the ip_config format expected by DistGraphServer.
    with open("kv_ip_config.txt", "w") as ip_config:
        ip_config.write('%s 1\n' % ip_addr)

321
if __name__ == '__main__':
    # The tests write their partition files under this directory.
    os.makedirs('/tmp/dist_graph', exist_ok=True)
    test_split()
    test_split_even()
    test_server_client()