import multiprocessing as mp
import os
import socket
import time
import unittest

import numpy as np
from numpy.testing import assert_array_equal
from scipy import sparse as spsp

import dgl
from dgl.graph_index import create_graph_index

import backend as F

if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get a local usable IP address and a currently free TCP port.

    Returns
    -------
    str
        IP address and port separated by a single space,
        e.g., '192.168.8.12 50051'
    """
    # Connecting a datagram socket only selects the outgoing interface; the
    # address chosen by the OS is then read back with getsockname().
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            # doesn't even have to be reachable
            probe.connect(('10.255.255.255', 1))
            ip_addr = probe.getsockname()[0]
        except (OSError, ValueError):
            # socket errors are OSError subclasses (e.g. network unreachable);
            # fall back to loopback instead of crashing.
            ip_addr = '127.0.0.1'

    # Binding to port 0 lets the OS pick any free port; the socket is closed
    # on exit from the block even if bind()/listen() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("", 0))
        listener.listen(1)
        port = listener.getsockname()[1]

    return ip_addr + ' ' + str(port)

42
43
44
45
46
47
48
49
# Create a graph that lives in a single partition: every node and edge is
# mapped to partition 0.
node_map = F.tensor([0,0,0,0,0,0], F.int64)
edge_map = F.tensor([0,0,0,0,0,0,0], F.int64)
global_nid = F.tensor([0,1,2,3,4,5], F.int64)
global_eid = F.tensor([0,1,2,3,4,5,6], F.int64)

# 6 nodes and 7 edges; the trailing comment on each line is the edge ID.
g = dgl.DGLGraph()
g.add_nodes(6)
g.add_edges(0, 1) # 0
g.add_edges(0, 2) # 1
g.add_edges(0, 3) # 2
g.add_edges(2, 3) # 3
g.add_edges(1, 1) # 4
g.add_edges(0, 4) # 5
g.add_edges(2, 5) # 6

# Store the global node/edge IDs on the graph itself.
g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

61
62
63
64
65
# Partition book describing the single partition (part_id 0 of 1) built from
# the node/edge maps and the graph above.
gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(part_id=0,
                                                              num_parts=1,
                                                              node_map=node_map,
                                                              edge_map=edge_map,
                                                              part_graph=g)

# Policies shared by the servers and clients in the tests below.
node_policy = dgl.distributed.PartitionPolicy(policy_str='node:_N',
                                              partition_book=gpb)

edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge:_E',
                                              partition_book=gpb)

# Node data registered by the servers (6 rows, one per node), in several
# shapes and dtypes.
data_0 = F.tensor([[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.]], F.float32)
data_0_1 = F.tensor([1.,2.,3.,4.,5.,6.], F.float32)
data_0_2 = F.tensor([1,2,3,4,5,6], F.int32)
data_0_3 = F.tensor([1,2,3,4,5,6], F.int64)
# Templates whose shape/dtype the client uses when creating 'data_1'
# (7 rows, edge policy) and 'data_2' (6 rows, node policy).
data_1 = F.tensor([[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.]], F.float32)
data_2 = F.tensor([[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.]], F.float32)

def init_zero_func(shape, dtype):
    """Initializer passed to ``KVClient.init_data``: a zero tensor on CPU.

    Parameters
    ----------
    shape : tuple
        Shape of the tensor to create.
    dtype : backend dtype
        Data type of the tensor to create.
    """
    cpu_ctx = F.cpu()
    zeros = F.zeros(shape, dtype, cpu_ctx)
    return zeros

def udf_push(target, name, id_tensor, data_tensor):
    """Push handler that stores the element-wise square of the pushed data.

    Parameters
    ----------
    target : dict
        Mapping from data name to the tensor holding that data.
    name : str
        Name of the data entry to update.
    id_tensor : tensor
        Row indices to overwrite.
    data_tensor : tensor
        Pushed values; ``data_tensor * data_tensor`` is written to the rows
        selected by ``id_tensor``.
    """
    target[name][id_tensor] = data_tensor * data_tensor

def add_push(target, name, id_tensor, data_tensor):
    """Push handler that accumulates pushed data onto the stored rows.

    Parameters
    ----------
    target : dict
        Mapping from data name to the tensor holding that data.
    name : str
        Name of the data entry to update.
    id_tensor : tensor
        Row indices to update.
    data_tensor : tensor
        Values added to the rows selected by ``id_tensor``.
    """
    stored = target[name]
    stored[id_tensor] += data_tensor
88
89
90
91
92
93
94
95
96
97
98
99
100

@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_partition_policy():
    """Check the module-level node/edge policies of the one-partition graph.

    With a single partition, global IDs must map to identical local IDs,
    every ID must belong to partition 0, and the partition size must equal
    the length of the corresponding node/edge map.
    """
    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0
    # Global -> local ID mapping is the identity here.
    local_nid = node_policy.to_local(F.tensor([0,1,2,3,4,5]))
    local_eid = edge_policy.to_local(F.tensor([0,1,2,3,4,5,6]))
    assert_array_equal(F.asnumpy(local_nid), F.asnumpy(F.tensor([0,1,2,3,4,5], F.int64)))
    assert_array_equal(F.asnumpy(local_eid), F.asnumpy(F.tensor([0,1,2,3,4,5,6], F.int64)))
    # Every node and edge is owned by partition 0.
    nid_partid = node_policy.to_partid(F.tensor([0,1,2,3,4,5], F.int64))
    eid_partid = edge_policy.to_partid(F.tensor([0,1,2,3,4,5,6], F.int64))
    assert_array_equal(F.asnumpy(nid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0], F.int64)))
    assert_array_equal(F.asnumpy(eid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0,0], F.int64)))
    assert node_policy.get_part_size() == len(node_map)
    assert edge_policy.get_part_size() == len(edge_map)
103

104
def start_server(server_id, num_clients, num_servers):
    """Entry point of a KVStore server process for ``test_kv_store``.

    Parameters
    ----------
    server_id : int
        ID of this server.
    num_clients : int
        Number of clients passed to the server.
    num_servers : int
        Number of servers passed to the server.
    """
    # Init kvserver
    # Delay start-up so that clients have to retry their connection.
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config='kv_ip_config.txt',
                                        num_servers=num_servers,
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
    if kvserver.is_backup_server():
        # Backup servers register the names without supplying a tensor.
        # NOTE(review): presumably they attach to the main server's data --
        # confirm against KVServer.init_data.
        kvserver.init_data('data_0', 'node:_N')
        kvserver.init_data('data_0_1', 'node:_N')
        kvserver.init_data('data_0_2', 'node:_N')
        kvserver.init_data('data_0_3', 'node:_N')
    else:
        kvserver.init_data('data_0', 'node:_N', data_0)
        kvserver.init_data('data_0_1', 'node:_N', data_0_1)
        kvserver.init_data('data_0_2', 'node:_N', data_0_2)
        kvserver.init_data('data_0_3', 'node:_N', data_0_3)
    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config='kv_ip_config.txt',
                                 num_servers=num_servers,
                                 num_clients=num_clients,
                                 server_state=server_state)

132
def start_server_mul_role(server_id, num_clients, num_servers):
    """Entry point of a KVStore server process for ``test_kv_multi_role``.

    Parameters
    ----------
    server_id : int
        ID of this server.
    num_clients : int
        Number of clients passed to the server.
    num_servers : int
        Number of servers passed to the server.
    """
    # Init kvserver
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config='kv_ip_mul_config.txt',
                                        num_servers=num_servers,
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    if kvserver.is_backup_server():
        # Backup servers register the name without supplying a tensor.
        kvserver.init_data('data_0', 'node:_N')
    else:
        kvserver.init_data('data_0', 'node:_N', data_0)
    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config='kv_ip_mul_config.txt',
                                 num_servers=num_servers,
                                 num_clients=num_clients,
                                 server_state=server_state)

151
def start_client(num_clients, num_servers):
    """Entry point of a KVStore client process for ``test_kv_store``.

    Connects to the servers, creates 'data_1' and 'data_2', then checks the
    data name list, metadata, push/pull with the default and user-defined
    push handlers, data deletion, and an accumulating push handler.

    Parameters
    ----------
    num_clients : int
        Expected total number of clients.
    num_servers : int
        Number of servers to connect to.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # Note: connect to server first !
    dgl.distributed.initialize(ip_config='kv_ip_config.txt', num_servers=num_servers)
    # Init kvclient
    kvclient = dgl.distributed.KVClient(ip_config='kv_ip_config.txt', num_servers=num_servers)
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client() == num_clients
    kvclient.init_data(name='data_1',
                       shape=F.shape(data_1),
                       dtype=F.dtype(data_1),
                       part_policy=edge_policy,
                       init_func=init_zero_func)
    kvclient.init_data(name='data_2',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)

    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    for expected_name in ('data_0', 'data_0_1', 'data_0_2', 'data_0_3',
                          'data_1', 'data_2'):
        assert expected_name in name_list

    # Test get_meta_data: (name, tensor with the expected dtype/shape, policy)
    expected_meta = (
        ('data_0', data_0, 'node:_N'),
        ('data_0_1', data_0_1, 'node:_N'),
        ('data_0_2', data_0_2, 'node:_N'),
        ('data_0_3', data_0_3, 'node:_N'),
        ('data_1', data_1, 'edge:_E'),
        ('data_2', data_2, 'node:_N'),
    )
    for data_name, reference, policy_str in expected_meta:
        dtype, shape, policy = kvclient.get_data_meta(data_name)
        assert dtype == F.dtype(reference)
        assert shape == F.shape(reference)
        assert policy.policy_str == policy_str

    # Test push and pull
    pushed_names = ('data_0', 'data_1', 'data_2')
    id_tensor = F.tensor([0,2,4], F.int64)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    for data_name in pushed_names:
        kvclient.push(name=data_name,
                      id_tensor=id_tensor,
                      data_tensor=data_tensor)
    for data_name in pushed_names:
        res = kvclient.pull(name=data_name, id_tensor=id_tensor)
        assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Register new push handler
    for data_name in pushed_names:
        kvclient.register_push_handler(data_name, udf_push)

    # Test push and pull
    for data_name in pushed_names:
        kvclient.push(name=data_name,
                      id_tensor=id_tensor,
                      data_tensor=data_tensor)
    kvclient.barrier()
    # udf_push stores the element-wise square of what was pushed.
    data_tensor = data_tensor * data_tensor
    for data_name in pushed_names:
        res = kvclient.pull(name=data_name, id_tensor=id_tensor)
        assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Test delete data
    for data_name in pushed_names:
        kvclient.delete_data(data_name)

    # Create 'data_3' and register an accumulating push handler for it
    kvclient.init_data(name='data_3',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)
    kvclient.register_push_handler('data_3', add_push)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    kvclient.barrier()
    # Each client waits a different amount of time before pushing.
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name='data_3',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name='data_3', id_tensor=id_tensor)
    # Every client added the same tensor onto zero-initialised rows.
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
280

