test_dist_graph_store.py 15.9 KB
Newer Older
1
2
3
4
5
6
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
7
import socket
8
9
10
11
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
12
from dgl.heterograph_index import create_unitgraph_from_coo
13
14
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
15
from dgl.distributed import partition_graph, load_partition, load_partition_book, node_split, edge_split
16
from dgl.distributed import SparseAdagrad, DistEmbedding
17
from numpy.testing import assert_almost_equal
18
import backend as F
19
import math
20
21
22
import unittest
import pickle

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get a local usable IP address and a free TCP port.

    Returns
    -------
    str
        Space-separated IP address and port, e.g., '192.168.8.12 50051'
        (the format expected by the kv_ip_config.txt file).
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable: a UDP "connect" sends no packet,
        # it only selects the outgoing interface so we can read our own IP.
        sock.connect(('10.255.255.255', 1))
        ip_addr = sock.getsockname()[0]
    except (OSError, ValueError):
        # BUG FIX: socket failures raise OSError, not ValueError, so the
        # loopback fallback was previously unreachable on hosts without a
        # usable network interface.
        ip_addr = '127.0.0.1'
    finally:
        sock.close()

    # Bind to port 0 to let the OS pick a currently-free TCP port.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", 0))
    sock.listen(1)
    port = sock.getsockname()[1]
    sock.close()

    return ip_addr + ' ' + str(port)

52
def create_random_graph(n):
    """Build a DGLGraph from a fixed-seed random n x n sparse 0/1 matrix
    (density 0.001, random_state=100, so the graph is reproducible)."""
    adj = spsp.random(n, n, density=0.001, format='coo', random_state=100)
    adj = (adj != 0).astype(np.int64)
    return dgl.from_scipy(adj)
55

56
57
def run_server(graph_name, server_id, server_count, num_clients, shared_mem):
    """Server process body: serve the partition described by the graph's
    partition config until all clients disconnect."""
    part_config = '/tmp/dist_graph/{}.json'.format(graph_name)
    serv = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, server_count,
                           part_config, disable_shared_mem=not shared_mem)
    print('start server', server_id)
    serv.start()

63
64
65
def emb_init(shape, dtype):
    """All-zeros initializer (on CPU) for DistEmbedding."""
    zeros = F.zeros(shape, dtype, F.cpu())
    return zeros

66
def rand_init(shape, dtype):
    """Standard-normal initializer for DistTensor.

    Note: the `dtype` argument is ignored; the result is always float32.
    """
    values = np.random.normal(size=shape)
    return F.tensor(values, F.float32)
68

69
def run_client(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
    """Client process body: connect to the servers and validate the DistGraph."""
    # Give the server processes a head start before we try to connect.
    time.sleep(5)
    dgl.distributed.initialize("kv_ip_config.txt", server_count)
    part_config = '/tmp/dist_graph/{}.json'.format(graph_name)
    gpb, graph_name = load_partition_book(part_config, part_id, None)
    dist_g = DistGraph(graph_name, gpb=gpb)
    check_dist_graph(dist_g, num_clients, num_nodes, num_edges)
76

77
def check_dist_graph(g, num_clients, num_nodes, num_edges):
    """Validate a DistGraph: metadata, ndata/edata reads and writes,
    DistTensor lifetime semantics, DistEmbedding training, and node_split.

    Parameters
    ----------
    g : DistGraph
        The distributed graph under test.
    num_clients : int
        Number of client processes; exact gradient-value checks only run
        when there is a single client.
    num_nodes, num_edges : int
        Expected totals for the whole graph.
    """
    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data: 'features' was initialized to the node IDs.
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data: 'features' was initialized to the edge IDs.
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data: a fresh DistTensor is zero-initialized by default.
    new_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dgl.distributed.DistTensor(new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # Reference a tensor that already exists: creating a DistTensor with the
    # same name must attach to the same storage.
    test2 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2', init_func=rand_init)
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2')
    assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))

    # Create a tensor, destroy it, and create it again (even with a new shape).
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test3')
    del test3

    # Add tests for anonymous distributed tensors: deleting one and creating
    # another must not resurrect the old data.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    data = test3[0:10]
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    del test3
    test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    assert np.sum(F.asnumpy(test5[0:10] != data)) > 0

    # Test a persistent tensor: once deleted, re-attaching to the name with a
    # different shape must fail.
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    raised = False
    try:
        test4 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test4')
    except Exception:
        raised = True
    # BUG FIX: this check used to raise a sentinel `Exception('')` inside the
    # try block and swallow it with a bare `except:`, so a missing error could
    # never fail the test.
    assert raised, 'recreating a deleted persistent DistTensor should raise'

    # Test sparse embeddings; skipped entirely on backends that do not
    # implement DistEmbedding.
    try:
        emb = DistEmbedding(g.number_of_nodes(), 1, 'emb1', emb_init)
        lr = 0.001
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats = emb(nids)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        # With Adagrad and an all-ones gradient, one step moves the touched
        # rows by exactly -lr. Only exact with a single client.
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * -lr)
        # Rows that were never looked up must remain at their zero init.
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        # The optimizer state ('emb1_sum') is itself a named DistTensor.
        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        grad_sum = dgl.distributed.DistTensor((g.number_of_nodes(),), F.float32,
                                              'emb1_sum', policy)
        # NOTE(review): grad_sum is 1-D while the expectation arrays are
        # (n, 1); the comparison relies on NumPy broadcasting.
        if num_clients == 1:
            assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))

        # A lookup under no_grad must not accumulate gradient state.
        emb = DistEmbedding(g.number_of_nodes(), 1, 'emb2', emb_init)
        with F.no_grad():
            feats1 = emb(nids)
        assert np.all(F.asnumpy(feats1) == 0)

        # Look each row up twice in one pass: Adagrad's state sees gradient 2,
        # so the update becomes -lr * 2 / sqrt(2) = -lr * sqrt(2).
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats1 = emb(nids)
            feats2 = emb(nids)
            feats = F.cat([feats1, feats2], 0)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids) * 2, 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.no_grad():
            feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * math.sqrt(2) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))
    except NotImplementedError:
        pass

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    # Test node split
    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    nodes = node_split(selected_nodes, g.get_partition_book())
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes())
    for n in nodes:
        assert n in local_nids

    print('end')

