# test_dist_graph_store.py (14.4 KB)
1
2
3
4
5
6
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
7
import socket
8
9
10
11
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
12
from dgl.heterograph_index import create_unitgraph_from_coo
13
14
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
15
from dgl.distributed import partition_graph, load_partition, load_partition_book, node_split, edge_split
16
from dgl.distributed import SparseAdagrad, DistEmbedding
17
from numpy.testing import assert_almost_equal
18
import backend as F
19
import math
20
21
22
import unittest
import pickle

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get a local usable IP address and a free TCP port.

    The IP is found by connecting a UDP socket to an arbitrary address and
    reading back the local address chosen for that socket. The port is found
    by binding a TCP socket to port 0 and reading the port that was assigned.
    That TCP socket is closed before returning, so the port is not reserved
    for the caller.

    Returns
    -------
    str
        IP address and port separated by a single space,
        e.g., '192.168.8.12 50051'
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # doesn't even have to be reachable
            sock.connect(('10.255.255.255', 1))
            ip_addr = sock.getsockname()[0]
        except (OSError, ValueError):
            # Socket failures are reported as OSError (ValueError is kept for
            # backward compatibility); fall back to the loopback address.
            ip_addr = '127.0.0.1'

    # The context manager closes the socket even if bind()/listen() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        sock.listen(1)
        port = sock.getsockname()[1]

    return ip_addr + ' ' + str(port)

52
def create_random_graph(n):
    """Create a random sparse DGL graph with ``n`` nodes.

    A random ``n`` x ``n`` COO matrix with density 0.001 is generated with a
    fixed ``random_state`` of 100, converted to an int64 0/1 matrix, and
    passed to ``dgl.graph``.

    Parameters
    ----------
    n : int
        Number of rows and columns of the random matrix.

    Returns
    -------
    The graph returned by ``dgl.graph``.
    """
    adj = spsp.random(n, n, density=0.001, format='coo', random_state=100)
    adj = (adj != 0).astype(np.int64)
    return dgl.graph(adj)
55

56
def run_server(graph_name, server_id, num_clients, shared_mem):
    """Create a ``DistGraphServer`` for one partitioned graph and start it.

    Parameters
    ----------
    graph_name : str
        Name of the graph; its partition config is read from
        '/tmp/dist_graph/<graph_name>.json'.
    server_id : int
        ID passed to ``DistGraphServer``.
    num_clients : int
        Number of clients passed to ``DistGraphServer``.
    shared_mem : bool
        Shared memory is disabled on the server when this is False.
    """
    part_config = '/tmp/dist_graph/{}.json'.format(graph_name)
    server = DistGraphServer(server_id, "kv_ip_config.txt", num_clients,
                             part_config,
                             disable_shared_mem=not shared_mem)
    print('start server', server_id)
    server.start()

63
64
65
def emb_init(shape, dtype):
    """Return an all-zero tensor of the given shape and dtype on the CPU.

    Passed as the initializer to ``DistEmbedding`` in this file.
    """
    cpu_ctx = F.cpu()
    return F.zeros(shape, dtype, cpu_ctx)

66
def rand_init(shape, dtype):
    """Return a tensor of the given shape filled with normal random samples.

    Passed as ``init_func`` to ``DistTensor`` in this file.

    NOTE(review): ``dtype`` is ignored and the result is always built as
    ``F.float32``. Every use in this file requests float32, so this is
    harmless here -- confirm before reusing with another dtype.
    """
    samples = np.random.normal(size=shape)
    return F.tensor(samples, F.float32)
68

69
def run_client(graph_name, part_id, num_clients, num_nodes, num_edges):
    """Connect to the distributed graph as a client and run the checks.

    Parameters
    ----------
    graph_name : str
        Name of the graph; its partition config is read from
        '/tmp/dist_graph/<graph_name>.json'.
    part_id : int
        Partition ID passed to ``load_partition_book``.
    num_clients : int
        Total number of clients, forwarded to ``check_dist_graph``.
    num_nodes : int
        Expected number of nodes, forwarded to ``check_dist_graph``.
    num_edges : int
        Expected number of edges, forwarded to ``check_dist_graph``.
    """
    # Presumably gives the server processes time to come up -- TODO confirm.
    time.sleep(5)
    part_config = '/tmp/dist_graph/{}.json'.format(graph_name)
    gpb, graph_name = load_partition_book(part_config, part_id, None)
    dist_g = DistGraph("kv_ip_config.txt", graph_name, gpb=gpb)
    check_dist_graph(dist_g, num_clients, num_nodes, num_edges)
