test_distributed_sampling.py 40 KB
Newer Older
Jinjing Zhou's avatar
Jinjing Zhou committed
1
2
3
import dgl
import unittest
import os
4
import traceback
Jinjing Zhou's avatar
Jinjing Zhou committed
5
from dgl.data import CitationGraphDataset
6
7
from dgl.data import WN18Dataset
from dgl.distributed import sample_neighbors, sample_etype_neighbors
Jinjing Zhou's avatar
Jinjing Zhou committed
8
9
10
11
12
13
from dgl.distributed import partition_graph, load_partition, load_partition_book
import sys
import multiprocessing as mp
import numpy as np
import backend as F
import time
14
from utils import generate_ip_config, reset_envs
Jinjing Zhou's avatar
Jinjing Zhou committed
15
from pathlib import Path
16
import pytest
17
from scipy import sparse as spsp
18
import random
Jinjing Zhou's avatar
Jinjing Zhou committed
19
20
21
from dgl.distributed import DistGraphServer, DistGraph


22
23
def start_server(rank, tmpdir, disable_shared_mem, graph_name, graph_format=None,
                 keep_alive=False):
    """Create a ``DistGraphServer`` for one partition and run it.

    Intended to be used as the ``target`` of a spawned process.

    Parameters
    ----------
    rank : int
        Server ID, forwarded as the first argument of ``DistGraphServer``.
    tmpdir : pathlib.Path
        Directory containing the partition config ``<graph_name>.json``.
    disable_shared_mem : bool
        Forwarded to ``DistGraphServer``.
    graph_name : str
        Name used when the graph was partitioned.
    graph_format : list of str, optional
        Sparse formats forwarded to the server. ``None`` (the default) means
        ``['csc', 'coo']``; a sentinel is used so that no mutable list is
        shared between calls.
    keep_alive : bool
        Forwarded to ``DistGraphServer``.
    """
    if graph_format is None:
        graph_format = ['csc', 'coo']
    # NOTE(review): the two literal 1s are positional arguments of
    # DistGraphServer (presumably server count per machine and client count)
    # -- confirm against the DistGraphServer signature.
    g = DistGraphServer(rank, "rpc_ip_config.txt", 1, 1,
                        tmpdir / (graph_name + '.json'),
                        disable_shared_mem=disable_shared_mem,
                        graph_format=graph_format, keep_alive=keep_alive)
    g.start()


30
def start_sample_client(rank, tmpdir, disable_shared_mem):
    """Connect to the ``test_sampling`` graph and sample neighbors of fixed seeds.

    The partition book is loaded from disk only when ``disable_shared_mem`` is
    set. A failure during sampling is printed rather than raised, so that
    ``exit_client`` is still called.

    Returns
    -------
    The sampled graph, or ``None`` if sampling raised an exception.
    """
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_sampling.json', rank)
    else:
        gpb = None
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)

    seed_nodes = [0, 10, 99, 66, 1024, 2008]
    sampled_graph = None
    try:
        sampled_graph = sample_neighbors(dist_graph, seed_nodes, 3)
    except Exception:
        print(traceback.format_exc())
    dgl.distributed.exit_client()
    return sampled_graph

44

45
46
def start_sample_client_shuffle(rank, tmpdir, disable_shared_mem, g, num_servers, group_id,
        orig_nid, orig_eid):
    """Sample from the reshuffled ``test_sampling`` graph and verify the result.

    Sets ``DGL_GROUP_ID`` from ``group_id`` before initializing. The sampled
    node and edge IDs are mapped back through ``orig_nid`` / ``orig_eid`` and
    compared against the original graph ``g``.
    """
    os.environ['DGL_GROUP_ID'] = str(group_id)
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_sampling.json', rank)
    else:
        gpb = None
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    seed_nodes = [0, 10, 99, 66, 1024, 2008]
    sampled_graph = sample_neighbors(dist_graph, seed_nodes, 3)

    # Translate the sampled endpoints to the IDs of the original graph.
    shuffled_src, shuffled_dst = sampled_graph.edges()
    src = orig_nid[shuffled_src]
    dst = orig_nid[shuffled_dst]
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))

    # The sampled edge IDs must map to the same edges in the original graph.
    expected_eids = g.edge_ids(src, dst)
    sampled_eids = orig_eid[sampled_graph.edata[dgl.EID]]
    assert np.array_equal(F.asnumpy(sampled_eids), F.asnumpy(expected_eids))

64
def start_find_edges_client(rank, tmpdir, disable_shared_mem, eids, etype=None):
    """Call ``find_edges`` on the distributed ``test_find_edges`` graph.

    A failure is printed rather than raised, so that ``exit_client`` is still
    called.

    Returns
    -------
    tuple
        ``(u, v)`` as returned by ``DistGraph.find_edges``, or ``(None, None)``
        if the call raised an exception.
    """
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_find_edges.json', rank)
    else:
        gpb = None
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_find_edges", gpb=gpb)

    endpoints = (None, None)
    try:
        u, v = dist_graph.find_edges(eids, etype=etype)
        endpoints = (u, v)
    except Exception:
        print(traceback.format_exc())
    dgl.distributed.exit_client()
    return endpoints
Jinjing Zhou's avatar
Jinjing Zhou committed
77

78
79
80
81
def start_get_degrees_client(rank, tmpdir, disable_shared_mem, nids=None):
    """Query in/out degrees on the distributed ``test_get_degrees`` graph.

    Degrees are requested both for ``nids`` and with no argument. A failure is
    printed rather than raised, so that ``exit_client`` is still called.

    Returns
    -------
    tuple
        ``(in_deg, out_deg, all_in_deg, all_out_deg)``; all four are ``None``
        if any of the queries raised an exception.
    """
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_get_degrees.json', rank)
    else:
        gpb = None
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_get_degrees", gpb=gpb)

    # Only replaced once every query has succeeded.
    degrees = (None, None, None, None)
    try:
        in_deg = dist_graph.in_degrees(nids)
        all_in_deg = dist_graph.in_degrees()
        out_deg = dist_graph.out_degrees(nids)
        all_out_deg = dist_graph.out_degrees()
        degrees = (in_deg, out_deg, all_in_deg, all_out_deg)
    except Exception:
        print(traceback.format_exc())
    dgl.distributed.exit_client()
    return degrees

