test_new_kvstore.py 13.6 KB
Newer Older
1
2
3
import os
import time
import numpy as np
4
import socket
5
6
7
from scipy import sparse as spsp
import dgl
import backend as F
Da Zheng's avatar
Da Zheng committed
8
import unittest
9
from dgl.graph_index import create_graph_index
10
import multiprocessing as mp
11
12
from numpy.testing import assert_array_equal

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get local usable IP and port

    Returns
    -------
    str
        IP address, e.g., '192.168.8.12:50051'
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        sock.connect(('10.255.255.255', 1))
        ip_addr = sock.getsockname()[0]
    except ValueError:
        ip_addr = '127.0.0.1'
    finally:
        sock.close()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", 0))
    sock.listen(1)
    port = sock.getsockname()[1]
    sock.close()

    return ip_addr + ' ' + str(port)

42
43
44
45
46
47
48
49
# Create an one-part Graph
node_map = F.tensor([0,0,0,0,0,0], F.int64)
edge_map = F.tensor([0,0,0,0,0,0,0], F.int64)
global_nid = F.tensor([0,1,2,3,4,5], F.int64)
global_eid = F.tensor([0,1,2,3,4,5,6], F.int64)

g = dgl.DGLGraph()
g.add_nodes(6)
50
51
52
53
54
55
56
g.add_edges(0, 1) # 0
g.add_edges(0, 2) # 1
g.add_edges(0, 3) # 2
g.add_edges(2, 3) # 3
g.add_edges(1, 1) # 4
g.add_edges(0, 4) # 5
g.add_edges(2, 5) # 6
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

gpb = dgl.distributed.GraphPartitionBook(part_id=0,
                                         num_parts=1,
                                         node_map=node_map,
                                         edge_map=edge_map,
                                         part_graph=g)

node_policy = dgl.distributed.PartitionPolicy(policy_str='node',
                                              partition_book=gpb)

edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge',
                                              partition_book=gpb)

data_0 = F.tensor([[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.]], F.float32)
74
75
76
data_0_1 = F.tensor([1.,2.,3.,4.,5.,6.], F.float32)
data_0_2 = F.tensor([1,2,3,4,5,6], F.int32)
data_0_3 = F.tensor([1,2,3,4,5,6], F.int64)
77
78
79
80
81
82
83
data_1 = F.tensor([[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.]], F.float32)
data_2 = F.tensor([[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.]], F.float32)

def init_zero_func(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

def udf_push(target, name, id_tensor, data_tensor):
84
85
86
87
    target[name][id_tensor] = data_tensor * data_tensor

def add_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] += data_tensor
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_partition_policy():
    assert node_policy.policy_str == 'node'
    assert edge_policy.policy_str == 'edge'
    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0
    local_nid = node_policy.to_local(F.tensor([0,1,2,3,4,5]))
    local_eid = edge_policy.to_local(F.tensor([0,1,2,3,4,5,6]))
    assert_array_equal(F.asnumpy(local_nid), F.asnumpy(F.tensor([0,1,2,3,4,5], F.int64)))
    assert_array_equal(F.asnumpy(local_eid), F.asnumpy(F.tensor([0,1,2,3,4,5,6], F.int64)))
    nid_partid = node_policy.to_partid(F.tensor([0,1,2,3,4,5], F.int64))
    eid_partid = edge_policy.to_partid(F.tensor([0,1,2,3,4,5,6], F.int64))
    assert_array_equal(F.asnumpy(nid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0], F.int64)))
    assert_array_equal(F.asnumpy(eid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0,0], F.int64)))
103
104
    assert node_policy.get_part_size() == len(node_map)
    assert edge_policy.get_part_size() == len(edge_map)
105

106
def start_server(server_id, num_clients, num_servers):
107
    # Init kvserver
108
109
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
110
    kvserver = dgl.distributed.KVServer(server_id=server_id,
111
                                        ip_config='kv_ip_config.txt',
112
                                        num_servers=num_servers,
113
                                        num_clients=num_clients)
