"torchvision/csrc/roi_align.cpp" did not exist on "4480603831a6ad0bb97e3ffb5234f9d128e548a3"
test_dist_graph_store.py 24.1 KB
Newer Older
1
2
3
4
5
6
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
7
import socket
8
9
10
11
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
12
from dgl.heterograph_index import create_unitgraph_from_coo
13
14
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
15
from dgl.distributed import partition_graph, load_partition, load_partition_book, node_split, edge_split
16
from dgl.distributed import SparseAdagrad, DistEmbedding
17
from numpy.testing import assert_almost_equal
18
import backend as F
19
import math
20
21
22
import unittest
import pickle

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get a local usable IP address and a free TCP port.

    The IP is read from a UDP socket after connecting it to a dummy
    address; if that fails, the loopback address is used instead. The port
    is obtained by binding a TCP socket to port 0 and reading back the
    port that was assigned.

    Returns
    -------
    str
        IP address and port separated by a single space,
        e.g., '192.168.8.12 50051'
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # doesn't even have to be reachable
            sock.connect(('10.255.255.255', 1))
            ip_addr = sock.getsockname()[0]
        except (OSError, ValueError):
            # Socket failures are reported as OSError; ValueError is kept
            # for backward compatibility with the previous handler.
            ip_addr = '127.0.0.1'

    # The context manager closes the socket even if bind/listen raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        sock.listen(1)
        port = sock.getsockname()[1]

    return ip_addr + ' ' + str(port)

52
def create_random_graph(n):
    """Create a random DGL graph from an n-by-n sparse matrix.

    The matrix is generated by ``scipy.sparse.random`` with density 0.001
    and a fixed ``random_state`` of 100, converted to an int64 0/1 matrix
    and handed to ``dgl.from_scipy``.

    Parameters
    ----------
    n : int
        Number of rows and columns of the random matrix.

    Returns
    -------
    The object returned by ``dgl.from_scipy``.
    """
    arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
    return dgl.from_scipy(arr)
55

56
57
def run_server(graph_name, server_id, server_count, num_clients, shared_mem):
    """Create a ``DistGraphServer`` for a partitioned graph and start it.

    Parameters
    ----------
    graph_name : str
        Used to build the partition config path
        '/tmp/dist_graph/<graph_name>.json'.
    server_id : int
        ID passed to ``DistGraphServer`` and printed before starting.
    server_count : int
        Forwarded to ``DistGraphServer``.
    num_clients : int
        Forwarded to ``DistGraphServer``.
    shared_mem : bool
        When false, the server is created with ``disable_shared_mem=True``.
    """
    g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, server_count,
                        '/tmp/dist_graph/{}.json'.format(graph_name),
                        disable_shared_mem=not shared_mem)
    print('start server', server_id)
    g.start()

63
64
65
def emb_init(shape, dtype):
    """Return a zero tensor of the given shape and dtype created on ``F.cpu()``."""
    device = F.cpu()
    zeros = F.zeros(shape, dtype, device)
    return zeros

66
def rand_init(shape, dtype):
    """Return a tensor of the given shape filled with normally distributed values.

    NOTE: ``dtype`` is accepted so the function matches the initializer
    signature used by callers, but it is not used; the result is always
    created as ``F.float32``.
    """
    return F.tensor(np.random.normal(size=shape), F.float32)
68

