# test_dist_graph_store.py
1
2
3
4
5
6
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
7
import socket
8
9
10
11
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
12
from dgl.heterograph_index import create_unitgraph_from_coo
13
14
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
15
from dgl.distributed import partition_graph, load_partition, load_partition_book, node_split, edge_split
16
from dgl.distributed import SparseAdagrad, DistEmbedding
17
from numpy.testing import assert_almost_equal
18
import backend as F
19
import math
20
21
22
import unittest
import pickle

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get a local usable IP address and a free port.

    Returns
    -------
    str
        IP address and port separated by a single space,
        e.g., '192.168.8.12 50051'.

    Notes
    -----
    The probing sockets are closed before returning, so the port is only
    known to have been free at the time of the call.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # doesn't even have to be reachable
            sock.connect(('10.255.255.255', 1))
            ip_addr = sock.getsockname()[0]
        except (OSError, ValueError):
            # socket.connect reports failures (e.g. no route) as OSError;
            # fall back to loopback instead of crashing.
            ip_addr = '127.0.0.1'

    # Binding to port 0 asks the OS to pick an unused port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        sock.listen(1)
        port = sock.getsockname()[1]

    return ip_addr + ' ' + str(port)

52
def create_random_graph(n):
    """Create a random sparse graph with ``n`` nodes.

    The adjacency matrix is sampled with density 0.001 and a fixed
    ``random_state`` of 100, so the same ``n`` always yields the same graph.

    Parameters
    ----------
    n : int
        Number of nodes (the adjacency matrix is ``n`` x ``n``).

    Returns
    -------
    The graph built by ``dgl.graph`` from the sampled adjacency matrix.
    """
    arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
    return dgl.graph(arr)
55

56
57
def run_server(graph_name, server_id, server_count, num_clients, shared_mem):
    """Entry point of a server process: build a DistGraphServer and start it.

    Parameters
    ----------
    graph_name : str
        Name of the partitioned graph; its config is read from
        '/tmp/dist_graph/<graph_name>.json'.
    server_id : int
        ID passed to the server.
    server_count : int
        Server count passed to the server.
    num_clients : int
        Number of clients passed to the server.
    shared_mem : bool
        Whether shared memory is enabled (forwarded negated as
        ``disable_shared_mem``).
    """
    g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, server_count,
                        '/tmp/dist_graph/{}.json'.format(graph_name),
                        disable_shared_mem=not shared_mem)
    print('start server', server_id)
    g.start()

63
64
65
def emb_init(shape, dtype):
    """Initializer that produces an all-zero tensor of the given shape and dtype on CPU."""
    device = F.cpu()
    zeros = F.zeros(shape, dtype, device)
    return zeros

66
def rand_init(shape, dtype):
    """Initializer that draws values from a standard normal distribution.

    NOTE: ``dtype`` is accepted to match the initializer signature but is not
    used; the result is always created as ``F.float32``.
    """
    return F.tensor(np.random.normal(size=shape), F.float32)
68

69
def run_client(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
    """Entry point of a client process: connect to the servers and run the checks.

    Parameters
    ----------
    graph_name : str
        Name of the partitioned graph; its config is read from
        '/tmp/dist_graph/<graph_name>.json'.
    part_id : int
        Partition ID used to load the partition book.
    server_count : int
        Server count passed to ``dgl.distributed.initialize``.
    num_clients : int
        Number of clients, forwarded to ``check_dist_graph``.
    num_nodes : int
        Expected number of nodes in the distributed graph.
    num_edges : int
        Expected number of edges in the distributed graph.
    """
    # Presumably gives the server processes time to come up before connecting.
    time.sleep(5)
    dgl.distributed.initialize("kv_ip_config.txt", server_count)
    gpb, graph_name = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
                                          part_id, None)
    g = DistGraph("kv_ip_config.txt", graph_name, gpb=gpb)
    check_dist_graph(g, num_clients, num_nodes, num_edges)
76