114
115
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
116
117
118
119
120
121
122
123
124
125
    if kvserver.is_backup_server():
        kvserver.init_data('data_0', 'node')
        kvserver.init_data('data_0_1', 'node')
        kvserver.init_data('data_0_2', 'node')
        kvserver.init_data('data_0_3', 'node')
    else:
        kvserver.init_data('data_0', 'node', data_0)
        kvserver.init_data('data_0_1', 'node', data_0_1)
        kvserver.init_data('data_0_2', 'node', data_0_2)
        kvserver.init_data('data_0_3', 'node', data_0_3)
126
    # start server
Jinjing Zhou's avatar
Jinjing Zhou committed
127
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
128
    dgl.distributed.start_server(server_id=server_id,
129
                                 ip_config='kv_ip_config.txt',
130
                                 num_servers=num_servers,
131
                                 num_clients=num_clients,
132
133
                                 server_state=server_state)

134
def start_server_mul_role(server_id, num_clients, num_servers):
135
136
137
    # Init kvserver
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config='kv_ip_mul_config.txt',
138
                                        num_servers=num_servers,
139
140
141
142
143
144
145
146
147
148
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    if kvserver.is_backup_server():
        kvserver.init_data('data_0', 'node')
    else:
        kvserver.init_data('data_0', 'node', data_0)
    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config='kv_ip_mul_config.txt',
149
                                 num_servers=num_servers,
150
151
152
                                 num_clients=num_clients,
                                 server_state=server_state)

153
def start_client(num_clients, num_servers):
Da Zheng's avatar
Da Zheng committed
154
    os.environ['DGL_DIST_MODE'] = 'distributed'
155
    # Note: connect to server first !
156
    dgl.distributed.initialize(ip_config='kv_ip_config.txt', num_servers=num_servers)
157
    # Init kvclient
158
    kvclient = dgl.distributed.KVClient(ip_config='kv_ip_config.txt', num_servers=num_servers)
159
    kvclient.map_shared_data(partition_book=gpb)
160
    assert dgl.distributed.get_num_client() == num_clients
161
162
163
    kvclient.init_data(name='data_1', 
                       shape=F.shape(data_1), 
                       dtype=F.dtype(data_1), 
164
                       part_policy=edge_policy,
165
166
167
168
                       init_func=init_zero_func)
    kvclient.init_data(name='data_2', 
                       shape=F.shape(data_2), 
                       dtype=F.dtype(data_2), 
169
                       part_policy=node_policy,
170
171
172
173
174
175
                       init_func=init_zero_func)
    
    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    assert 'data_0' in name_list
176
177
178
    assert 'data_0_1' in name_list
    assert 'data_0_2' in name_list
    assert 'data_0_3' in name_list
179
180
181
182
183
184
185
186
    assert 'data_1' in name_list
    assert 'data_2' in name_list
    # Test get_meta_data
    meta = kvclient.get_data_meta('data_0')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0)
    assert shape == F.shape(data_0)
    assert policy.policy_str == 'node'
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205

    meta = kvclient.get_data_meta('data_0_1')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_1)
    assert shape == F.shape(data_0_1)
    assert policy.policy_str == 'node'

    meta = kvclient.get_data_meta('data_0_2')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_2)
    assert shape == F.shape(data_0_2)
    assert policy.policy_str == 'node'

    meta = kvclient.get_data_meta('data_0_3')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_3)
    assert shape == F.shape(data_0_3)
    assert policy.policy_str == 'node'

206
207
208
209
210
    meta = kvclient.get_data_meta('data_1')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_1)
    assert shape == F.shape(data_1)
    assert policy.policy_str == 'edge'
211

212
213
214
215
216
    meta = kvclient.get_data_meta('data_2')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_2)
    assert shape == F.shape(data_2)
    assert policy.policy_str == 'node'
217

218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
    # Test push and pull
    id_tensor = F.tensor([0,2,4], F.int64)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_2',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    res = kvclient.pull(name='data_0', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_1', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_2', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Register new push handler
237
238
239
    kvclient.register_push_handler('data_0', udf_push)
    kvclient.register_push_handler('data_1', udf_push)
    kvclient.register_push_handler('data_2', udf_push)