69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def check_dist_graph_empty(g, num_clients, num_nodes, num_edges):
    """Run basic DistGraph checks that do not read any pre-existing features.

    Verifies the node/edge counts, creates and reads a new node tensor,
    creates/destroys/re-creates a named tensor, writes node data and checks
    the recorded dtype of the new node attribute.

    Parameters
    ----------
    g : DistGraph
        The distributed graph under test.
    num_clients : int
        Not used by this check.
    num_nodes : int
        Expected number of nodes.
    num_edges : int
        Expected number of edges.
    """
    dist = dgl.distributed

    # Graph size reported by the API must match what the caller expects.
    assert num_nodes == g.number_of_nodes()
    assert num_edges == g.number_of_edges()

    # A newly created node tensor must read back as zeros.
    two_col_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dist.DistTensor(two_col_shape, F.int32)
    half = int(g.number_of_nodes() / 2)
    nids = F.arange(0, half)
    assert np.all(F.asnumpy(g.ndata['test1'][nids]) == 0)

    # Create a named tensor, drop it, then create it again under the same
    # name with a different shape, and drop that one too.
    scratch = dist.DistTensor(two_col_shape, F.float32, 'test3', init_func=rand_init)
    del scratch
    scratch = dist.DistTensor((g.number_of_nodes(), 3), F.float32, 'test3')
    del scratch

    # Written values must be visible on the next read.
    ones = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = ones
    assert np.all(F.asnumpy(g.ndata['test1'][nids]) == 1)

    # The node attribute scheme must record the dtype of the new tensor.
    schemes = g.node_attr_schemes()
    assert schemes['test1'].dtype == F.int32

    print('end')

def run_client_empty(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
    """Client entry point: connect to the servers and run ``check_dist_graph_empty``.

    Sleeps for five seconds (presumably to let the servers come up -- TODO
    confirm), sets ``DGL_NUM_SERVER``, initializes ``dgl.distributed`` from
    "kv_ip_config.txt", loads the partition book for ``part_id`` and builds
    a ``DistGraph`` on which the checks are run.
    """
    time.sleep(5)
    os.environ['DGL_NUM_SERVER'] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")

    part_config = '/tmp/dist_graph/{}.json'.format(graph_name)
    book, loaded_name, _, _ = load_partition_book(part_config, part_id, None)
    dist_graph = DistGraph(loaded_name, gpb=book)
    check_dist_graph_empty(dist_graph, num_clients, num_nodes, num_edges)

def check_server_client_empty(shared_mem, num_servers, num_clients):
    """Partition a random graph, then run servers and clients on it.

    Each client runs ``run_client_empty`` in a spawned process. After all
    processes have been joined, every client must have exited with code 0;
    otherwise a failed assertion inside a client would go unnoticed.

    Parameters
    ----------
    shared_mem : bool
        Forwarded to ``run_server``.
    num_servers : int
        Number of server processes to spawn.
    num_clients : int
        Number of client processes to spawn.
    """
    prepare_dist()
    g = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_1'
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id, num_servers,
                                                 num_clients, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    for cli_id in range(num_clients):
        print('start client', cli_id)
        p = ctx.Process(target=run_client_empty, args=(graph_name, 0, num_servers, num_clients,
                                                       g.number_of_nodes(), g.number_of_edges()))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    for p in serv_ps:
        p.join()

    print('clients have terminated')

    # An exception in a spawned client only surfaces here as a non-zero
    # exit code, so check it explicitly once everything has been joined.
    for cli_id, p in enumerate(cli_ps):
        assert p.exitcode == 0, 'client {} exited with code {}'.format(cli_id, p.exitcode)

142
def run_client(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
    """Client entry point: connect to the servers and run ``check_dist_graph``.

    Sleeps for five seconds (presumably to let the servers come up -- TODO
    confirm), sets ``DGL_NUM_SERVER``, initializes ``dgl.distributed`` from
    "kv_ip_config.txt", loads the partition book for ``part_id`` and builds
    a ``DistGraph`` on which the checks are run.
    """
    time.sleep(5)
    os.environ['DGL_NUM_SERVER'] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
                                                part_id, None)
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_graph(g, num_clients, num_nodes, num_edges)
150

151
def check_dist_graph(g, num_clients, num_nodes, num_edges):
    """Exercise a DistGraph that carries 'features' node and edge data.

    Covers the size API, reading node/edge features, the DistTensor
    lifecycle (named, anonymous, persistent), sparse embeddings with
    ``SparseAdagrad``, writing node data, metadata queries and
    ``node_split``.

    Parameters
    ----------
    g : DistGraph
        The distributed graph under test. Its 'features' node/edge data is
        expected to equal the node/edge IDs.
    num_clients : int
        Some embedding and gradient-sum value checks are only performed
        when this equals 1.
    num_nodes : int
        Expected number of nodes.
    num_edges : int
        Expected number of edges.
    """
    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dgl.distributed.DistTensor(new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # reference to a one that exists
    test2 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2', init_func=rand_init)
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2')
    assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))

    # create a tensor and destroy a tensor and create it again.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test3')
    del test3

    # add tests for anonymous distributed tensor.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    data = test3[0:10]
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    del test3
    test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    assert np.sum(F.asnumpy(test5[0:10] != data)) > 0

    # test a persistent tensor
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    # Re-creating 'test4' with a different shape is expected to fail. The
    # outcome is recorded in a flag so that an unexpected success is
    # reported instead of being swallowed by the exception handler.
    recreated = False
    try:
        test4 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test4')
        recreated = True
    except Exception:
        pass
    assert not recreated, "re-creating persistent tensor 'test4' with a new shape should fail"

    # Test sparse emb
    try:
        emb = DistEmbedding(g.number_of_nodes(), 1, 'emb1', emb_init)
        lr = 0.001
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats = emb(nids)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        grad_sum = dgl.distributed.DistTensor((g.number_of_nodes(),), F.float32,
                                              'emb1_sum', policy)
        if num_clients == 1:
            assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))

        emb = DistEmbedding(g.number_of_nodes(), 1, 'emb2', emb_init)
        with F.no_grad():
            feats1 = emb(nids)
        assert np.all(F.asnumpy(feats1) == 0)

        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats1 = emb(nids)
            feats2 = emb(nids)
            feats = F.cat([feats1, feats2], 0)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids) * 2, 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.no_grad():
            feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * math.sqrt(2) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))
    except NotImplementedError:
        # The sparse embedding section is skipped when any call in it
        # raises NotImplementedError.
        pass

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    # Test node split
    nodes = node_split(selected_nodes, g.get_partition_book())
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes())
    assert np.all(np.isin(nodes, local_nids))

    print('end')

