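"""Tests for DGL's distributed graph store.

Covers DistGraphServer/DistGraph in server-client and standalone modes,
distributed tensors (DistTensor) and embeddings (DistEmbedding), and
splitting node/edge workloads across graph partitions.
"""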
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
import socket
from scipy import sparse as spsp
from numpy.testing import assert_array_equal, assert_almost_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
from dgl.heterograph_index import create_unitgraph_from_coo
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
from dgl.distributed import partition_graph, load_partition, load_partition_book, node_split, edge_split
from dgl.distributed import SparseAdagrad, DistEmbedding
import backend as F
import math
import unittest
import pickle

if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get local usable IP and port

    Returns
    -------
    str
        IP address and port, e.g., '192.168.8.12 50051'
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        sock.connect(('10.255.255.255', 1))
        ip_addr = sock.getsockname()[0]
    except Exception:  # e.g., OSError when the network is unreachable
        ip_addr = '127.0.0.1'
    finally:
        sock.close()
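    # Bind a TCP socket to port 0 so the OS picks a free port for us.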
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", 0))
    sock.listen(1)
    port = sock.getsockname()[1]
    sock.close()

    return ip_addr + ' ' + str(port)

def create_random_graph(n):
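    # A 0.1%-dense random adjacency matrix with a fixed seed, so every process
    # that calls this builds the identical graph.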
    arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
    return dgl.graph(arr)

def run_server(graph_name, server_id, num_clients, shared_mem):
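    """Load one graph partition and serve it to the clients."""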
    g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients,
                        '/tmp/dist_graph/{}.json'.format(graph_name),
                        disable_shared_mem=not shared_mem)
    print('start server', server_id)
    g.start()

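# Zero initializer passed to DistEmbedding below.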
def emb_init(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

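# Random-normal initializer passed to DistTensor as init_func.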
def rand_init(shape, dtype):
    return F.tensor(np.random.normal(size=shape), F.float32)

def run_client(graph_name, part_id, num_clients, num_nodes, num_edges):
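    """Connect to the servers, load the partition book, and verify the DistGraph."""
    # Give the server processes time to start before connecting.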
    time.sleep(5)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
                                          part_id, None)
    g = DistGraph("kv_ip_config.txt", graph_name, gpb=gpb)
    check_dist_graph(g, num_clients, num_nodes, num_edges)

def check_dist_graph(g, num_clients, num_nodes, num_edges):
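    """Exercise DistGraph reads/writes, DistTensor, DistEmbedding, and node_split."""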
    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dgl.distributed.DistTensor(g, new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # Reference a tensor that already exists: both handles see the same data.
    test2 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2', init_func=rand_init)
    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2')
    assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))

    # Create a tensor, destroy it, then create it again under the same name.
    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test3')
    del test3

    # Test a persistent tensor: it outlives its Python reference, so re-creating
    # 'test4' with a different shape must fail.
    test4 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    raised = False
    try:
        test4 = dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test4')
    except Exception:
        raised = True
    assert raised, "re-creating persistent tensor 'test4' with a new shape should fail"

    # Test sparse embeddings.
    try:
        emb = DistEmbedding(g, g.number_of_nodes(), 1, 'emb1', emb_init)
        lr = 0.001
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats = emb(nids)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        grad_sum = dgl.distributed.DistTensor(g, (g.number_of_nodes(),), F.float32,
                                              'emb1_sum', policy)
        assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))

        emb = DistEmbedding(g, g.number_of_nodes(), 1, 'emb2', emb_init)
        with F.no_grad():
            feats1 = emb(nids)
        assert np.all(F.asnumpy(feats1) == 0)

        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats1 = emb(nids)
            feats2 = emb(nids)
            feats = F.cat([feats1, feats2], 0)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids) * 2, 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.no_grad():
            feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * math.sqrt(2) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))
    except NotImplementedError:
        pass

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    # Test node split
    nodes = node_split(selected_nodes, g.get_partition_book())
    nodes = F.asnumpy(nodes)
    # With a single partition, the local nodes are all the nodes in the graph.
    local_nids = np.arange(g.number_of_nodes())
    for n in nodes:
        assert n in local_nids

    print('end')

def check_server_client(shared_mem, num_servers, num_clients):
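    """Partition a random graph, then spawn server and client processes against it."""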
    prepare_dist(num_servers)
    g = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_2'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')

    # Test on a single partition for now; we cannot run servers and clients for
    # multiple partitions on the same machine.
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id,
                                                 num_clients, shared_mem))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    for cli_id in range(num_clients):
        print('start client', cli_id)
        p = ctx.Process(target=run_client, args=(graph_name, 0, num_clients, g.number_of_nodes(),
                                                 g.number_of_edges()))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()

    for p in serv_ps:
        p.join()

    print('clients have terminated')

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
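    """Run server-client mode with/without shared memory and with 1 or 2 servers and clients."""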
    os.environ['DGL_DIST_MODE'] = 'distributed'
    check_server_client(True, 1, 1)
    check_server_client(False, 1, 1)
    check_server_client(True, 2, 2)
    check_server_client(False, 2, 2)

@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_standalone():
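    """Run DistGraph in standalone mode: no servers, the partition is loaded locally."""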
    os.environ['DGL_DIST_MODE'] = 'standalone'
    # TODO(zhengda) this is a temporary fix. We need to make initialize work
    # for standalone mode as well.
    dgl.distributed.role.CUR_ROLE = 'default'
    dgl.distributed.role.GLOBAL_RANK = {-1:0}
    dgl.distributed.role.PER_ROLE_RANK['default'] = {-1:0}

    g = create_random_graph(10000)
    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')
    dist_g = DistGraph("kv_ip_config.txt", graph_name,
                       part_config='/tmp/dist_graph/{}.json'.format(graph_name))
    check_dist_graph(dist_g, 1, g.number_of_nodes(), g.number_of_edges())

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split():
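    """Check node_split/edge_split against per-partition membership (force_even=False)."""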
    #prepare_dist()
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]

    # The splitting code collects the roles of all client processes and uses that
    # information to decide how to split the workload. Here we simulate the
    # multi-client case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
        nodes2 = node_split(node_mask, gpb, i, force_even=False)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
        local_nids = F.asnumpy(local_nids)
        for n in nodes1:
            assert n in local_nids

        set_roles(num_parts * 2)
        nodes3 = node_split(node_mask, gpb, i * 2, force_even=False)
        nodes4 = node_split(node_mask, gpb, i * 2 + 1, force_even=False)
        nodes5 = F.cat([nodes3, nodes4], 0)
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes5)))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
        edges2 = edge_split(edge_mask, gpb, i, force_even=False)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges2)))
        local_eids = F.asnumpy(local_eids)
        for e in edges1:
            assert e in local_eids

        set_roles(num_parts * 2)
        edges3 = edge_split(edge_mask, gpb, i * 2, force_even=False)
        edges4 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=False)
        edges5 = F.cat([edges3, edges4], 0)
        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges5)))

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_split_even():
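    """Check that force_even splits cover every selected node/edge exactly once."""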
    #prepare_dist(1)
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]
    all_nodes1 = []
    all_nodes2 = []
    all_edges1 = []
    all_edges2 = []

    # The splitting code collects the roles of all client processes and uses that
    # information to decide how to split the workload. Here we simulate the
    # multi-client case.
    def set_roles(num_clients):
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}

    for i in range(num_parts):
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _ = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes = node_split(node_mask, gpb, i, force_even=True)
        all_nodes1.append(nodes)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} gets {} nodes, {} of which are in the partition'.format(i, len(nodes), len(subset)))

        set_roles(num_parts * 2)
        nodes1 = node_split(node_mask, gpb, i * 2, force_even=True)
        nodes2 = node_split(node_mask, gpb, i * 2 + 1, force_even=True)
        nodes3 = F.cat([nodes1, nodes2], 0)
        all_nodes2.append(nodes3)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(nodes3))
        print('intersection has', len(subset))

        set_roles(num_parts)
        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
        edges = edge_split(edge_mask, gpb, i, force_even=True)
        all_edges1.append(edges)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
        print('part {} gets {} edges, {} of which are in the partition'.format(i, len(edges), len(subset)))

        set_roles(num_parts * 2)
        edges1 = edge_split(edge_mask, gpb, i * 2, force_even=True)
        edges2 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=True)
        edges3 = F.cat([edges1, edges2], 0)
        all_edges2.append(edges3)
        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(edges3))
        print('intersection has', len(subset))
    all_nodes1 = F.cat(all_nodes1, 0)
    all_edges1 = F.cat(all_edges1, 0)
    all_nodes2 = F.cat(all_nodes2, 0)
    all_edges2 = F.cat(all_edges2, 0)
    all_nodes = np.nonzero(node_mask)[0]
    all_edges = np.nonzero(edge_mask)[0]
    assert np.all(all_nodes == F.asnumpy(all_nodes1))
    assert np.all(all_edges == F.asnumpy(all_edges1))
    assert np.all(all_nodes == F.asnumpy(all_nodes2))
    assert np.all(all_edges == F.asnumpy(all_edges2))

def prepare_dist(num_servers):
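    """Write kv_ip_config.txt with a single line of the form '<ip> <port> <num_servers>'."""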
    ip_config = open("kv_ip_config.txt", "w")
    ip_addr = get_local_usable_addr()
    ip_config.write('{} {}\n'.format(ip_addr, num_servers))
    ip_config.close()

if __name__ == '__main__':
    os.makedirs('/tmp/dist_graph', exist_ok=True)
    test_split()
    test_split_even()
    test_server_client()
    test_standalone()