203
def check_server_client(shared_mem, num_servers, num_clients):
    """Partition a random graph, launch server and client processes, and let
    the clients run the full DistGraph checks.

    Parameters
    ----------
    shared_mem : bool
        Whether the servers expose the graph through shared memory.
    num_servers, num_clients : int
        Number of server / client processes to spawn.
    """
    prepare_dist()
    g = create_random_graph(10000)

    # Partition the graph. A single partition is enough here: we cannot run
    # multiple servers and clients on the same machine.
    num_parts = 1
    graph_name = 'dist_graph_test_2'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    ctx = mp.get_context('spawn')

    server_procs = []
    for serv_id in range(num_servers):
        proc = ctx.Process(target=run_server,
                           args=(graph_name, serv_id, num_servers,
                                 num_clients, shared_mem))
        server_procs.append(proc)
        proc.start()

    client_procs = []
    for cli_id in range(num_clients):
        print('start client', cli_id)
        proc = ctx.Process(target=run_client,
                           args=(graph_name, 0, num_servers, num_clients,
                                 g.number_of_nodes(), g.number_of_edges()))
        proc.start()
        client_procs.append(proc)

    # Wait for all clients to finish first, then for the servers.
    for proc in client_procs:
        proc.join()
    for proc in server_procs:
        proc.join()

    print('clients have terminated')

240
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
    """Run the server/client checks over all shared-mem and process-count
    combinations, in the same order as before."""
    os.environ['DGL_DIST_MODE'] = 'distributed'
    for shared_mem, num_servers, num_clients in [(True, 1, 1), (False, 1, 1),
                                                 (True, 2, 2), (False, 2, 2)]:
        check_server_client(shared_mem, num_servers, num_clients)
248

249
250
251
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_standalone():
    """Run the full DistGraph checks in standalone mode (no server processes)."""
    os.environ['DGL_DIST_MODE'] = 'standalone'

    g = create_random_graph(10000)
    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    dgl.distributed.initialize("kv_ip_config.txt")
    dist_g = DistGraph(graph_name, part_config='/tmp/dist_graph/{}.json'.format(graph_name))
    try:
        check_dist_graph(dist_g, 1, g.number_of_nodes(), g.number_of_edges())
    finally:
        # BUG FIX: failures used to be caught and merely printed, so this test
        # could never fail. exit_client() is still required because multiple
        # tests run in one process, so run it in `finally` and let any
        # exception propagate.
        dgl.distributed.exit_client()
268