77
def check_dist_graph(g, num_clients, num_nodes, num_edges):
    """Run the common DistGraph checks against a connected distributed graph.

    Parameters
    ----------
    g : DistGraph
        Graph under test. Its 'features' node and edge data are expected to
        hold the row index in a single column.
    num_clients : int
        Number of clients working on the graph. The exact-value checks on
        embeddings and gradient sums are only enforced when this is 1.
    num_nodes : int
        Expected number of nodes.
    num_edges : int
        Expected number of edges.
    """
    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dgl.distributed.DistTensor(g, new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # reference to a one that exists
    test2 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2', init_func=rand_init)
    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2')
    assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))

    # create a tensor and destroy a tensor and create it again.
    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test3')
    del test3

    # test a persistent tensor
    test4 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    # Re-creating 'test4' with a different shape is expected to be rejected
    # (presumably because the persistent tensor outlives the `del`). The
    # failure signal lives in `else` so that it cannot be swallowed by the
    # handler for the expected error.
    try:
        test4 = dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test4')
    except Exception:
        pass
    else:
        raise AssertionError(
            "re-creating persistent tensor 'test4' with a different shape should fail")

    # Test sparse emb
    try:
        emb = DistEmbedding(g, g.number_of_nodes(), 1, 'emb1', emb_init)
        lr = 0.001
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats = emb(nids)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        grad_sum = dgl.distributed.DistTensor(g, (g.number_of_nodes(),), F.float32,
                                              'emb1_sum', policy)
        if num_clients == 1:
            assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))

        emb = DistEmbedding(g, g.number_of_nodes(), 1, 'emb2', emb_init)
        with F.no_grad():
            feats1 = emb(nids)
        assert np.all(F.asnumpy(feats1) == 0)

        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats1 = emb(nids)
            feats2 = emb(nids)
            feats = F.cat([feats1, feats2], 0)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids) * 2, 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.no_grad():
            feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * math.sqrt(2) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))
    except NotImplementedError:
        # The sparse-embedding checks are skipped when the feature is
        # reported as not implemented.
        pass

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    # Test node split
    nodes = node_split(selected_nodes, g.get_partition_book())
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes())
    assert np.all(np.isin(nodes, local_nids))

    print('end')

195
def check_server_client(shared_mem, num_servers, num_clients):
    """Partition a random graph, then run servers and clients against it.

    Parameters
    ----------
    shared_mem : bool
        Forwarded to every server process (see ``run_server``).
    num_servers : int
        Number of server processes to spawn.
    num_clients : int
        Number of client processes to spawn.

    Raises
    ------
    AssertionError
        If any client process exits with a non-zero exit code.
    """
    prepare_dist()
    g = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_2'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id, num_servers,
                                                 num_clients, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    for cli_id in range(num_clients):
        print('start client', cli_id)
        p = ctx.Process(target=run_client, args=(graph_name, 0, num_servers, num_clients, g.number_of_nodes(),
                                                 g.number_of_edges()))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    # The checks run inside the client processes, so a failed assertion there
    # is only visible to this process through the child's exit code.
    failed_exitcodes = [p.exitcode for p in cli_ps if p.exitcode != 0]
    if failed_exitcodes:
        # Do not wait on servers after a client failure; they may never be
        # told to shut down (assumption -- TODO confirm against server logic).
        for p in serv_ps:
            p.terminate()

    for p in serv_ps:
        p.join()

    print('clients have terminated')
    assert not failed_exitcodes, \
        'client process(es) failed with exit code(s) {}'.format(failed_exitcodes)