95
def check_rpc_sampling(tmpdir, num_server):
    """Check distributed neighbor sampling on partitions made without reshuffling.

    Partitions Cora into ``num_server`` parts with ``reshuffle=False``, spawns
    one server process per part, samples from a client in the current process
    and verifies every sampled edge against the original graph.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = CitationGraphDataset("cora")[0]
    print(g.idtype)
    partition_graph(g, 'test_sampling', num_server, tmpdir,
                    num_hops=1, part_method='metis', reshuffle=False)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    sampled_graph = start_sample_client(0, tmpdir, disable_shared_mem)
    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))
    expected_eids = g.edge_ids(src, dst)
    sampled_eids = sampled_graph.edata[dgl.EID]
    assert np.array_equal(F.asnumpy(sampled_eids), F.asnumpy(expected_eids))

127
def check_rpc_find_edges_shuffle(tmpdir, num_server):
    """Check ``DistGraph.find_edges`` on a reshuffled partitioning of Cora.

    Partitions Cora into ``num_server`` parts with ``reshuffle=True``, spawns
    one server process per part, looks up 100 random edge IDs through a client
    in the current process and compares the endpoints, mapped back to original
    node IDs, with ``find_edges`` on the original graph.

    The server processes are joined and required to exit with code 0, like in
    the other ``check_*`` helpers of this file, so that no child process is
    left behind and a crashed server fails the check.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = CitationGraphDataset("cora")[0]
    num_parts = num_server

    orig_nid, orig_eid = partition_graph(g, 'test_find_edges', num_parts, tmpdir,
                                         num_hops=1, part_method='metis',
                                         reshuffle=True, return_mapping=True)

    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_server):
        p = ctx.Process(target=start_server, args=(i, tmpdir, num_server > 1,
                                                   'test_find_edges', ['csr', 'coo']))
        p.start()
        time.sleep(1)
        pserver_list.append(p)

    # `eids` are IDs in the shuffled graph; `orig_eid` maps them to original IDs.
    eids = F.tensor(np.random.randint(g.number_of_edges(), size=100))
    u, v = g.find_edges(orig_eid[eids])
    du, dv = start_find_edges_client(0, tmpdir, num_server > 1, eids)

    # The client has called exit_client() by now; wait for the servers to
    # finish and make sure none of them failed.
    for p in pserver_list:
        p.join()
        assert p.exitcode == 0

    du = orig_nid[du]
    dv = orig_nid[dv]
    assert F.array_equal(u, du)
    assert F.array_equal(v, dv)

154
155
156
def create_random_hetero(dense=False, empty=False):
    """Build a random heterograph with node types n1/n2/n3 and three relations.

    Parameters
    ----------
    dense : bool
        If True, use about 200 nodes per type and edge density 0.1; otherwise
        about 1000 nodes per type and density 0.001.
    empty : bool
        If True, the random adjacency matrices are generated 10 rows and 10
        columns smaller than the node counts, so the last 10 nodes of every
        type have no edges.

    Returns
    -------
    The heterograph, with an all-ones ``feat`` of width 10 on ``n1`` nodes only.
    """
    if dense:
        num_nodes = {'n1': 210, 'n2': 200, 'n3': 220}
        density = 0.1
    else:
        num_nodes = {'n1': 1010, 'n2': 1000, 'n3': 1020}
        density = 0.001
    # Number of trailing nodes per type left without any edge.
    margin = 10 if empty else 0

    random.seed(42)
    edges = {}
    for canonical_etype in [('n1', 'r12', 'n2'),
                            ('n1', 'r13', 'n3'),
                            ('n2', 'r23', 'n3')]:
        src_ntype, _, dst_ntype = canonical_etype
        adj = spsp.random(num_nodes[src_ntype] - margin,
                          num_nodes[dst_ntype] - margin,
                          density=density,
                          format='coo', random_state=100)
        edges[canonical_etype] = (adj.row, adj.col)

    g = dgl.heterograph(edges, num_nodes)
    feat_shape = (g.number_of_nodes('n1'), 10)
    g.nodes['n1'].data['feat'] = F.ones(feat_shape, F.float32, F.cpu())
    return g
172
173

