test_new_kvstore.py 13.3 KB
Newer Older
1
2
3
import os
import time
import numpy as np
4
import socket
5
6
7
from scipy import sparse as spsp
import dgl
import backend as F
Da Zheng's avatar
Da Zheng committed
8
import unittest
9
from dgl.graph_index import create_graph_index
10
import multiprocessing as mp
11
12
from numpy.testing import assert_array_equal

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get a local usable IP address and a free TCP port.

    Returns
    -------
    str
        The IP address and the port separated by a single space,
        e.g., '192.168.8.12 50051'.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # doesn't even have to be reachable
            sock.connect(('10.255.255.255', 1))
            ip_addr = sock.getsockname()[0]
        except (OSError, ValueError):
            # Socket failures are reported as OSError, so that is what must
            # be caught for the loopback fallback to ever take effect.
            ip_addr = '127.0.0.1'

    # Binding to port 0 lets the OS choose the port; it is read back with
    # getsockname() and the socket is closed again before returning.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        sock.listen(1)
        port = sock.getsockname()[1]

    return ip_addr + ' ' + str(port)

42
43
44
45
46
47
48
49
# Create a one-part graph: every node and every edge is mapped to partition 0.
node_map = F.tensor([0,0,0,0,0,0], F.int64)
edge_map = F.tensor([0,0,0,0,0,0,0], F.int64)
global_nid = F.tensor([0,1,2,3,4,5], F.int64)
global_eid = F.tensor([0,1,2,3,4,5,6], F.int64)

g = dgl.DGLGraph()
g.add_nodes(6)
# The trailing comment on each line is the ID of the edge being added.
g.add_edges(0, 1) # 0
g.add_edges(0, 2) # 1
g.add_edges(0, 3) # 2
g.add_edges(2, 3) # 3
g.add_edges(1, 1) # 4
g.add_edges(0, 4) # 5
g.add_edges(2, 5) # 6

g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

# Partition book for the single partition, shared by both policies below.
gpb = dgl.distributed.GraphPartitionBook(part_id=0,
                                         num_parts=1,
                                         node_map=node_map,
                                         edge_map=edge_map,
                                         part_graph=g)

node_policy = dgl.distributed.PartitionPolicy(policy_str='node',
                                              partition_book=gpb)

edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge',
                                              partition_book=gpb)

# Tensors with 6 rows (one per node) are used with the node policy; data_1
# has 7 rows (one per edge) and is used with the edge policy.
data_0 = F.tensor([[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.]], F.float32)
data_0_1 = F.tensor([1.,2.,3.,4.,5.,6.], F.float32)
data_0_2 = F.tensor([1,2,3,4,5,6], F.int32)
data_0_3 = F.tensor([1,2,3,4,5,6], F.int64)
data_1 = F.tensor([[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.]], F.float32)
data_2 = F.tensor([[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.]], F.float32)

def init_zero_func(shape, dtype):
    """Return ``F.zeros`` of the given shape and dtype, created with ``F.cpu()``."""
    cpu_ctx = F.cpu()
    zero_tensor = F.zeros(shape, dtype, cpu_ctx)
    return zero_tensor

def udf_push(target, name, id_tensor, data_tensor):
    """Push handler that stores the element-wise square of the pushed data.

    Parameters
    ----------
    target : mapping
        Maps a data name to the stored tensor; the entry is modified in place.
    name : str
        Key of the tensor in ``target`` to update.
    id_tensor : tensor
        Indices of the rows to overwrite.
    data_tensor : tensor
        Pushed data; ``data_tensor * data_tensor`` is written to the rows.
    """
    target[name][id_tensor] = data_tensor * data_tensor

def add_push(target, name, id_tensor, data_tensor):
    """Push handler that adds the pushed data onto the selected stored rows."""
    stored = target[name]
    stored[id_tensor] += data_tensor
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_partition_policy():
    """Check policy names, partition IDs and ID conversions of both policies."""
    node_ids = [0, 1, 2, 3, 4, 5]
    edge_ids = [0, 1, 2, 3, 4, 5, 6]

    # Policy name and owning partition.
    assert node_policy.policy_str == 'node'
    assert edge_policy.policy_str == 'edge'
    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0

    # The converted IDs are expected to equal the input IDs.
    local_nid = F.asnumpy(node_policy.to_local(F.tensor(node_ids)))
    local_eid = F.asnumpy(edge_policy.to_local(F.tensor(edge_ids)))
    assert_array_equal(local_nid, F.asnumpy(F.tensor(node_ids, F.int64)))
    assert_array_equal(local_eid, F.asnumpy(F.tensor(edge_ids, F.int64)))

    # Every ID is expected to map to partition 0.
    nid_partid = F.asnumpy(node_policy.to_partid(F.tensor(node_ids, F.int64)))
    eid_partid = F.asnumpy(edge_policy.to_partid(F.tensor(edge_ids, F.int64)))
    assert_array_equal(nid_partid, F.asnumpy(F.tensor([0] * len(node_ids), F.int64)))
    assert_array_equal(eid_partid, F.asnumpy(F.tensor([0] * len(edge_ids), F.int64)))

    # The data size of each policy matches the length of its map.
    assert node_policy.get_data_size() == len(node_map)
    assert edge_policy.get_data_size() == len(edge_map)