277
def check_server_client(shared_mem, num_servers, num_clients):
    """Partition a random graph with ID features, then run servers and clients.

    Node and edge 'features' are set to the node/edge IDs before
    partitioning. Each client runs ``run_client`` in a spawned process.
    After all processes have been joined, every client must have exited
    with code 0; otherwise a failed assertion inside a client would go
    unnoticed.

    Parameters
    ----------
    shared_mem : bool
        Forwarded to ``run_server``.
    num_servers : int
        Number of server processes to spawn.
    num_clients : int
        Number of client processes to spawn.
    """
    prepare_dist()
    g = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_2'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id, num_servers,
                                                 num_clients, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    for cli_id in range(num_clients):
        print('start client', cli_id)
        p = ctx.Process(target=run_client, args=(graph_name, 0, num_servers, num_clients, g.number_of_nodes(),
                                                 g.number_of_edges()))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    for p in serv_ps:
        p.join()

    print('clients have terminated')

    # An exception in a spawned client only surfaces here as a non-zero
    # exit code, so check it explicitly once everything has been joined.
    for cli_id, p in enumerate(cli_ps):
        assert p.exitcode == 0, 'client {} exited with code {}'.format(cli_id, p.exitcode)

314
315
316

def run_client_hetero(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
    """Client entry point: connect to the servers and run ``check_dist_graph_hetero``.

    Sleeps for five seconds (presumably to let the servers come up -- TODO
    confirm), sets ``DGL_NUM_SERVER``, initializes ``dgl.distributed`` from
    "kv_ip_config.txt", loads the partition book for ``part_id`` and builds
    a ``DistGraph`` on which the checks are run.
    """
    time.sleep(5)
    os.environ['DGL_NUM_SERVER'] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
                                                part_id, None)
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_graph_hetero(g, num_clients, num_nodes, num_edges)