def check_rpc_hetero_find_edges_shuffle(tmpdir, num_server):
    """Check ``DistGraph.find_edges`` with an edge type on a reshuffled heterograph.

    Partitions a random heterograph into ``num_server`` parts, spawns one
    server process per part and looks up 100 random ``r12`` edges through a
    client in the current process. The endpoints, mapped back to original node
    IDs, must match ``find_edges`` on the original graph.

    Also checks on the original graph that an incomplete edge-type tuple is
    rejected and that the short and canonical edge-type forms agree.

    The server processes are joined and required to exit with code 0, like in
    the other ``check_*`` helpers of this file.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_hetero()
    num_parts = num_server

    orig_nid, orig_eid = partition_graph(g, 'test_find_edges', num_parts, tmpdir,
                                         num_hops=1, part_method='metis',
                                         reshuffle=True, return_mapping=True)

    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_server):
        p = ctx.Process(target=start_server, args=(i, tmpdir, num_server > 1,
                                                   'test_find_edges', ['csr', 'coo']))
        p.start()
        time.sleep(1)
        pserver_list.append(p)

    test_etype = g.to_canonical_etype('r12')
    eids = F.tensor(np.random.randint(g.num_edges(test_etype), size=100))

    # A two-element edge type is not valid and must be rejected. Catch
    # Exception rather than using a bare except, so that KeyboardInterrupt and
    # SystemExit are not mistaken for the expected failure.
    expect_except = False
    try:
        _, _ = g.find_edges(orig_eid[test_etype][eids], etype=('n1', 'r12'))
    except Exception:
        expect_except = True
    assert expect_except

    u, v = g.find_edges(orig_eid[test_etype][eids], etype='r12')
    u1, v1 = g.find_edges(orig_eid[test_etype][eids], etype=('n1', 'r12', 'n2'))
    assert F.array_equal(u, u1)
    assert F.array_equal(v, v1)
    du, dv = start_find_edges_client(0, tmpdir, num_server > 1, eids, etype='r12')

    # The client has called exit_client() by now; wait for the servers to
    # finish and make sure none of them failed.
    for p in pserver_list:
        p.join()
        assert p.exitcode == 0

    du = orig_nid['n1'][du]
    dv = orig_nid['n2'][dv]
    assert F.array_equal(u, du)
    assert F.array_equal(v, dv)

210
211
212
# Wait non shared memory graph store
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
@pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_find_edges_shuffle(num_server):
    """Run the heterogeneous and homogeneous ``find_edges`` checks in one temp dir."""
    import tempfile
    reset_envs()
    os.environ['DGL_DIST_MODE'] = 'distributed'
    with tempfile.TemporaryDirectory() as workdir:
        check_rpc_hetero_find_edges_shuffle(Path(workdir), num_server)
        check_rpc_find_edges_shuffle(Path(workdir), num_server)

def check_rpc_get_degree_shuffle(tmpdir, num_server):
    """Check ``DistGraph.in_degrees`` / ``out_degrees`` on a reshuffled Cora.

    Partitions Cora into ``num_server`` parts, spawns one server process per
    part and queries degrees through a client in the current process, both for
    100 random nodes and for all nodes. Results are compared with the degrees
    of the corresponding original nodes.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = CitationGraphDataset("cora")[0]
    orig_nid, _ = partition_graph(g, 'test_get_degrees', num_server, tmpdir,
                                  num_hops=1, part_method='metis',
                                  reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_get_degrees'))
        server.start()
        time.sleep(1)
        servers.append(server)

    nids = F.tensor(np.random.randint(g.number_of_nodes(), size=100))
    in_degs, out_degs, all_in_degs, all_out_degs = start_get_degrees_client(
        0, tmpdir, disable_shared_mem, nids)

    print("Done get_degree")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    print('check results')
    # `orig_nid` maps shuffled node IDs to the IDs of the original graph.
    queried_orig = orig_nid[nids]
    assert F.array_equal(g.in_degrees(queried_orig), in_degs)
    assert F.array_equal(g.in_degrees(orig_nid), all_in_degs)
    assert F.array_equal(g.out_degrees(orig_nid[nids]), out_degs)
    assert F.array_equal(g.out_degrees(orig_nid), all_out_degs)

# Wait non shared memory graph store
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
@pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_get_degree_shuffle(num_server):
    """Run the degree-query check in a fresh temporary directory."""
    import tempfile
    reset_envs()
    os.environ['DGL_DIST_MODE'] = 'distributed'
    with tempfile.TemporaryDirectory() as workdir:
        part_dir = Path(workdir)
        check_rpc_get_degree_shuffle(part_dir, num_server)

266
267
268
#@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
#@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skip('Only support partition with shuffle')
def test_rpc_sampling():
    """Run the non-reshuffled sampling check with two servers (always skipped)."""
    import tempfile
    reset_envs()
    os.environ['DGL_DIST_MODE'] = 'distributed'
    with tempfile.TemporaryDirectory() as workdir:
        part_dir = Path(workdir)
        check_rpc_sampling(part_dir, 2)
Jinjing Zhou's avatar
Jinjing Zhou committed
275

276
def check_rpc_sampling_shuffle(tmpdir, num_server, num_groups=1):
    """Check distributed neighbor sampling on a reshuffled partitioning of Cora.

    Spawns one server process per partition and one client process per group.
    Each client samples and verifies its result itself; this function only
    checks process exit codes.

    With ``num_groups > 1`` the servers are started with ``keep_alive=True``.
    They must still be running after all clients have finished, and are then
    shut down explicitly.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = CitationGraphDataset("cora")[0]
    orig_nids, orig_eids = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                           num_hops=1, part_method='metis',
                                           reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    keep_alive = num_groups > 1
    spawn_ctx = mp.get_context('spawn')

    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling',
                  ['csc', 'coo'], keep_alive))
        server.start()
        time.sleep(1)
        servers.append(server)

    clients = []
    num_clients = 1
    for client_id in range(num_clients):
        for group_id in range(num_groups):
            client = spawn_ctx.Process(
                target=start_sample_client_shuffle,
                args=(client_id, tmpdir, disable_shared_mem, g, num_server,
                      group_id, orig_nids, orig_eids))
            client.start()
            time.sleep(1) # avoid race condition when instantiating DistGraph
            clients.append(client)
    for client in clients:
        client.join()
        assert client.exitcode == 0

    if keep_alive:
        for server in servers:
            assert server.is_alive()
        # force shutdown server
        dgl.distributed.shutdown_servers("rpc_ip_config.txt", 1)
    for server in servers:
        server.join()
        assert server.exitcode == 0
Jinjing Zhou's avatar
Jinjing Zhou committed
317

318
def start_hetero_sample_client(rank, tmpdir, disable_shared_mem, nodes):
    """Sample neighbors of ``nodes`` on the heterogeneous ``test_sampling`` graph.

    First asserts that only ``n1`` nodes carry a ``feat`` feature. The sampled
    graph is converted to a block that keeps the sampled edge IDs. A failure
    during sampling is printed rather than raised, so that ``exit_client`` is
    still called.

    Returns
    -------
    tuple
        ``(block, gpb)``; ``block`` is ``None`` if sampling raised an exception.
    """
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_sampling.json', rank)
    else:
        gpb = None
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    assert 'feat' in dist_graph.nodes['n1'].data
    assert 'feat' not in dist_graph.nodes['n2'].data
    assert 'feat' not in dist_graph.nodes['n3'].data
    if gpb is None:
        gpb = dist_graph.get_partition_book()

    block = None
    try:
        sampled_graph = sample_neighbors(dist_graph, nodes, 3)
        sampled_block = dgl.to_block(sampled_graph, nodes)
        sampled_block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
        # Only published once every step above has succeeded.
        block = sampled_block
    except Exception:
        print(traceback.format_exc())
    dgl.distributed.exit_client()
    return block, gpb

339
def start_hetero_etype_sample_client(rank, tmpdir, disable_shared_mem, fanout=3,
                                     nodes=None,
                                     etype_sorted=False):
    """Run per-edge-type neighbor sampling on the heterogeneous ``test_sampling`` graph.

    Parameters
    ----------
    rank : int
        Partition ID used when loading the partition book from disk.
    tmpdir : pathlib.Path
        Directory containing ``test_sampling.json``.
    disable_shared_mem : bool
        If True, the partition book is loaded from disk and passed to ``DistGraph``.
    fanout : int or dict
        Forwarded to ``sample_etype_neighbors``.
    nodes : dict, optional
        Seed nodes per node type. ``None`` (the default) means
        ``{'n3': [0, 10, 99, 66, 124, 208]}``; a sentinel is used so that no
        mutable dict is shared between calls.
    etype_sorted : bool
        Forwarded to ``sample_etype_neighbors``.

    Returns
    -------
    tuple
        ``(block, gpb)``; ``block`` is ``None`` if sampling raised an exception
        (the traceback is printed so that ``exit_client`` is still called).
    """
    if nodes is None:
        nodes = {'n3': [0, 10, 99, 66, 124, 208]}
    gpb = None
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_sampling.json', rank)
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    assert 'feat' in dist_graph.nodes['n1'].data
    assert 'feat' not in dist_graph.nodes['n2'].data
    assert 'feat' not in dist_graph.nodes['n3'].data

    if dist_graph.local_partition is not None:
        # Check whether etypes are sorted in dist_graph
        local_g = dist_graph.local_partition
        local_nids = np.arange(local_g.num_nodes())
        for lnid in local_nids:
            leids = local_g.in_edges(lnid, form='eid')
            letids = F.asnumpy(local_g.edata[dgl.ETYPE][leids])
            # np.unique returns the index of the first occurrence of each
            # (ascending) edge-type ID; these must be non-decreasing.
            _, idices = np.unique(letids, return_index=True)
            assert np.all(idices[:-1] <= idices[1:])

    if gpb is None:
        gpb = dist_graph.get_partition_book()
    try:
        sampled_graph = sample_etype_neighbors(
                dist_graph, nodes, fanout, etype_sorted=etype_sorted)
        block = dgl.to_block(sampled_graph, nodes)
        block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
    except Exception:
        print(traceback.format_exc())
        block = None
    dgl.distributed.exit_client()
    return block, gpb

374
def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
    """Check distributed neighbor sampling on a reshuffled random heterograph.

    Spawns one server process per partition, samples around fixed ``n3`` seeds
    through a client in the current process, and verifies for every edge type
    of the returned block that the sampled edges, mapped back to original IDs,
    exist in the original graph with the same endpoints.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_hetero()
    orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                                 num_hops=1, part_method='metis',
                                                 reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    seeds = {'n3': [0, 10, 99, 66, 124, 208]}
    block, gpb = start_hetero_sample_client(0, tmpdir, disable_shared_mem,
                                            nodes=seeds)
    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    for c_etype in block.canonical_etypes:
        src_type, etype, dst_type = c_etype
        src, dst = block.edges(etype=etype)
        # These are global Ids after shuffling.
        src_nids = block.srcnodes[src_type].data[dgl.NID]
        dst_nids = block.dstnodes[dst_type].data[dgl.NID]
        shuffled_src = F.gather_row(src_nids, src)
        shuffled_dst = F.gather_row(dst_nids, dst)
        shuffled_eid = block.edges[etype].data[dgl.EID]

        # Map everything back to the IDs of the original graph.
        orig_src = F.asnumpy(F.gather_row(orig_nid_map[src_type], shuffled_src))
        orig_dst = F.asnumpy(F.gather_row(orig_nid_map[dst_type], shuffled_dst))
        orig_eid = F.asnumpy(F.gather_row(orig_eid_map[c_etype], shuffled_eid))

        # Check the node Ids and edge Ids.
        expected_src, expected_dst = g.find_edges(orig_eid, etype=etype)
        assert np.all(F.asnumpy(expected_src) == orig_src)
        assert np.all(F.asnumpy(expected_dst) == orig_dst)
415

416
417
418
419
420
421
422
423
424
425
def get_degrees(g, nids, ntype):
    """Sum, over all edge types, the degrees of ``nids`` as nodes of type ``ntype``.

    For each canonical edge type, out-degrees are added when ``ntype`` is the
    source type; otherwise in-degrees are added when it is the destination
    type. An edge type whose source and destination are both ``ntype``
    therefore contributes out-degrees only.
    """
    total = F.zeros((len(nids),), dtype=F.int64)
    for src_ntype, rel, dst_ntype in g.canonical_etypes:
        if src_ntype == ntype:
            total += g.out_degrees(u=nids, etype=rel)
            continue
        if dst_ntype == ntype:
            total += g.in_degrees(v=nids, etype=rel)
    return total

def check_rpc_hetero_sampling_empty_shuffle(tmpdir, num_server):
    """Check neighbor sampling around ``n3`` seeds that have no edges at all.

    The seeds are the shuffled ``n3`` IDs whose original node has total degree
    zero. The resulting block must contain no edges but still expose every
    edge type of the original graph.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_hetero(empty=True)
    orig_nids, _ = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                   num_hops=1, part_method='metis',
                                   reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    # Position i of `deg` is the degree of the original node behind shuffled ID i.
    deg = get_degrees(g, orig_nids['n3'], 'n3')
    empty_nids = F.nonzero_1d(deg == 0)
    block, gpb = start_hetero_sample_client(0, tmpdir, disable_shared_mem,
                                            nodes={'n3': empty_nids})
    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    assert block.number_of_edges() == 0
    assert len(block.etypes) == len(g.etypes)

456
def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, graph_formats=None):
    """Check per-edge-type sampling on a reshuffled dense random heterograph.

    ``graph_formats`` is forwarded to ``partition_graph``; ``etype_sorted`` is
    passed to the client as True only when it contains ``'csc'`` or ``'csr'``.
    With a fanout of 3 per edge type and 6 ``n3`` seeds, both relations that
    end in ``n3`` must yield 18 sampled edges. All sampled edges are then
    verified against the original graph.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_hetero(dense=True)
    orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                                 num_hops=1, part_method='metis',
                                                 reshuffle=True, return_mapping=True,
                                                 graph_formats=graph_formats)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling', ['csc', 'coo']))
        server.start()
        time.sleep(1)
        servers.append(server)

    fanout = dict.fromkeys(g.canonical_etypes, 3)
    etype_sorted = graph_formats is not None and (
        'csc' in graph_formats or 'csr' in graph_formats)
    block, gpb = start_hetero_etype_sample_client(0, tmpdir, disable_shared_mem, fanout,
                                                  nodes={'n3': [0, 10, 99, 66, 124, 208]},
                                                  etype_sorted=etype_sorted)
    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    # 6 seed nodes times a fanout of 3, for each relation ending in n3.
    for n3_relation in (('n1', 'r13', 'n3'), ('n2', 'r23', 'n3')):
        src, dst = block.edges(etype=n3_relation)
        assert len(src) == 18

    for c_etype in block.canonical_etypes:
        src_type, etype, dst_type = c_etype
        src, dst = block.edges(etype=etype)
        # These are global Ids after shuffling.
        src_nids = block.srcnodes[src_type].data[dgl.NID]
        dst_nids = block.dstnodes[dst_type].data[dgl.NID]
        shuffled_src = F.gather_row(src_nids, src)
        shuffled_dst = F.gather_row(dst_nids, dst)
        shuffled_eid = block.edges[etype].data[dgl.EID]

        # Map everything back to the IDs of the original graph.
        orig_src = F.asnumpy(F.gather_row(orig_nid_map[src_type], shuffled_src))
        orig_dst = F.asnumpy(F.gather_row(orig_nid_map[dst_type], shuffled_dst))
        orig_eid = F.asnumpy(F.gather_row(orig_eid_map[c_etype], shuffled_eid))

        # Check the node Ids and edge Ids.
        expected_src, expected_dst = g.find_edges(orig_eid, etype=etype)
        assert np.all(F.asnumpy(expected_src) == orig_src)
        assert np.all(F.asnumpy(expected_dst) == orig_dst)