269
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split():
    """Check node_split/edge_split with force_even=False: every trainer must
    receive exactly the masked nodes/edges owned by its own partition, and
    doubling the number of trainers must split each partition's share in two.
    """
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    # Randomly mark ~70% of the nodes/edges as part of the workload.
    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]

    # The code now collects the roles of all client processes and use the information
    # to determine how to split the workloads. Here is to simulate the multi-client
    # use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        # One trainer per partition: the split must equal the masked nodes
        # that this partition owns ('inner_node' marks owned nodes).
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
        nodes2 = node_split(node_mask, gpb, i, force_even=False)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
        local_nids = F.asnumpy(local_nids)
        for n in nodes1:
            assert n in local_nids

        # Two trainers per partition: their two shares together must cover
        # exactly the same node set.
        set_roles(num_parts * 2)
        nodes3 = node_split(node_mask, gpb, i * 2, force_even=False)
        nodes4 = node_split(node_mask, gpb, i * 2 + 1, force_even=False)
        nodes5 = F.cat([nodes3, nodes4], 0)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes5)))

        # Same two checks for edges ('inner_edge' marks owned edges).
        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
        edges2 = edge_split(edge_mask, gpb, i, force_even=False)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges2)))
        local_eids = F.asnumpy(local_eids)
        for e in edges1:
            assert e in local_eids

        set_roles(num_parts * 2)
        edges3 = edge_split(edge_mask, gpb, i * 2, force_even=False)
        edges4 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=False)
        edges5 = F.cat([edges3, edges4], 0)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges5)))

324
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split_even():
    """Check node_split/edge_split with force_even=True: the shares may cross
    partition boundaries, but concatenated over all trainers they must cover
    the masked nodes/edges exactly once and in order."""
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    # Randomly mark ~70% of the nodes/edges as part of the workload.
    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]
    # Accumulators for the per-trainer shares (1: one trainer per partition,
    # 2: two trainers per partition); checked for full coverage at the end.
    all_nodes1 = []
    all_nodes2 = []
    all_edges1 = []
    all_edges2 = []

    # The code now collects the roles of all client processes and use the information
    # to determine how to split the workloads. Here is to simulate the multi-client
    # use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        # One trainer per partition; with force_even the share need not stay
        # inside the partition, so only report the overlap.
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes = node_split(node_mask, gpb, i, force_even=True)
        all_nodes1.append(nodes)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} get {} nodes and {} are in the partition'.format(i, len(nodes), len(subset)))

        # Two trainers per partition; report overlap with the one-trainer share.
        set_roles(num_parts * 2)
        nodes1 = node_split(node_mask, gpb, i * 2, force_even=True)
        nodes2 = node_split(node_mask, gpb, i * 2 + 1, force_even=True)
        nodes3 = F.cat([nodes1, nodes2], 0)
        all_nodes2.append(nodes3)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(nodes3))
        print('intersection has', len(subset))

        # Same for edges.
        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges = edge_split(edge_mask, gpb, i, force_even=True)
        all_edges1.append(edges)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
        print('part {} get {} edges and {} are in the partition'.format(i, len(edges), len(subset)))

        set_roles(num_parts * 2)
        edges1 = edge_split(edge_mask, gpb, i * 2, force_even=True)
        edges2 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=True)
        edges3 = F.cat([edges1, edges2], 0)
        all_edges2.append(edges3)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(edges3))
        print('intersection has', len(subset))
    # Concatenated over all trainers, the even splits must reproduce the full
    # masked ID sets exactly (both trainer counts).
    all_nodes1 = F.cat(all_nodes1, 0)
    all_edges1 = F.cat(all_edges1, 0)
    all_nodes2 = F.cat(all_nodes2, 0)
    all_edges2 = F.cat(all_edges2, 0)
    all_nodes = np.nonzero(node_mask)[0]
    all_edges = np.nonzero(edge_mask)[0]
    assert np.all(all_nodes == F.asnumpy(all_nodes1))
    assert np.all(all_edges == F.asnumpy(all_edges1))
    assert np.all(all_nodes == F.asnumpy(all_nodes2))
    assert np.all(all_edges == F.asnumpy(all_edges2))

393
def prepare_dist():
    """Write the local IP and a free port to kv_ip_config.txt, the rendezvous
    file read by the server and client processes."""
    ip_addr = get_local_usable_addr()
    # Use a context manager so the file is closed (and flushed) even if the
    # write fails; the previous version left the handle open on error.
    with open("kv_ip_config.txt", "w") as ip_config:
        ip_config.write('{}\n'.format(ip_addr))

399
if __name__ == '__main__':
    # Every test writes its partitioned graph under this directory.
    os.makedirs('/tmp/dist_graph', exist_ok=True)
    test_split()
    test_split_even()
    test_server_client()
    test_standalone()