def create_random_hetero():
    """Build a random heterograph with node types n1/n2/n3 and relations r1/r2/r3.

    Edges of every relation come from a ``scipy.sparse.random`` COO matrix
    (density 0.001, ``random_state=100``). Node type 'n1' and edge type
    'r1' get a 'feat' column built from ``F.arange`` over their counts.

    Returns
    -------
    The graph returned by ``dgl.heterograph`` with the 'feat' data attached.
    """
    node_counts = {'n1': 10000, 'n2': 10010, 'n3': 10020}
    canonical_etypes = [('n1', 'r1', 'n2'),
                        ('n1', 'r2', 'n3'),
                        ('n2', 'r3', 'n3')]

    def random_endpoints(src_type, dst_type):
        # Row/column indices of a random sparse matrix serve as edge endpoints.
        mat = spsp.random(node_counts[src_type], node_counts[dst_type], density=0.001,
                          format='coo', random_state=100)
        return (mat.row, mat.col)

    edge_dict = {(src, rel, dst): random_endpoints(src, dst)
                 for src, rel, dst in canonical_etypes}
    hg = dgl.heterograph(edge_dict, node_counts)

    n1_ids = F.arange(0, hg.number_of_nodes('n1'))
    hg.nodes['n1'].data['feat'] = F.unsqueeze(n1_ids, 1)
    r1_ids = F.arange(0, hg.number_of_edges('r1'))
    hg.edges['r1'].data['feat'] = F.unsqueeze(r1_ids, 1)
    return hg

def check_dist_graph_hetero(g, num_clients, num_nodes, num_edges):
    """Exercise a heterogeneous DistGraph with 'feat' data on 'n1' and 'r1'.

    Covers the per-type and total size API, reading node/edge features,
    the DistTensor lifecycle (named, anonymous, persistent), writing node
    data, metadata queries and ``node_split`` on node type 'n1'.

    Parameters
    ----------
    g : DistGraph
        The distributed graph under test.
    num_clients : int
        Not used by this check.
    num_nodes : dict
        Expected node count per node type.
    num_edges : dict
        Expected edge count per edge type.
    """
    # Test API
    for ntype in num_nodes:
        assert ntype in g.ntypes
        assert num_nodes[ntype] == g.number_of_nodes(ntype)
    for etype in num_edges:
        assert etype in g.etypes
        assert num_edges[etype] == g.number_of_edges(etype)
    assert g.number_of_nodes() == sum(num_nodes.values())
    assert g.number_of_edges() == sum(num_edges.values())

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes('n1') / 2))
    feats1 = g.nodes['n1'].data['feat'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges('r1') / 2))
    feats1 = g.edges['r1'].data['feat'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes('n1'), 2)
    g.nodes['n1'].data['test1'] = dgl.distributed.DistTensor(new_shape, F.int32)
    feats = g.nodes['n1'].data['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # create a tensor and destroy a tensor and create it again.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor((g.number_of_nodes('n1'), 3), F.float32, 'test3')
    del test3

    # add tests for anonymous distributed tensor.
    test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    data = test3[0:10]
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    del test3
    test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
    assert np.sum(F.asnumpy(test5[0:10] != data)) > 0

    # test a persistent tensor
    test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    # Re-creating 'test4' with a different shape is expected to fail. The
    # outcome is recorded in a flag so that an unexpected success is
    # reported instead of being swallowed by the exception handler.
    recreated = False
    try:
        test4 = dgl.distributed.DistTensor((g.number_of_nodes('n1'), 3), F.float32, 'test4')
        recreated = True
    except Exception:
        pass
    assert not recreated, "re-creating persistent tensor 'test4' with a new shape should fail"

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.nodes['n1'].data['test1'][nids] = new_feats
    feats = g.nodes['n1'].data['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.nodes['n1'].data['feat']) == g.number_of_nodes('n1')
    assert g.nodes['n1'].data['feat'].shape == (g.number_of_nodes('n1'), 1)
    assert g.nodes['n1'].data['feat'].dtype == F.int64

    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes('n1')) > 30
    # Test node split
    nodes = node_split(selected_nodes, g.get_partition_book(), ntype='n1')
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes('n1'))
    assert np.all(np.isin(nodes, local_nids))

    print('end')