509
def check_rpc_hetero_etype_sampling_empty_shuffle(tmpdir, num_server):
    """Check per-edge-type sampling around ``n3`` seeds that have no edges.

    The seeds are the shuffled ``n3`` IDs whose original node has total degree
    zero. The resulting block must contain no edges but still expose every
    edge type of the original graph.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_hetero(dense=True, empty=True)
    orig_nids, _ = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                   num_hops=1, part_method='metis',
                                   reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    fanout = 3
    # Position i of `deg` is the degree of the original node behind shuffled ID i.
    deg = get_degrees(g, orig_nids['n3'], 'n3')
    empty_nids = F.nonzero_1d(deg == 0)
    block, gpb = start_hetero_etype_sample_client(0, tmpdir, disable_shared_mem, fanout,
                                                  nodes={'n3': empty_nids})
    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    assert block.number_of_edges() == 0
    assert len(block.etypes) == len(g.etypes)

541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592

def create_random_bipartite():
    """Build a random user-buys-game bipartite graph with features on both sides.

    Both node types receive an all-ones ``feat`` of width 10.
    """
    g = dgl.rand_bipartite('user', 'buys', 'game', 500, 1000, 1000)
    for ntype in ('user', 'game'):
        feat_shape = (g.num_nodes(ntype), 10)
        g.nodes[ntype].data['feat'] = F.ones(feat_shape, F.float32, F.cpu())
    return g


def start_bipartite_sample_client(rank, tmpdir, disable_shared_mem, nodes):
    """Sample neighbors of ``nodes`` on the bipartite ``test_sampling`` graph.

    Asserts that both node types carry ``feat``. Edge IDs are copied onto the
    block only when at least one edge was sampled. Unlike the heterograph
    clients, exceptions raised while sampling are not caught here.

    Returns
    -------
    tuple
        ``(block, gpb)``.
    """
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(
            tmpdir / 'test_sampling.json', rank)
    else:
        gpb = None
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    for ntype in ('user', 'game'):
        assert 'feat' in dist_graph.nodes[ntype].data
    if gpb is None:
        gpb = dist_graph.get_partition_book()

    sampled_graph = sample_neighbors(dist_graph, nodes, 3)
    block = dgl.to_block(sampled_graph, nodes)
    has_edges = sampled_graph.num_edges() > 0
    if has_edges:
        block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
    dgl.distributed.exit_client()
    return block, gpb


def start_bipartite_etype_sample_client(rank, tmpdir, disable_shared_mem, fanout=3,
                                        nodes=None):
    """Run per-edge-type neighbor sampling on the bipartite ``test_sampling`` graph.

    Parameters
    ----------
    rank : int
        Partition ID used when loading the partition book from disk.
    tmpdir : pathlib.Path
        Directory containing ``test_sampling.json``.
    disable_shared_mem : bool
        If True, the partition book is loaded from disk and passed to ``DistGraph``.
    fanout : int or dict
        Forwarded to ``sample_etype_neighbors``.
    nodes : dict, optional
        Seed nodes per node type. ``None`` (the default) means an empty dict;
        a sentinel is used so that no mutable dict is shared between calls.

    Returns
    -------
    tuple
        ``(block, gpb)``. Edge IDs are copied onto the block only when at
        least one edge was sampled.
    """
    if nodes is None:
        nodes = {}
    gpb = None
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(
            tmpdir / 'test_sampling.json', rank)
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    assert 'feat' in dist_graph.nodes['user'].data
    assert 'feat' in dist_graph.nodes['game'].data

    if dist_graph.local_partition is not None:
        # Check whether etypes are sorted in dist_graph
        local_g = dist_graph.local_partition
        local_nids = np.arange(local_g.num_nodes())
        for lnid in local_nids:
            leids = local_g.in_edges(lnid, form='eid')
            letids = F.asnumpy(local_g.edata[dgl.ETYPE][leids])
            # np.unique returns the index of the first occurrence of each
            # (ascending) edge-type ID; these must be non-decreasing.
            _, idices = np.unique(letids, return_index=True)
            assert np.all(idices[:-1] <= idices[1:])

    if gpb is None:
        gpb = dist_graph.get_partition_book()
    sampled_graph = sample_etype_neighbors(dist_graph, nodes, fanout)
    block = dgl.to_block(sampled_graph, nodes)
    if sampled_graph.num_edges() > 0:
        block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
    dgl.distributed.exit_client()
    return block, gpb


def check_rpc_bipartite_sampling_empty(tmpdir, num_server):
    """sample on bipartite via sample_neighbors() which yields empty sample results"""
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_bipartite()
    orig_nids, _ = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                   num_hops=1, part_method='metis',
                                   reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    # Seeds are the shuffled 'game' IDs whose original node has no edges.
    deg = get_degrees(g, orig_nids['game'], 'game')
    empty_nids = F.nonzero_1d(deg == 0)
    seeds = {'game': empty_nids, 'user': [1]}
    block, _ = start_bipartite_sample_client(0, tmpdir, disable_shared_mem,
                                             nodes=seeds)

    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    assert block.number_of_edges() == 0
    assert len(block.etypes) == len(g.etypes)


def check_rpc_bipartite_sampling_shuffle(tmpdir, num_server):
    """sample on bipartite via sample_neighbors() which yields non-empty sample results"""
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_bipartite()
    orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                                 num_hops=1, part_method='metis',
                                                 reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    # Seeds are the shuffled 'game' IDs whose original node has at least one edge.
    deg = get_degrees(g, orig_nid_map['game'], 'game')
    nids = F.nonzero_1d(deg > 0)
    seeds = {'game': nids, 'user': [0]}
    block, gpb = start_bipartite_sample_client(0, tmpdir, disable_shared_mem,
                                               nodes=seeds)
    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    for c_etype in block.canonical_etypes:
        src_type, etype, dst_type = c_etype
        src, dst = block.edges(etype=etype)
        # These are global Ids after shuffling.
        src_nids = block.srcnodes[src_type].data[dgl.NID]
        dst_nids = block.dstnodes[dst_type].data[dgl.NID]
        shuffled_src = F.gather_row(src_nids, src)
        shuffled_dst = F.gather_row(dst_nids, dst)
        shuffled_eid = block.edges[etype].data[dgl.EID]

        # Map everything back to the IDs of the original graph.
        orig_src = F.asnumpy(F.gather_row(orig_nid_map[src_type], shuffled_src))
        orig_dst = F.asnumpy(F.gather_row(orig_nid_map[dst_type], shuffled_dst))
        orig_eid = F.asnumpy(F.gather_row(orig_eid_map[c_etype], shuffled_eid))

        # Check the node Ids and edge Ids.
        expected_src, expected_dst = g.find_edges(orig_eid, etype=etype)
        assert np.all(F.asnumpy(expected_src) == orig_src)
        assert np.all(F.asnumpy(expected_dst) == orig_dst)


def check_rpc_bipartite_etype_sampling_empty(tmpdir, num_server):
    """sample on bipartite via sample_etype_neighbors() which yields empty sample results"""
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = create_random_bipartite()
    orig_nids, _ = partition_graph(g, 'test_sampling', num_server, tmpdir,
                                   num_hops=1, part_method='metis',
                                   reshuffle=True, return_mapping=True)

    # Shared memory is disabled whenever more than one server is used.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for server_id in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(server_id, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    # Seeds are the shuffled 'game' IDs whose original node has no edges.
    deg = get_degrees(g, orig_nids['game'], 'game')
    empty_nids = F.nonzero_1d(deg == 0)
    seeds = {'game': empty_nids, 'user': [1]}
    block, gpb = start_bipartite_etype_sample_client(0, tmpdir, disable_shared_mem,
                                                     nodes=seeds)

    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    assert block is not None
    assert block.number_of_edges() == 0
    assert len(block.etypes) == len(g.etypes)


def check_rpc_bipartite_etype_sampling_shuffle(tmpdir, num_server):
    """Check sample_etype_neighbors() on a bipartite graph with non-empty results.

    A random bipartite graph is partitioned (with reshuffling) into
    ``num_server`` parts under ``tmpdir``, one server process is spawned per
    part, and a client samples with fanout 3 from every 'game' node whose
    degree (as reported by ``get_degrees``) is positive, plus 'user' node 0.
    Every sampled edge is then mapped back to original IDs and compared with
    the endpoints the original graph reports for that edge.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    graph = create_random_bipartite()
    nid_map, eid_map = partition_graph(
        graph, 'test_sampling', num_server, tmpdir,
        num_hops=1, part_method='metis', reshuffle=True, return_mapping=True)

    # Shared memory is disabled as soon as more than one server is involved.
    disable_shared_mem = num_server > 1
    spawn_ctx = mp.get_context('spawn')
    servers = []
    for rank in range(num_server):
        server = spawn_ctx.Process(
            target=start_server,
            args=(rank, tmpdir, disable_shared_mem, 'test_sampling'))
        server.start()
        time.sleep(1)
        servers.append(server)

    fanout = 3
    degrees = get_degrees(graph, nid_map['game'], 'game')
    seed_games = F.nonzero_1d(degrees > 0)
    block, _ = start_bipartite_etype_sample_client(
        0, tmpdir, disable_shared_mem, fanout,
        nodes={'game': seed_games, 'user': [0]})
    print("Done sampling")
    for server in servers:
        server.join()
        assert server.exitcode == 0

    for canonical_etype in block.canonical_etypes:
        src_type, etype, dst_type = canonical_etype
        local_src, local_dst = block.edges(etype=etype)

        # Block-local positions -> global IDs after shuffling.
        src_gids = block.srcnodes[src_type].data[dgl.NID]
        dst_gids = block.dstnodes[dst_type].data[dgl.NID]
        shuffled_src = F.gather_row(src_gids, local_src)
        shuffled_dst = F.gather_row(dst_gids, local_dst)
        shuffled_eid = block.edges[etype].data[dgl.EID]

        # Shuffled global IDs -> IDs of the original (unpartitioned) graph.
        orig_src = F.asnumpy(F.gather_row(nid_map[src_type], shuffled_src))
        orig_dst = F.asnumpy(F.gather_row(nid_map[dst_type], shuffled_dst))
        orig_eid = F.asnumpy(
            F.gather_row(eid_map[canonical_etype], shuffled_eid))

        # The original graph must report the same endpoints for these edges.
        expected_src, expected_dst = graph.find_edges(orig_eid, etype=etype)
        assert np.all(F.asnumpy(expected_src) == orig_src)
        assert np.all(F.asnumpy(expected_dst) == orig_dst)