240
241
242
243
244
245
246
247
248
249
    # Test push and pull
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_2',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
250
    kvclient.barrier()
251
252
253
254
255
256
257
    data_tensor = data_tensor * data_tensor
    res = kvclient.pull(name='data_0', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_1', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_2', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
258
259
260
261
262
263

    # Test delete data
    kvclient.delete_data('data_0')
    kvclient.delete_data('data_1')
    kvclient.delete_data('data_2')

264
265
266
267
    # Register new push handler
    kvclient.init_data(name='data_3', 
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2), 
268
                       part_policy=node_policy,
269
270
271
                       init_func=init_zero_func)
    kvclient.register_push_handler('data_3', add_push)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
272
    kvclient.barrier()
273
274
275
276
277
278
279
280
281
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name='data_3',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name='data_3', id_tensor=id_tensor)
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
282

283
def start_client_mul_role(i, num_workers, num_servers):
Da Zheng's avatar
Da Zheng committed
284
285
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # Initialize creates kvstore !
286
    dgl.distributed.initialize(ip_config='kv_ip_mul_config.txt', num_servers=num_servers, num_workers=num_workers)
Da Zheng's avatar
Da Zheng committed
287
    if i == 0: # block one trainer
288
        time.sleep(5)
Da Zheng's avatar
Da Zheng committed
289
    kvclient = dgl.distributed.kvstore.get_kvstore()
290
    kvclient.barrier()
291
    print("i: %d role: %s" % (i, kvclient.role))
292

Da Zheng's avatar
Da Zheng committed
293
294
295
296
297
298
    assert dgl.distributed.role.get_num_trainers() == 2
    assert dgl.distributed.role.get_trainer_rank() < 2
    print('trainer rank: %d, global rank: %d' % (dgl.distributed.role.get_trainer_rank(),
                                                 dgl.distributed.role.get_global_rank()))
    dgl.distributed.exit_client()

299
300
301
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_store():
    ip_config = open("kv_ip_config.txt", "w")
302
303
    num_servers = 2
    num_clients = 2
304
    ip_addr = get_local_usable_addr()
305
    ip_config.write('{}\n'.format(ip_addr))
306
    ip_config.close()
307
    ctx = mp.get_context('spawn')
308
309
    pserver_list = []
    pclient_list = []
310
    for i in range(num_servers):
311
        pserver = ctx.Process(target=start_server, args=(i, num_clients, num_servers))
312
313
        pserver.start()
        pserver_list.append(pserver)
314
    for i in range(num_clients):
315
        pclient = ctx.Process(target=start_client, args=(num_clients, num_servers))
316
317
        pclient.start()
        pclient_list.append(pclient)
318
    for i in range(num_clients):
319
        pclient_list[i].join()
320
    for i in range(num_servers):
321
        pserver_list[i].join()
322

323
324
325
326
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_multi_role():
    ip_config = open("kv_ip_mul_config.txt", "w")
    num_servers = 2
Da Zheng's avatar
Da Zheng committed
327
328
329
330
    num_trainers = 2
    num_samplers = 2
    # There are two trainer processes and each trainer process has two sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
331
    ip_addr = get_local_usable_addr()
332
    ip_config.write('{}\n'.format(ip_addr))
333
334
335
336
337
    ip_config.close()
    ctx = mp.get_context('spawn')
    pserver_list = []
    pclient_list = []
    for i in range(num_servers):
338
        pserver = ctx.Process(target=start_server_mul_role, args=(i, num_clients, num_servers))
339
340
        pserver.start()
        pserver_list.append(pserver)
Da Zheng's avatar
Da Zheng committed
341
    for i in range(num_trainers):
342
        pclient = ctx.Process(target=start_client_mul_role, args=(i, num_samplers, num_servers))
343
344
        pclient.start()
        pclient_list.append(pclient)
Da Zheng's avatar
Da Zheng committed
345
    for i in range(num_trainers):
346
347
348
349
        pclient_list[i].join()
    for i in range(num_servers):
        pserver_list[i].join()

350
351
if __name__ == '__main__':
    test_partition_policy()
352
    test_kv_store()
353
    test_kv_multi_role()