def check_server_client_hetero(shared_mem, num_servers, num_clients):
    """Partition a random heterograph, then run servers and clients on it.

    Each client runs ``run_client_hetero`` in a spawned process and is given
    the per-type node and edge counts of the original graph. After all
    processes have been joined, every client must have exited with code 0;
    otherwise a failed assertion inside a client would go unnoticed.

    Parameters
    ----------
    shared_mem : bool
        Forwarded to ``run_server``.
    num_servers : int
        Number of server processes to spawn.
    num_clients : int
        Number of client processes to spawn.
    """
    prepare_dist()
    g = create_random_hetero()

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id, num_servers,
                                                 num_clients, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    num_nodes = {ntype: g.number_of_nodes(ntype) for ntype in g.ntypes}
    num_edges = {etype: g.number_of_edges(etype) for etype in g.etypes}
    for cli_id in range(num_clients):
        print('start client', cli_id)
        p = ctx.Process(target=run_client_hetero, args=(graph_name, 0, num_servers, num_clients, num_nodes,
                                                        num_edges))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    for p in serv_ps:
        p.join()

    print('clients have terminated')

    # An exception in a spawned client only surfaces here as a non-zero
    # exit code, so check it explicitly once everything has been joined.
    for cli_id, p in enumerate(cli_ps):
        assert p.exitcode == 0, 'client {} exited with code {}'.format(cli_id, p.exitcode)