Jinjing Zhou's avatar
Jinjing Zhou committed
772
773
774
# Wait non shared memory graph store
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
@pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_sampling_shuffle(num_server):
    """Run every shuffled RPC sampling check in distributed mode.

    All checks share a single temporary directory and are executed for the
    parametrized number of servers.
    """
    reset_envs()
    import tempfile
    os.environ['DGL_DIST_MODE'] = 'distributed'
    with tempfile.TemporaryDirectory() as tmpdirname:
        workdir = Path(tmpdirname)

        check_rpc_sampling_shuffle(workdir, num_server)
        # [TODO][Rhett] Tests for multiple groups may fail sometimes and
        # root cause is unknown. Let's disable them for now.
        #check_rpc_sampling_shuffle(Path(tmpdirname), num_server, num_groups=2)
        check_rpc_hetero_sampling_shuffle(workdir, num_server)
        check_rpc_hetero_sampling_empty_shuffle(workdir, num_server)

        # Etype sampling: once with the default graph format, then once per
        # explicitly requested format combination.
        check_rpc_hetero_etype_sampling_shuffle(workdir, num_server)
        for graph_formats in (['csc'], ['csr'], ['csc', 'coo']):
            check_rpc_hetero_etype_sampling_shuffle(
                workdir, num_server, graph_formats)
        check_rpc_hetero_etype_sampling_empty_shuffle(workdir, num_server)

        check_rpc_bipartite_sampling_empty(workdir, num_server)
        check_rpc_bipartite_sampling_shuffle(workdir, num_server)
        check_rpc_bipartite_etype_sampling_empty(workdir, num_server)
        check_rpc_bipartite_etype_sampling_shuffle(workdir, num_server)