232
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
    """Run the server/client checks with and without shared memory.

    Each configuration is exercised with one server and one client, and with
    two servers and two clients.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    check_server_client(True, 1, 1)
    check_server_client(False, 1, 1)
    check_server_client(True, 2, 2)
    check_server_client(False, 2, 2)
240

241
242
243
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_standalone():
    """Run the DistGraph checks in standalone mode (no server processes are spawned here)."""
    os.environ['DGL_DIST_MODE'] = 'standalone'

    g = create_random_graph(10000)
    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    dgl.distributed.initialize("kv_ip_config.txt")
    dist_g = DistGraph("kv_ip_config.txt", graph_name,
                       part_config='/tmp/dist_graph/{}.json'.format(graph_name))
    check_dist_graph(dist_g, 1, g.number_of_nodes(), g.number_of_edges())
    dgl.distributed.exit_client() # this is needed since there's two test here in one process
258

259
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split():
    """Check node_split/edge_split with ``force_even=False``.

    For every partition, the split obtained by one client per partition must
    equal the masked nodes/edges local to that partition, and so must the
    concatenation of the splits obtained by two clients per partition.
    """
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]

    # The code now collects the roles of all client processes and use the information
    # to determine how to split the workloads. Here is to simulate the multi-client
    # use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        # One client per partition.
        set_roles(num_parts)
        part_g, _, _, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
        nodes2 = node_split(node_mask, gpb, i, force_even=False)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
        local_nids = F.asnumpy(local_nids)
        for n in nodes1:
            assert n in local_nids

        # Two clients per partition: ranks 2*i and 2*i+1 together cover partition i.
        set_roles(num_parts * 2)
        nodes3 = node_split(node_mask, gpb, i * 2, force_even=False)
        nodes4 = node_split(node_mask, gpb, i * 2 + 1, force_even=False)
        nodes5 = F.cat([nodes3, nodes4], 0)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes5)))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
        edges2 = edge_split(edge_mask, gpb, i, force_even=False)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges2)))
        local_eids = F.asnumpy(local_eids)
        for e in edges1:
            assert e in local_eids

        set_roles(num_parts * 2)
        edges3 = edge_split(edge_mask, gpb, i * 2, force_even=False)
        edges4 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=False)
        edges5 = F.cat([edges3, edges4], 0)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges5)))

314
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split_even():
    """Check node_split/edge_split with ``force_even=True``.

    The splits of all clients, concatenated in rank order, must reproduce
    exactly the masked node/edge IDs, both with one client per partition and
    with two clients per partition.
    """
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    all_nodes1 = []
    all_nodes2 = []
    all_edges1 = []
    all_edges2 = []

    # The code now collects the roles of all client processes and use the information
    # to determine how to split the workloads. Here is to simulate the multi-client
    # use case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        # One client per partition.
        set_roles(num_parts)
        part_g, _, _, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes = node_split(node_mask, gpb, i, force_even=True)
        all_nodes1.append(nodes)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} get {} nodes and {} are in the partition'.format(i, len(nodes), len(subset)))

        # Two clients per partition.
        set_roles(num_parts * 2)
        nodes1 = node_split(node_mask, gpb, i * 2, force_even=True)
        nodes2 = node_split(node_mask, gpb, i * 2 + 1, force_even=True)
        nodes3 = F.cat([nodes1, nodes2], 0)
        all_nodes2.append(nodes3)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(nodes3))
        print('intersection has', len(subset))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges = edge_split(edge_mask, gpb, i, force_even=True)
        all_edges1.append(edges)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
        print('part {} get {} edges and {} are in the partition'.format(i, len(edges), len(subset)))

        set_roles(num_parts * 2)
        edges1 = edge_split(edge_mask, gpb, i * 2, force_even=True)
        edges2 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=True)
        edges3 = F.cat([edges1, edges2], 0)
        all_edges2.append(edges3)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(edges3))
        print('intersection has', len(subset))
    all_nodes1 = F.cat(all_nodes1, 0)
    all_edges1 = F.cat(all_edges1, 0)
    all_nodes2 = F.cat(all_nodes2, 0)
    all_edges2 = F.cat(all_edges2, 0)
    all_nodes = np.nonzero(node_mask)[0]
    all_edges = np.nonzero(edge_mask)[0]
    assert np.all(all_nodes == F.asnumpy(all_nodes1))
    assert np.all(all_edges == F.asnumpy(all_edges1))
    assert np.all(all_nodes == F.asnumpy(all_nodes2))
    assert np.all(all_edges == F.asnumpy(all_edges2))

383
def prepare_dist():
    """Write 'kv_ip_config.txt' with one line holding a local address and a free port."""
    ip_addr = get_local_usable_addr()
    # `with` guarantees the file is closed even if the write fails.
    with open("kv_ip_config.txt", "w") as ip_config:
        ip_config.write('{}\n'.format(ip_addr))

389
if __name__ == '__main__':
    # All tests write their partitions under this directory.
    os.makedirs('/tmp/dist_graph', exist_ok=True)
    test_split()
    test_split_even()
    test_server_client()
    test_standalone()