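"""Tests for DGL's distributed graph store: DistGraph, DistTensor,
DistEmbedding, and the node_split/edge_split helpers."""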
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
import socket
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
from dgl.heterograph_index import create_unitgraph_from_coo
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
from dgl.distributed import partition_graph, load_partition, load_partition_book, node_split, edge_split
from dgl.distributed import SparseAdagrad, DistEmbedding
from numpy.testing import assert_almost_equal
import backend as F
import math
import unittest
import pickle

if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get local usable IP and port

    Returns
    -------
    str
        IP address and port separated by a space, e.g., '192.168.8.12 50051'
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        sock.connect(('10.255.255.255', 1))
        ip_addr = sock.getsockname()[0]
    except (ValueError, OSError):
        # fall back to loopback if the machine has no usable route
        ip_addr = '127.0.0.1'
    finally:
        sock.close()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", 0))
    sock.listen(1)
    port = sock.getsockname()[1]
    sock.close()

    return ip_addr + ' ' + str(port)

def create_random_graph(n):
    arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
    return dgl.from_scipy(arr)

def run_server(graph_name, server_id, server_count, num_clients, shared_mem):
    g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, server_count,
                        '/tmp/dist_graph/{}.json'.format(graph_name),
                        disable_shared_mem=not shared_mem)
    print('start server', server_id)
    g.start()

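# Initializer callbacks for DistTensor/DistEmbedding: emb_init returns zeros so
# the tests can predict embedding values exactly; rand_init returns random values.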
def emb_init(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

def rand_init(shape, dtype):
    return F.tensor(np.random.normal(size=shape), F.float32)

def run_client(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
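    """Client process: connect to the servers, load the partition book and run
    the DistGraph checks."""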
70
    time.sleep(5)
71
72
    os.environ['DGL_NUM_SERVER'] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")
73
74
    gpb, graph_name, _, _ = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
                                                part_id, None)
75
    g = DistGraph(graph_name, gpb=gpb)
76
    check_dist_graph(g, num_clients, num_nodes, num_edges)
77

def check_dist_graph(g, num_clients, num_nodes, num_edges):
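    """Exercise the DistGraph API: graph queries, node/edge data reads and
    writes, DistTensor lifetimes, sparse embeddings and node_split."""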
    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dgl.distributed.DistTensor(new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # Reference an existing tensor by name.
    test2 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2', init_func=rand_init)
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2')
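    # 'test2' already exists, so the second DistTensor attaches to the same
    # underlying storage instead of allocating a new tensor.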
    assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))

    # Create a non-persistent tensor, destroy it, and recreate it under the
    # same name with a different shape.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test3')
    del test3

    # add tests for anonymous distributed tensor.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    data = test3[0:10]
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    del test3
    test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
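    # test5 is freshly initialized with random values, so its content should
    # differ from the data read out of test3 before it was deleted.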
    assert np.sum(F.asnumpy(test5[0:10] != data)) > 0

    # Test a persistent tensor.
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    # A persistent tensor outlives its Python handle: 'test4' still exists with
    # the old shape, so recreating it with a different shape must fail.
    raised = False
    try:
        test4 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test4')
    except Exception:
        raised = True
    assert raised, 'recreating a persistent tensor with a new shape should fail'

    # Test sparse embedding.
    try:
        emb = DistEmbedding(g.number_of_nodes(), 1, 'emb1', emb_init)
        lr = 0.001
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats = emb(nids)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        grad_sum = dgl.distributed.DistTensor((g.number_of_nodes(),), F.float32,
                                              'emb1_sum', policy)
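        # 'emb1_sum' appears to be the name under which SparseAdagrad keeps its
        # accumulated gradient state for 'emb1'; attaching a DistTensor to it
        # lets the test verify that every client contributed one gradient per node.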
        if num_clients == 1:
            assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids),)) * num_clients)
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest),)))

        emb = DistEmbedding(g.number_of_nodes(), 1, 'emb2', emb_init)
        with F.no_grad():
            feats1 = emb(nids)
        assert np.all(F.asnumpy(feats1) == 0)

        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats1 = emb(nids)
            feats2 = emb(nids)
            feats = F.cat([feats1, feats2], 0)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids) * 2, 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.no_grad():
            feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * math.sqrt(2) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))
    except NotImplementedError:
        # Sparse embedding is not implemented for every backend; skip if so.
        pass

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    # Test node split.
    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    nodes = node_split(selected_nodes, g.get_partition_book())
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes())
    for n in nodes:
        assert n in local_nids

    print('end')

def check_server_client(shared_mem, num_servers, num_clients):
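    """Partition a random graph, launch the server and client processes, and
    wait for all of them to finish."""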
    prepare_dist()
    g = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_2'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id, num_servers,
                                                 num_clients, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    for cli_id in range(num_clients):
        print('start client', cli_id)
        p = ctx.Process(target=run_client, args=(graph_name, 0, num_servers, num_clients, g.number_of_nodes(),
                                                 g.number_of_edges()))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    for p in serv_ps:
        p.join()

    print('clients have terminated')


def run_client_hetero(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
    time.sleep(5)
    os.environ['DGL_NUM_SERVER'] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
                                                part_id, None)
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_graph_hetero(g, num_clients, num_nodes, num_edges)

def create_random_hetero():
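    """Create a random heterograph with three node types and three edge types."""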
    num_nodes = {'n1': 10000, 'n2': 10010, 'n3': 10020}
    etypes = [('n1', 'r1', 'n2'),
              ('n1', 'r2', 'n3'),
              ('n2', 'r3', 'n3')]
    edges = {}
    for etype in etypes:
        src_ntype, _, dst_ntype = etype
        arr = spsp.random(num_nodes[src_ntype], num_nodes[dst_ntype], density=0.001, format='coo',
                          random_state=100)
        edges[etype] = (arr.row, arr.col)
    g = dgl.heterograph(edges, num_nodes)
    g.nodes['n1'].data['feat'] = F.unsqueeze(F.arange(0, g.number_of_nodes('n1')), 1)
    g.edges['r1'].data['feat'] = F.unsqueeze(F.arange(0, g.number_of_edges('r1')), 1)
    return g

def check_dist_graph_hetero(g, num_clients, num_nodes, num_edges):
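    """Run the DistGraph checks against a heterogeneous graph."""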
    # Test API
    for ntype in num_nodes:
        assert ntype in g.ntypes
        assert num_nodes[ntype] == g.number_of_nodes(ntype)
    for etype in num_edges:
        assert etype in g.etypes
        assert num_edges[etype] == g.number_of_edges(etype)
    assert g.number_of_nodes() == sum([num_nodes[ntype] for ntype in num_nodes])
    assert g.number_of_edges() == sum([num_edges[etype] for etype in num_edges])

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes('n1') / 2))
    feats1 = g.nodes['n1'].data['feat'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges('r1') / 2))
    feats1 = g.edges['r1'].data['feat'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes('n1'), 2)
    g.nodes['n1'].data['test1'] = dgl.distributed.DistTensor(new_shape, F.int32)
    feats = g.nodes['n1'].data['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # Create a non-persistent tensor, destroy it, and recreate it under the
    # same name with a different shape.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor((g.number_of_nodes('n1'), 3), F.float32, 'test3')
    del test3

    # add tests for anonymous distributed tensor.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    data = test3[0:10]
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    del test3
    test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    assert np.sum(F.asnumpy(test5[0:10] != data)) > 0

    # Test a persistent tensor.
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    # A persistent tensor outlives its Python handle: 'test4' still exists with
    # the old shape, so recreating it with a different shape must fail.
    raised = False
    try:
        test4 = dgl.distributed.DistTensor((g.number_of_nodes('n1'), 3), F.float32, 'test4')
    except Exception:
        raised = True
    assert raised, 'recreating a persistent tensor with a new shape should fail'

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.nodes['n1'].data['test1'][nids] = new_feats
    feats = g.nodes['n1'].data['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.nodes['n1'].data['feat']) == g.number_of_nodes('n1')
    assert g.nodes['n1'].data['feat'].shape == (g.number_of_nodes('n1'), 1)
    assert g.nodes['n1'].data['feat'].dtype == F.int64

    # Test node split.
    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes('n1')) > 30
    nodes = node_split(selected_nodes, g.get_partition_book(), ntype='n1')
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes('n1'))
    for n in nodes:
        assert n in local_nids

    print('end')

def check_server_client_hetero(shared_mem, num_servers, num_clients):
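    """Same as check_server_client, but with a heterogeneous graph."""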
    prepare_dist()
    g = create_random_hetero()

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id, num_servers,
                                                 num_clients, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    num_nodes = {ntype: g.number_of_nodes(ntype) for ntype in g.ntypes}
    num_edges = {etype: g.number_of_edges(etype) for etype in g.etypes}
    for cli_id in range(num_clients):
        print('start client', cli_id)
        p = ctx.Process(target=run_client_hetero, args=(graph_name, 0, num_servers, num_clients, num_nodes,
                                                        num_edges))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    for p in serv_ps:
        p.join()

    print('clients have terminated')

@unittest.skipIf(os.name == 'nt', reason='Windows is not supported yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some operations in DistGraph")
def test_server_client():
    os.environ['DGL_DIST_MODE'] = 'distributed'
    check_server_client_hetero(True, 1, 1)
    check_server_client_hetero(False, 1, 1)
    check_server_client(True, 1, 1)
    check_server_client(False, 1, 1)
    check_server_client(True, 2, 2)
    check_server_client(False, 2, 2)

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some operations in DistGraph")
def test_standalone():
    os.environ['DGL_DIST_MODE'] = 'standalone'
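    # In standalone mode the graph is served from the local process; no separate
    # server or client processes are spawned.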
    g = create_random_graph(10000)
    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    dgl.distributed.initialize("kv_ip_config.txt")
    dist_g = DistGraph(graph_name, part_config='/tmp/dist_graph/{}.json'.format(graph_name))
    try:
        check_dist_graph(dist_g, 1, g.number_of_nodes(), g.number_of_edges())
    finally:
        # exit_client() is needed because two tests run in this process.
        dgl.distributed.exit_client()

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split():
    #prepare_dist()
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]

    # The code now collects the roles of all client processes and uses that
    # information to determine how to split the workloads. This simulates the
    # multi-client use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
        nodes2 = node_split(node_mask, gpb, rank=i, force_even=False)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
        local_nids = F.asnumpy(local_nids)
        for n in nodes1:
            assert n in local_nids

        set_roles(num_parts * 2)
        nodes3 = node_split(node_mask, gpb, rank=i * 2, force_even=False)
        nodes4 = node_split(node_mask, gpb, rank=i * 2 + 1, force_even=False)
        nodes5 = F.cat([nodes3, nodes4], 0)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes5)))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
        edges2 = edge_split(edge_mask, gpb, rank=i, force_even=False)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges2)))
        local_eids = F.asnumpy(local_eids)
        for e in edges1:
            assert e in local_eids

        set_roles(num_parts * 2)
        edges3 = edge_split(edge_mask, gpb, rank=i * 2, force_even=False)
        edges4 = edge_split(edge_mask, gpb, rank=i * 2 + 1, force_even=False)
        edges5 = F.cat([edges3, edges4], 0)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges5)))

@unittest.skipIf(os.name == 'nt', reason='Windows is not supported yet')
def test_split_even():
    #prepare_dist(1)
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]
    all_nodes1 = []
    all_nodes2 = []
    all_edges1 = []
    all_edges2 = []

    # The code now collects the roles of all client processes and uses that
    # information to determine how to split the workloads. This simulates the
    # multi-client use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes = node_split(node_mask, gpb, rank=i, force_even=True)
        all_nodes1.append(nodes)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} gets {} nodes, of which {} are in the partition'.format(i, len(nodes), len(subset)))

        set_roles(num_parts * 2)
        nodes1 = node_split(node_mask, gpb, rank=i * 2, force_even=True)
        nodes2 = node_split(node_mask, gpb, rank=i * 2 + 1, force_even=True)
        nodes3, _ = F.sort_1d(F.cat([nodes1, nodes2], 0))
        all_nodes2.append(nodes3)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(nodes3))
        print('intersection has', len(subset))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges = edge_split(edge_mask, gpb, rank=i, force_even=True)
        all_edges1.append(edges)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
        print('part {} gets {} edges, of which {} are in the partition'.format(i, len(edges), len(subset)))

        set_roles(num_parts * 2)
        edges1 = edge_split(edge_mask, gpb, rank=i * 2, force_even=True)
        edges2 = edge_split(edge_mask, gpb, rank=i * 2 + 1, force_even=True)
        edges3, _ = F.sort_1d(F.cat([edges1, edges2], 0))
        all_edges2.append(edges3)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(edges3))
        print('intersection has', len(subset))
    all_nodes1 = F.cat(all_nodes1, 0)
    all_edges1 = F.cat(all_edges1, 0)
    all_nodes2 = F.cat(all_nodes2, 0)
    all_edges2 = F.cat(all_edges2, 0)
    all_nodes = np.nonzero(node_mask)[0]
    all_edges = np.nonzero(edge_mask)[0]
    assert np.all(all_nodes == F.asnumpy(all_nodes1))
    assert np.all(all_edges == F.asnumpy(all_edges1))
    assert np.all(all_nodes == F.asnumpy(all_nodes2))
    assert np.all(all_edges == F.asnumpy(all_edges2))

def prepare_dist():
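    """Write a one-line IP config file ('ip port') shared by servers and clients."""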
    ip_config = open("kv_ip_config.txt", "w")
    ip_addr = get_local_usable_addr()
    ip_config.write('{}\n'.format(ip_addr))
    ip_config.close()

if __name__ == '__main__':
    os.makedirs('/tmp/dist_graph', exist_ok=True)
    test_split()
    test_split_even()
    test_server_client()
    test_standalone()