Jinjing Zhou's avatar
Jinjing Zhou committed
797

798
def check_standalone_sampling(tmpdir, reshuffle):
    """Check sample_neighbors() on a DistGraph running in standalone mode.

    The Cora graph gets two edge features: 'prob' (a standard normal draw
    clipped at zero) and 'mask' (True where 'prob' is positive). It is then
    written as a single partition under ``tmpdir`` and sampled three times:
    uniformly, weighted by 'mask' and weighted by 'prob'.

    ``reshuffle`` is forwarded unchanged to ``partition_graph``.
    """
    graph = CitationGraphDataset("cora")[0]
    prob = np.maximum(np.random.randn(graph.num_edges()), 0)
    mask = (prob > 0)
    graph.edata['prob'] = F.tensor(prob)
    graph.edata['mask'] = F.tensor(mask)

    partition_graph(graph, 'test_sampling', 1, tmpdir,
                    num_hops=1, part_method='metis', reshuffle=reshuffle)

    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph(
        "test_sampling", part_config=tmpdir / 'test_sampling.json')

    seeds = [0, 10, 99, 66, 1024, 2008]
    fanout = 3

    # Uniform sampling: every sampled edge must exist in the source graph
    # and carry that graph's edge ID.
    sampled = sample_neighbors(dist_graph, seeds, fanout)
    src, dst = sampled.edges()
    assert sampled.number_of_nodes() == graph.number_of_nodes()
    assert np.all(F.asnumpy(graph.has_edges_between(src, dst)))
    expected_eids = graph.edge_ids(src, dst)
    assert np.array_equal(
        F.asnumpy(sampled.edata[dgl.EID]), F.asnumpy(expected_eids))

    # Sampling by 'mask' must only pick edges whose mask is set.
    sampled = sample_neighbors(dist_graph, seeds, fanout, prob='mask')
    picked = F.asnumpy(sampled.edata[dgl.EID])
    assert mask[picked].all()

    # Sampling by 'prob' must only pick edges with positive probability.
    sampled = sample_neighbors(dist_graph, seeds, fanout, prob='prob')
    picked = F.asnumpy(sampled.edata[dgl.EID])
    assert (prob[picked] > 0).all()

    dgl.distributed.exit_client()