281
def start_client_mul_role(i, num_workers, num_servers):
    """Entry point of a trainer process for ``test_kv_multi_role``.

    Parameters
    ----------
    i : int
        Index of this trainer process; trainer 0 is delayed by 5 seconds
        before reaching the barrier.
    num_workers : int
        Forwarded to ``dgl.distributed.initialize`` as ``num_workers``.
    num_servers : int
        Number of servers to connect to.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # Initialize creates kvstore !
    dgl.distributed.initialize(ip_config='kv_ip_mul_config.txt', num_servers=num_servers, num_workers=num_workers)
    if i == 0: # block one trainer
        time.sleep(5)
    kvclient = dgl.distributed.kvstore.get_kvstore()
    kvclient.barrier()
    print("i: %d role: %s" % (i, kvclient.role))

    assert dgl.distributed.role.get_num_trainers() == 2
    assert dgl.distributed.role.get_trainer_rank() < 2
    print('trainer rank: %d, global rank: %d' % (dgl.distributed.role.get_trainer_rank(),
                                                 dgl.distributed.role.get_global_rank()))
    dgl.distributed.exit_client()

297
298
299
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_store():
    """Run the KVStore test with 2 server and 2 client processes.

    Writes one local address to 'kv_ip_config.txt', spawns the processes,
    and waits for all of them to finish.
    """
    num_servers = 2
    num_clients = 2
    ip_addr = get_local_usable_addr()
    # The context manager closes the config file even if the write fails.
    with open("kv_ip_config.txt", "w") as ip_config:
        ip_config.write('{}\n'.format(ip_addr))
    ctx = mp.get_context('spawn')
    pserver_list = []
    pclient_list = []
    for i in range(num_servers):
        pserver = ctx.Process(target=start_server, args=(i, num_clients, num_servers))
        pserver.start()
        pserver_list.append(pserver)
    for _ in range(num_clients):
        pclient = ctx.Process(target=start_client, args=(num_clients, num_servers))
        pclient.start()
        pclient_list.append(pclient)
    # NOTE(review): exit codes are not checked, so an assertion failure in a
    # child process does not fail this test -- confirm whether intended.
    for pclient in pclient_list:
        pclient.join()
    for pserver in pserver_list:
        pserver.join()
320

321
322
323
324
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_multi_role():
    """Run the multi-role test with 2 server and 2 trainer processes.

    Writes one local address to 'kv_ip_mul_config.txt', spawns the
    processes, and waits for all of them to finish.
    """
    num_servers = 2
    num_trainers = 2
    num_samplers = 2
    # There are two trainer processes and each trainer process has two sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
    ip_addr = get_local_usable_addr()
    # The context manager closes the config file even if the write fails.
    with open("kv_ip_mul_config.txt", "w") as ip_config:
        ip_config.write('{}\n'.format(ip_addr))
    ctx = mp.get_context('spawn')
    pserver_list = []
    pclient_list = []
    for i in range(num_servers):
        pserver = ctx.Process(target=start_server_mul_role, args=(i, num_clients, num_servers))
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_trainers):
        pclient = ctx.Process(target=start_client_mul_role, args=(i, num_samplers, num_servers))
        pclient.start()
        pclient_list.append(pclient)
    # NOTE(review): exit codes are not checked, so an assertion failure in a
    # child process does not fail this test -- confirm whether intended.
    for pclient in pclient_list:
        pclient.join()
    for pserver in pserver_list:
        pserver.join()

348
349
# Run all tests in sequence when the file is executed as a script.
if __name__ == '__main__':
    test_partition_policy()
    test_kv_store()
    test_kv_multi_role()