75

76
def check_dist_graph(g, num_clients, num_nodes, num_edges):
    """Run the shared set of assertions against a distributed graph.

    Covers node/edge counts, reading node and edge features, creating,
    re-referencing, deleting and persisting ``DistTensor`` objects, sparse
    embeddings with ``SparseAdagrad``, writing node data, metadata queries
    and ``node_split``.

    Parameters
    ----------
    g : DistGraph
        The distributed graph under test. Its 'features' node and edge data
        are expected to hold each node's / edge's own ID in a single column.
    num_clients : int
        Number of clients operating on the graph. Exact embedding values are
        only asserted when this is 1, and it is the expected value of the
        accumulated gradient-sum state.
    num_nodes : int
        Expected number of nodes.
    num_edges : int
        Expected number of edges.

    Raises
    ------
    AssertionError
        If any check fails.
    """
    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dgl.distributed.DistTensor(g, new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # reference to a one that exists
    test2 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2', init_func=rand_init)
    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2')
    assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))

    # create a tensor and destroy a tensor and create it again.
    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test3', init_func=rand_init)
    del test3
    test3 = dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test3')
    del test3

    # test a persistent tensor
    test4 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test4', init_func=rand_init,
                                       persistent=True)
    del test4
    # Re-creating the persistent tensor with a different shape is expected to
    # fail. The failure signal lives in the `else` branch so that it cannot be
    # swallowed by the handler that accepts the expected error.
    try:
        dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test4')
    except Exception:
        pass
    else:
        raise AssertionError(
            "re-creating persistent tensor 'test4' with a different shape should fail")

    # Test sparse emb
    try:
        emb = DistEmbedding(g, g.number_of_nodes(), 1, 'emb1', emb_init)
        lr = 0.001
        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats = emb(nids)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        # Exact values are only checked for a single client.
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        grad_sum = dgl.distributed.DistTensor(g, (g.number_of_nodes(),), F.float32,
                                              'emb1_sum', policy)
        assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))

        emb = DistEmbedding(g, g.number_of_nodes(), 1, 'emb2', emb_init)
        with F.no_grad():
            feats1 = emb(nids)
        assert np.all(F.asnumpy(feats1) == 0)

        optimizer = SparseAdagrad([emb], lr=lr)
        with F.record_grad():
            feats1 = emb(nids)
            feats2 = emb(nids)
            feats = F.cat([feats1, feats2], 0)
            assert np.all(F.asnumpy(feats) == np.zeros((len(nids) * 2, 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.no_grad():
            feats = emb(nids)
        if num_clients == 1:
            assert_almost_equal(F.asnumpy(feats), np.ones((len(nids), 1)) * math.sqrt(2) * -lr)
        rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
        feats1 = emb(rest)
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))
    except NotImplementedError:
        # Sparse embeddings are skipped when the backend does not implement
        # the required operations.
        pass

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    # Test node split
    nodes = node_split(selected_nodes, g.get_partition_book())
    nodes = F.asnumpy(nodes)
    # We only have one partition, so the local nodes are basically all nodes in the graph.
    local_nids = np.arange(g.number_of_nodes())
    assert np.all(np.isin(nodes, local_nids))

    print('end')

193
194
def check_server_client(shared_mem, num_servers, num_clients):
    """Partition a random graph, then run servers and clients against it.

    Parameters
    ----------
    shared_mem : bool
        Forwarded to ``run_server`` to enable or disable shared memory.
    num_servers : int
        Number of server processes to spawn; also written to the IP config.
    num_clients : int
        Number of client processes to spawn.
    """
    prepare_dist(num_servers)
    graph = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_2'
    graph.ndata['features'] = F.unsqueeze(F.arange(0, graph.number_of_nodes()), 1)
    graph.edata['features'] = F.unsqueeze(F.arange(0, graph.number_of_edges()), 1)
    partition_graph(graph, graph_name, num_parts, '/tmp/dist_graph')

    # let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    ctx = mp.get_context('spawn')
    server_procs = []
    for serv_id in range(num_servers):
        server_args = (graph_name, serv_id, num_clients, shared_mem)
        proc = ctx.Process(target=run_server, args=server_args)
        server_procs.append(proc)
        proc.start()

    # Every client works on partition 0 and checks the same node/edge counts.
    client_procs = []
    for cli_id in range(num_clients):
        print('start client', cli_id)
        client_args = (graph_name, 0, num_clients, graph.number_of_nodes(),
                       graph.number_of_edges())
        proc = ctx.Process(target=run_client, args=client_args)
        proc.start()
        client_procs.append(proc)

    # Wait for the clients first, then for the servers.
    for proc in client_procs + server_procs:
        proc.join()

    print('clients have terminated')

230
231
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
    """Run the server/client check in distributed mode.

    Uses one server with one client, then two servers with two clients,
    each with shared memory enabled and then disabled.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    for num_procs in (1, 2):
        for shared_mem in (True, False):
            check_server_client(shared_mem, num_procs, num_procs)
237

238
239
240
241
242
243
244
245
246
247
248
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_standalone():
    """Run the distributed-graph checks in standalone mode.

    A random graph is written out as a single partition and opened directly
    as a ``DistGraph`` from its partition config, without spawning server or
    client processes.
    """
    os.environ['DGL_DIST_MODE'] = 'standalone'
    graph = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'dist_graph_test_3'
    node_ids = F.arange(0, graph.number_of_nodes())
    graph.ndata['features'] = F.unsqueeze(node_ids, 1)
    edge_ids = F.arange(0, graph.number_of_edges())
    graph.edata['features'] = F.unsqueeze(edge_ids, 1)
    partition_graph(graph, graph_name, num_parts, '/tmp/dist_graph')

    part_config = '/tmp/dist_graph/{}.json'.format(graph_name)
    dist_g = DistGraph("kv_ip_config.txt", graph_name, part_config=part_config)
    check_dist_graph(dist_g, 1, graph.number_of_nodes(), graph.number_of_edges())
251

252
def test_split():
    """Check ``node_split`` / ``edge_split`` with ``force_even=False``.

    A random graph is split into four METIS partitions. For each partition,
    the masked nodes (edges) that are inner nodes (edges) of that partition
    must be exactly what the split returns for that rank, both with one
    client per partition and with two clients per partition (the two
    clients' results concatenated).
    """
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]
    part_config = '/tmp/dist_graph/dist_graph_test.json'

    for part_id in range(num_parts):
        # --- nodes, one client per partition ---
        dgl.distributed.set_num_client(num_parts)
        part_g, _, _, gpb, _ = load_partition(part_config, part_id)
        inner_nodes = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.asnumpy(F.gather_row(part_g.ndata[dgl.NID], inner_nodes))
        expected_nodes = np.intersect1d(selected_nodes, local_nids)
        whole_nodes = node_split(node_mask, gpb, part_id, force_even=False)
        assert np.all(np.sort(expected_nodes) == np.sort(F.asnumpy(whole_nodes)))
        assert np.all(np.isin(expected_nodes, local_nids))

        # --- nodes, two clients per partition ---
        dgl.distributed.set_num_client(num_parts * 2)
        first_half = node_split(node_mask, gpb, part_id * 2, force_even=False)
        second_half = node_split(node_mask, gpb, part_id * 2 + 1, force_even=False)
        both_halves = F.cat([first_half, second_half], 0)
        assert np.all(np.sort(expected_nodes) == np.sort(F.asnumpy(both_halves)))

        # --- edges, one client per partition ---
        dgl.distributed.set_num_client(num_parts)
        inner_edges = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.asnumpy(F.gather_row(part_g.edata[dgl.EID], inner_edges))
        expected_edges = np.intersect1d(selected_edges, local_eids)
        whole_edges = edge_split(edge_mask, gpb, part_id, force_even=False)
        assert np.all(np.sort(expected_edges) == np.sort(F.asnumpy(whole_edges)))
        assert np.all(np.isin(expected_edges, local_eids))

        # --- edges, two clients per partition ---
        dgl.distributed.set_num_client(num_parts * 2)
        first_half = edge_split(edge_mask, gpb, part_id * 2, force_even=False)
        second_half = edge_split(edge_mask, gpb, part_id * 2 + 1, force_even=False)
        both_halves = F.cat([first_half, second_half], 0)
        assert np.all(np.sort(expected_edges) == np.sort(F.asnumpy(both_halves)))

def test_split_even():
    """Check ``node_split`` / ``edge_split`` with ``force_even=True``.

    A random graph is split into four METIS partitions. The per-rank results
    are collected with one client per partition and with two clients per
    partition. In both cases the results, concatenated in rank order, must
    equal the IDs selected by the mask.
    """
    g = create_random_graph(10000)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')

    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    expected_nodes = np.nonzero(node_mask)[0]
    expected_edges = np.nonzero(edge_mask)[0]
    part_config = '/tmp/dist_graph/dist_graph_test.json'

    # Per-rank results: "single" is one client per partition, "double" is two.
    nodes_single, nodes_double = [], []
    edges_single, edges_double = [], []
    for part_id in range(num_parts):
        dgl.distributed.set_num_client(num_parts)
        part_g, _, _, gpb, _ = load_partition(part_config, part_id)
        inner_nodes = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], inner_nodes)
        nodes = node_split(node_mask, gpb, part_id, force_even=True)
        nodes_single.append(nodes)
        overlap = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} get {} nodes and {} are in the partition'.format(part_id, len(nodes), len(overlap)))

        dgl.distributed.set_num_client(num_parts * 2)
        first_half = node_split(node_mask, gpb, part_id * 2, force_even=True)
        second_half = node_split(node_mask, gpb, part_id * 2 + 1, force_even=True)
        both_halves = F.cat([first_half, second_half], 0)
        nodes_double.append(both_halves)
        overlap = np.intersect1d(F.asnumpy(nodes), F.asnumpy(both_halves))
        print('intersection has', len(overlap))

        dgl.distributed.set_num_client(num_parts)
        inner_edges = F.nonzero_1d(part_g.edata['inner_edge'])
        local_eids = F.gather_row(part_g.edata[dgl.EID], inner_edges)
        edges = edge_split(edge_mask, gpb, part_id, force_even=True)
        edges_single.append(edges)
        overlap = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
        print('part {} get {} edges and {} are in the partition'.format(part_id, len(edges), len(overlap)))

        dgl.distributed.set_num_client(num_parts * 2)
        first_half = edge_split(edge_mask, gpb, part_id * 2, force_even=True)
        second_half = edge_split(edge_mask, gpb, part_id * 2 + 1, force_even=True)
        both_halves = F.cat([first_half, second_half], 0)
        edges_double.append(both_halves)
        overlap = np.intersect1d(F.asnumpy(edges), F.asnumpy(both_halves))
        print('intersection has', len(overlap))

    nodes_single = F.cat(nodes_single, 0)
    edges_single = F.cat(edges_single, 0)
    nodes_double = F.cat(nodes_double, 0)
    edges_double = F.cat(edges_double, 0)
    assert np.all(expected_nodes == F.asnumpy(nodes_single))
    assert np.all(expected_edges == F.asnumpy(edges_single))
    assert np.all(expected_nodes == F.asnumpy(nodes_double))
    assert np.all(expected_edges == F.asnumpy(edges_double))

356
def prepare_dist(num_servers):
    """Write the IP config file read by the servers and clients in this file.

    The file 'kv_ip_config.txt' is (re)created in the current working
    directory with a single line: the address returned by
    ``get_local_usable_addr`` followed by a space and ``num_servers``.

    Parameters
    ----------
    num_servers : int
        Number of servers written at the end of the config line.
    """
    # Resolve the address before opening the file so that a failure here
    # does not leave a truncated config file behind.
    ip_addr = get_local_usable_addr()
    # `with` closes the file even if the write raises.
    with open("kv_ip_config.txt", "w") as ip_config:
        ip_config.write('{} {}\n'.format(ip_addr, num_servers))

362
if __name__ == '__main__':
    # Directory the tests write graph partitions to and read them from.
    os.makedirs('/tmp/dist_graph', exist_ok=True)
    for run_test in (test_split, test_split_even, test_server_client, test_standalone):
        run_test()