831

832
833
def check_standalone_etype_sampling(tmpdir, reshuffle):
    """Check sample_etype_neighbors() on a DistGraph running in standalone mode.

    The Cora graph gets two edge features: 'prob' (a standard normal draw
    clipped at zero) and 'mask' (True where 'prob' is positive). It is then
    written as a single partition under ``tmpdir`` and sampled three times:
    uniformly, weighted by 'mask' and weighted by 'prob'.

    ``reshuffle`` is forwarded unchanged to ``partition_graph``.
    """
    graph = CitationGraphDataset('cora')[0]
    prob = np.maximum(np.random.randn(graph.num_edges()), 0)
    mask = (prob > 0)
    graph.edata['prob'] = F.tensor(prob)
    graph.edata['mask'] = F.tensor(mask)

    partition_graph(graph, 'test_sampling', 1, tmpdir,
                    num_hops=1, part_method='metis', reshuffle=reshuffle)
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph(
        "test_sampling", part_config=tmpdir / 'test_sampling.json')

    seeds = [0, 10, 99, 66, 1023]
    fanout = 3

    # Uniform sampling: every sampled edge must exist in the source graph
    # and carry that graph's edge ID.
    sampled = sample_etype_neighbors(dist_graph, seeds, fanout)
    src, dst = sampled.edges()
    assert sampled.number_of_nodes() == graph.number_of_nodes()
    assert np.all(F.asnumpy(graph.has_edges_between(src, dst)))
    expected_eids = graph.edge_ids(src, dst)
    assert np.array_equal(
        F.asnumpy(sampled.edata[dgl.EID]), F.asnumpy(expected_eids))

    # Sampling by 'mask' must only pick edges whose mask is set.
    sampled = sample_etype_neighbors(dist_graph, seeds, fanout, prob='mask')
    picked = F.asnumpy(sampled.edata[dgl.EID])
    assert mask[picked].all()

    # Sampling by 'prob' must only pick edges with positive probability.
    sampled = sample_etype_neighbors(dist_graph, seeds, fanout, prob='prob')
    picked = F.asnumpy(sampled.edata[dgl.EID])
    assert (prob[picked] > 0).all()

    dgl.distributed.exit_client()

def check_standalone_etype_sampling_heterograph(tmpdir, reshuffle):
    """Check sample_etype_neighbors() on a two-relation heterograph in standalone mode.

    The heterograph is built from Cora: 'cite' keeps the original edge
    direction and 'cite-by' reverses it, both over a single 'paper' node
    type. Ten seed nodes are sampled with a fanout of 1, and each relation
    is expected to contribute exactly ten edges.

    ``reshuffle`` is forwarded unchanged to ``partition_graph``.
    """
    cora = CitationGraphDataset('cora')[0]
    cite_src, cite_dst = cora.edges()
    hetero = dgl.heterograph({('paper', 'cite', 'paper'): (cite_src, cite_dst),
                              ('paper', 'cite-by', 'paper'): (cite_dst, cite_src)},
                             {'paper': cora.number_of_nodes()})
    partition_graph(hetero, 'test_hetero_sampling', 1, tmpdir,
                    num_hops=1, part_method='metis', reshuffle=reshuffle)
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph(
        "test_hetero_sampling",
        part_config=tmpdir / 'test_hetero_sampling.json')

    seeds = [0, 1, 2, 10, 99, 66, 1023, 1024, 2700, 2701]
    sampled = sample_etype_neighbors(dist_graph, seeds, 1)

    # One edge per seed and per relation.
    for relation in (('paper', 'cite', 'paper'),
                     ('paper', 'cite-by', 'paper')):
        rel_src, _ = sampled.edges(etype=relation)
        assert len(rel_src) == 10
    assert sampled.number_of_nodes() == hetero.number_of_nodes()
    dgl.distributed.exit_client()

888
889
890
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
def test_standalone_sampling():
    """Run the standalone neighbor-sampling check without and with reshuffling."""
    reset_envs()
    import tempfile
    os.environ['DGL_DIST_MODE'] = 'standalone'
    with tempfile.TemporaryDirectory() as workdir:
        # Both runs share the same temporary directory.
        for reshuffle in (False, True):
            check_standalone_sampling(Path(workdir), reshuffle)