106
def start_server(server_id, num_clients):
    """Create a KVServer holding the node data and start serving.

    Parameters
    ----------
    server_id : int
        ID passed to both ``KVServer`` and ``start_server``.
    num_clients : int
        Number of clients passed to both ``KVServer`` and ``start_server``.
    """
    # Init kvserver
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config='kv_ip_config.txt',
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)

    # A backup server registers each name without a tensor; any other server
    # also passes the initial tensor.
    is_backup = kvserver.is_backup_server()
    node_data = (('data_0', data_0),
                 ('data_0_1', data_0_1),
                 ('data_0_2', data_0_2),
                 ('data_0_3', data_0_3))
    for data_name, tensor in node_data:
        if is_backup:
            kvserver.init_data(data_name, 'node')
        else:
            kvserver.init_data(data_name, 'node', tensor)

    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config='kv_ip_config.txt',
                                 num_clients=num_clients,
                                 server_state=server_state)

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def start_server_mul_role(server_id, num_clients):
    """Create a KVServer holding only ``data_0`` and start serving.

    Uses the 'kv_ip_mul_config.txt' IP configuration file.
    """
    store = dgl.distributed.KVServer(server_id=server_id,
                                     ip_config='kv_ip_mul_config.txt',
                                     num_clients=num_clients)
    store.add_part_policy(node_policy)

    # A backup server registers the name without a tensor.
    init_args = ('data_0', 'node')
    if not store.is_backup_server():
        init_args = init_args + (data_0,)
    store.init_data(*init_args)

    # Hand the store to the server loop.
    state = dgl.distributed.ServerState(kv_store=store, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config='kv_ip_mul_config.txt',
                                 num_clients=num_clients,
                                 server_state=state)

149
def _assert_data_meta(kvclient, name, expected, policy_str):
    """Assert that the metadata of ``name`` matches tensor ``expected`` and the policy name."""
    dtype, shape, policy = kvclient.get_data_meta(name)
    assert dtype == F.dtype(expected)
    assert shape == F.shape(expected)
    assert policy.policy_str == policy_str


def _push_to_all(kvclient, names, id_tensor, data_tensor):
    """Push the same IDs and data to every tensor listed in ``names``, in order."""
    for name in names:
        kvclient.push(name=name,
                      id_tensor=id_tensor,
                      data_tensor=data_tensor)


def _assert_pull_from_all(kvclient, names, id_tensor, expected):
    """Pull ``id_tensor`` from every tensor in ``names`` and compare with ``expected``."""
    for name in names:
        res = kvclient.pull(name=name, id_tensor=id_tensor)
        assert_array_equal(F.asnumpy(res), F.asnumpy(expected))