452
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
    """Run the server/client checks with ``DGL_DIST_MODE`` set to 'distributed'.

    Covers the empty-graph check, the heterograph check (with and without
    shared memory) and the homogeneous check with 1x1 and 2x2
    server/client configurations.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    check_server_client_empty(True, 1, 1)
    check_server_client_hetero(True, 1, 1)
    check_server_client_hetero(False, 1, 1)
    check_server_client(True, 1, 1)
    check_server_client(False, 1, 1)
    check_server_client(True, 2, 2)
    check_server_client(False, 2, 2)
463

464
465
466
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_standalone():
    """Run ``check_dist_graph`` with ``DGL_DIST_MODE`` set to 'standalone'.

    A random graph with ID features is partitioned into one part and loaded
    through ``DistGraph`` using its partition config. Failures raised by
    the checks propagate to the test runner; ``exit_client`` is always
    called afterwards.
    """
    os.environ['DGL_DIST_MODE'] = 'standalone'

    g = create_random_graph(10000)
    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    dgl.distributed.initialize("kv_ip_config.txt")
    dist_g = DistGraph(graph_name, part_config='/tmp/dist_graph/{}.json'.format(graph_name))
    try:
        check_dist_graph(dist_g, 1, g.number_of_nodes(), g.number_of_edges())
    finally:
        # this is needed since there's two test here in one process
        dgl.distributed.exit_client()
483

484
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split():
    """Check ``node_split``/``edge_split`` with ``force_even=False``.

    A random graph is split into four METIS partitions. For every
    partition, the selected nodes/edges returned for that rank must equal
    the selected nodes/edges that are inner to the partition, both with one
    client per partition and with two clients per partition (whose results
    are concatenated).
    """
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]

    # The code now collects the roles of all client processes and use the information
    # to determine how to split the workloads. Here is to simulate the multi-client
    # use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i: i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i: i for i in range(num_clients)}

    for i in range(num_parts):
        set_roles(num_parts)
        part_g, _, _, gpb, _, _, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
        nodes2 = node_split(node_mask, gpb, rank=i, force_even=False)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
        local_nids = F.asnumpy(local_nids)
        assert np.all(np.isin(nodes1, local_nids))

        # Two clients per partition: their results together must cover
        # the same nodes as the single-client split.
        set_roles(num_parts * 2)
        nodes3 = node_split(node_mask, gpb, rank=i * 2, force_even=False)
        nodes4 = node_split(node_mask, gpb, rank=i * 2 + 1, force_even=False)
        nodes5 = F.cat([nodes3, nodes4], 0)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes5)))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
        edges2 = edge_split(edge_mask, gpb, rank=i, force_even=False)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges2)))
        local_eids = F.asnumpy(local_eids)
        assert np.all(np.isin(edges1, local_eids))

        set_roles(num_parts * 2)
        edges3 = edge_split(edge_mask, gpb, rank=i * 2, force_even=False)
        edges4 = edge_split(edge_mask, gpb, rank=i * 2 + 1, force_even=False)
        edges5 = F.cat([edges3, edges4], 0)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges5)))

539
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split_even():
    """Check ``node_split``/``edge_split`` with ``force_even=True``.

    A random graph is split into four METIS partitions. The per-rank
    results are collected for one client per partition and for two clients
    per partition; in both cases their concatenation must equal the
    selected node/edge IDs of the whole graph, in order.
    """
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    all_nodes1 = []
    all_nodes2 = []
    all_edges1 = []
    all_edges2 = []

    # The code now collects the roles of all client processes and use the information
    # to determine how to split the workloads. Here is to simulate the multi-client
    # use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i: i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i: i for i in range(num_clients)}

    for i in range(num_parts):
        set_roles(num_parts)
        part_g, _, _, gpb, _, _, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes = node_split(node_mask, gpb, rank=i, force_even=True)
        all_nodes1.append(nodes)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} get {} nodes and {} are in the partition'.format(i, len(nodes), len(subset)))

        set_roles(num_parts * 2)
        nodes1 = node_split(node_mask, gpb, rank=i * 2, force_even=True)
        nodes2 = node_split(node_mask, gpb, rank=i * 2 + 1, force_even=True)
        nodes3, _ = F.sort_1d(F.cat([nodes1, nodes2], 0))
        all_nodes2.append(nodes3)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(nodes3))
        print('intersection has', len(subset))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges = edge_split(edge_mask, gpb, rank=i, force_even=True)
        all_edges1.append(edges)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
        print('part {} get {} edges and {} are in the partition'.format(i, len(edges), len(subset)))

        set_roles(num_parts * 2)
        edges1 = edge_split(edge_mask, gpb, rank=i * 2, force_even=True)
        edges2 = edge_split(edge_mask, gpb, rank=i * 2 + 1, force_even=True)
        edges3, _ = F.sort_1d(F.cat([edges1, edges2], 0))
        all_edges2.append(edges3)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(edges3))
        print('intersection has', len(subset))

    all_nodes1 = F.cat(all_nodes1, 0)
    all_edges1 = F.cat(all_edges1, 0)
    all_nodes2 = F.cat(all_nodes2, 0)
    all_edges2 = F.cat(all_edges2, 0)
    all_nodes = np.nonzero(node_mask)[0]
    all_edges = np.nonzero(edge_mask)[0]
    assert np.all(all_nodes == F.asnumpy(all_nodes1))
    assert np.all(all_edges == F.asnumpy(all_edges1))
    assert np.all(all_nodes == F.asnumpy(all_nodes2))
    assert np.all(all_edges == F.asnumpy(all_edges2))

608
def prepare_dist():
    """Write "kv_ip_config.txt" with one line from ``get_local_usable_addr``.

    The file is created (or truncated) in the current working directory and
    is closed even if obtaining the address or writing it fails.
    """
    with open("kv_ip_config.txt", "w") as ip_config:
        ip_addr = get_local_usable_addr()
        ip_config.write('{}\n'.format(ip_addr))

614
if __name__ == '__main__':
    # Partition files are written under this directory by the tests.
    os.makedirs('/tmp/dist_graph', exist_ok=True)
    test_split()
    test_split_even()
    test_server_client()
    test_standalone()