897

898
899
def start_in_subgraph_client(rank, tmpdir, disable_shared_mem, nodes):
    """Connect as a client and request the in-subgraph induced by ``nodes``.

    When ``disable_shared_mem`` is true, the partition book is loaded from
    the 'test_in_subgraph.json' config under ``tmpdir`` for partition
    ``rank`` and handed to the DistGraph; otherwise no partition book is
    passed.

    Returns the graph produced by ``dgl.distributed.in_subgraph``, or None
    if that call raised. In that case the traceback is printed and the
    client still shuts down.
    """
    dgl.distributed.initialize("rpc_ip_config.txt")
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_in_subgraph.json', rank)
    else:
        gpb = None
    dist_graph = DistGraph("test_in_subgraph", gpb=gpb)

    # Best effort: a failure is reported but must not prevent exit_client().
    subgraph = None
    try:
        subgraph = dgl.distributed.in_subgraph(dist_graph, nodes)
    except Exception:
        print(traceback.format_exc())
    dgl.distributed.exit_client()
    return subgraph


913
def check_rpc_in_subgraph_shuffle(tmpdir, num_server):
    """Check dgl.distributed.in_subgraph() against dgl.in_subgraph() on Cora.

    Cora is partitioned (with reshuffling) into ``num_server`` parts under
    ``tmpdir`` and one server process is spawned per part. A client requests
    the in-subgraph of a fixed node list; the result is mapped back to
    original node/edge IDs and compared with the subgraph computed directly
    on the unpartitioned graph.
    """
    generate_ip_config("rpc_ip_config.txt", num_server, num_server)

    g = CitationGraphDataset("cora")[0]
    num_parts = num_server

    orig_nid, orig_eid = partition_graph(g, 'test_in_subgraph', num_parts, tmpdir,
        num_hops=1, part_method='metis', reshuffle=True, return_mapping=True)

    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_server):
        p = ctx.Process(target=start_server, args=(i, tmpdir, num_server > 1, 'test_in_subgraph'))
        p.start()
        time.sleep(1)
        pserver_list.append(p)

    nodes = [0, 10, 99, 66, 1024, 2008]
    sampled_graph = start_in_subgraph_client(0, tmpdir, num_server > 1, nodes)
    for p in pserver_list:
        p.join()
        assert p.exitcode == 0

    # The client helper returns None when in_subgraph() raised; fail with a
    # clear message instead of an AttributeError on the next line.
    assert sampled_graph is not None, \
        "in_subgraph client failed; see the traceback printed above"

    # Map the shuffled endpoint IDs back to the original node IDs.
    src, dst = sampled_graph.edges()
    src = orig_nid[src]
    dst = orig_nid[dst]
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))

    # Reference result computed on the unpartitioned graph.
    subg1 = dgl.in_subgraph(g, orig_nid[nodes])
    src1, dst1 = subg1.edges()
    assert np.all(np.sort(F.asnumpy(src)) == np.sort(F.asnumpy(src1)))
    assert np.all(np.sort(F.asnumpy(dst)) == np.sort(F.asnumpy(dst1)))

    # Edge IDs mapped back through orig_eid must match the IDs the original
    # graph assigns to the same (src, dst) pairs.
    eids = g.edge_ids(src, dst)
    eids1 = orig_eid[sampled_graph.edata[dgl.EID]]
    assert np.array_equal(F.asnumpy(eids1), F.asnumpy(eids))
949
950
951
952

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
def test_rpc_in_subgraph():
    """Run the distributed in_subgraph check with two servers."""
    reset_envs()
    import tempfile
    os.environ['DGL_DIST_MODE'] = 'distributed'
    with tempfile.TemporaryDirectory() as workdir:
        partition_dir = Path(workdir)
        check_rpc_in_subgraph_shuffle(partition_dir, 2)
958

959
960
961
962
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
def test_standalone_etype_sampling():
    """Run both standalone etype-sampling checks with reshuffling enabled.

    Each check gets its own fresh temporary directory.
    """
    reset_envs()
    import tempfile
    standalone_checks = (
        check_standalone_etype_sampling_heterograph,
        check_standalone_etype_sampling,
    )
    for run_check in standalone_checks:
        with tempfile.TemporaryDirectory() as workdir:
            os.environ['DGL_DIST_MODE'] = 'standalone'
            run_check(Path(workdir), True)

Jinjing Zhou's avatar
Jinjing Zhou committed
972
973
974
if __name__ == "__main__":
    import tempfile
    with tempfile.TemporaryDirectory() as tmpdirname:
975
        os.environ['DGL_DIST_MODE'] = 'standalone'
976
977
978
979
980
981
        check_standalone_etype_sampling_heterograph(Path(tmpdirname), True)

    with tempfile.TemporaryDirectory() as tmpdirname:
        os.environ['DGL_DIST_MODE'] = 'standalone'
        check_standalone_etype_sampling(Path(tmpdirname), True)
        check_standalone_etype_sampling(Path(tmpdirname), False)
982
983
        check_standalone_sampling(Path(tmpdirname), True)
        check_standalone_sampling(Path(tmpdirname), False)
984
        os.environ['DGL_DIST_MODE'] = 'distributed'
985
986
        check_rpc_sampling(Path(tmpdirname), 2)
        check_rpc_sampling(Path(tmpdirname), 1)
987
988
        check_rpc_get_degree_shuffle(Path(tmpdirname), 1)
        check_rpc_get_degree_shuffle(Path(tmpdirname), 2)
989
990
        check_rpc_find_edges_shuffle(Path(tmpdirname), 2)
        check_rpc_find_edges_shuffle(Path(tmpdirname), 1)
991
992
        check_rpc_hetero_find_edges_shuffle(Path(tmpdirname), 1)
        check_rpc_hetero_find_edges_shuffle(Path(tmpdirname), 2)
993
994
995
996
        check_rpc_in_subgraph_shuffle(Path(tmpdirname), 2)
        check_rpc_sampling_shuffle(Path(tmpdirname), 1)
        check_rpc_hetero_sampling_shuffle(Path(tmpdirname), 1)
        check_rpc_hetero_sampling_shuffle(Path(tmpdirname), 2)
997
998
999
1000
        check_rpc_hetero_sampling_empty_shuffle(Path(tmpdirname), 1)
        check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), 1)
        check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), 2)
        check_rpc_hetero_etype_sampling_empty_shuffle(Path(tmpdirname), 1)