def start_client(num_clients):
    """Run the client side of the KVStore test.

    Connects to the servers listed in 'kv_ip_config.txt' and checks data
    creation, metadata, push/pull, custom push handlers and data deletion.

    Parameters
    ----------
    num_clients : int
        Expected total number of clients. It also scales the expected result
        of the accumulating push at the end of the test.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # Note: connect to server first !
    dgl.distributed.initialize(ip_config='kv_ip_config.txt')
    # Init kvclient
    kvclient = dgl.distributed.KVClient(ip_config='kv_ip_config.txt')
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client() == num_clients
    kvclient.init_data(name='data_1',
                       shape=F.shape(data_1),
                       dtype=F.dtype(data_1),
                       part_policy=edge_policy,
                       init_func=init_zero_func)
    kvclient.init_data(name='data_2',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)

    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    for expected_name in ('data_0', 'data_0_1', 'data_0_2', 'data_0_3',
                          'data_1', 'data_2'):
        assert expected_name in name_list

    # Test get_meta_data
    _assert_data_meta(kvclient, 'data_0', data_0, 'node')
    _assert_data_meta(kvclient, 'data_0_1', data_0_1, 'node')
    _assert_data_meta(kvclient, 'data_0_2', data_0_2, 'node')
    _assert_data_meta(kvclient, 'data_0_3', data_0_3, 'node')
    _assert_data_meta(kvclient, 'data_1', data_1, 'edge')
    _assert_data_meta(kvclient, 'data_2', data_2, 'node')

    # Test push and pull
    pushed_names = ('data_0', 'data_1', 'data_2')
    id_tensor = F.tensor([0,2,4], F.int64)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    _push_to_all(kvclient, pushed_names, id_tensor, data_tensor)
    _assert_pull_from_all(kvclient, pushed_names, id_tensor, data_tensor)

    # Register new push handler
    for name in pushed_names:
        kvclient.register_push_handler(name, udf_push)

    # Test push and pull
    _push_to_all(kvclient, pushed_names, id_tensor, data_tensor)
    kvclient.barrier()
    # udf_push stores data * data, so the pulled rows must be the square.
    data_tensor = data_tensor * data_tensor
    _assert_pull_from_all(kvclient, pushed_names, id_tensor, data_tensor)

    # Test delete data
    for name in pushed_names:
        kvclient.delete_data(name)

    # Register new push handler
    kvclient.init_data(name='data_3',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)
    kvclient.register_push_handler('data_3', add_push)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    kvclient.barrier()
    # Each client waits a different time (based on its client id) before pushing.
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name='data_3',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name='data_3', id_tensor=id_tensor)
    # add_push accumulates, and every client pushed the same tensor once.
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
278

Da Zheng's avatar
Da Zheng committed
279
280
281
282
283
def start_client_mul_role(i, num_workers):
    """Run one trainer process of the multi-role test.

    Parameters
    ----------
    i : int
        Index of this trainer; trainer 0 sleeps 5 seconds before the barrier.
    num_workers : int
        Forwarded to ``dgl.distributed.initialize`` as ``num_workers``.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # Initialize creates kvstore !
    dgl.distributed.initialize(ip_config='kv_ip_mul_config.txt', num_workers=num_workers)
    if i == 0: # block one trainer
        time.sleep(5)
    kvclient = dgl.distributed.kvstore.get_kvstore()
    kvclient.barrier()
    print("i: %d role: %s" % (i, kvclient.role))

    # The test launches exactly two trainers, so ranks must be 0 or 1.
    assert dgl.distributed.role.get_num_trainers() == 2
    assert dgl.distributed.role.get_trainer_rank() < 2
    print('trainer rank: %d, global rank: %d' % (dgl.distributed.role.get_trainer_rank(),
                                                 dgl.distributed.role.get_global_rank()))
    dgl.distributed.exit_client()

295
296
297
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_store():
    """Launch two servers and two clients as processes and wait for all of them."""
    num_servers = 2
    num_clients = 2
    ip_addr = get_local_usable_addr()
    # Resolve the address before opening the file and write inside a context
    # manager, so the handle is closed even if an error occurs.
    with open("kv_ip_config.txt", "w") as ip_config:
        ip_config.write('{} {}\n'.format(ip_addr, num_servers))
    ctx = mp.get_context('spawn')
    pserver_list = []
    pclient_list = []
    for i in range(num_servers):
        pserver = ctx.Process(target=start_server, args=(i, num_clients))
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_clients):
        pclient = ctx.Process(target=start_client, args=(num_clients,))
        pclient.start()
        pclient_list.append(pclient)
    # Wait for the clients first, then for the servers.
    for pclient in pclient_list:
        pclient.join()
    for pserver in pserver_list:
        pserver.join()
318

319
320
321
322
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_multi_role():
    """Launch two servers and two trainer processes and wait for all of them."""
    num_servers = 2
    num_trainers = 2
    num_samplers = 2
    # There are two trainer processes and each trainer process has two sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
    ip_addr = get_local_usable_addr()
    # Resolve the address before opening the file and write inside a context
    # manager, so the handle is closed even if an error occurs.
    with open("kv_ip_mul_config.txt", "w") as ip_config:
        ip_config.write('{} {}\n'.format(ip_addr, num_servers))
    ctx = mp.get_context('spawn')
    pserver_list = []
    pclient_list = []
    for i in range(num_servers):
        pserver = ctx.Process(target=start_server_mul_role, args=(i, num_clients))
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_trainers):
        pclient = ctx.Process(target=start_client_mul_role, args=(i, num_samplers))
        pclient.start()
        pclient_list.append(pclient)
    # Wait for the trainers first, then for the servers.
    for pclient in pclient_list:
        pclient.join()
    for pserver in pserver_list:
        pserver.join()

346
347
if __name__ == '__main__':
    # Run every test in order when the file is executed as a script.
    test_partition_policy()
    test_kv_store()
    test_kv_multi_role()