# test_new_kvstore.py
# Tests for the dgl.distributed KVStore: partition policies, server/client push and pull, and multi-role clients.
1
2
3
import os
import time
import numpy as np
4
import socket
5
6
7
from scipy import sparse as spsp
import dgl
import backend as F
# (web-view blame residue: "Da Zheng committed")
8
import unittest
9
from dgl.graph_index import create_graph_index
10
import multiprocessing as mp
11
from numpy.testing import assert_array_equal
12
from utils import generate_ip_config, reset_envs
13

14
15
16
17
if os.name != 'nt':
    import fcntl
    import struct

18
19
20
21
22
23
24
25
# Build a single-partition graph: every node and every edge maps to partition 0.
node_map = F.tensor([0] * 6, F.int64)
edge_map = F.tensor([0] * 7, F.int64)
global_nid = F.tensor(list(range(6)), F.int64)
global_eid = F.tensor(list(range(7)), F.int64)

g = dgl.DGLGraph()
g.add_nodes(6)
# Edges are added one at a time, in this order; the position in the list is
# the edge ID the original per-line comments (0..6) attached to each edge.
for src, dst in [(0, 1), (0, 2), (0, 3), (2, 3), (1, 1), (0, 4), (2, 5)]:
    g.add_edges(src, dst)

# Store the global node/edge IDs on the graph itself.
g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

37
38
39
40
41
# Partition book for the one-partition setup (part 0 of 1) built from graph g.
gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(
    part_id=0,
    num_parts=1,
    node_map=node_map,
    edge_map=edge_map,
    part_graph=g,
)

# Partition policies shared by the servers and clients in this file:
# 'node:_N' for node-keyed tensors and 'edge:_E' for edge-keyed tensors.
node_policy = dgl.distributed.PartitionPolicy(policy_str='node:_N',
                                              partition_book=gpb)
edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge:_E',
                                              partition_book=gpb)

# Node tensors (six rows, one per node) that start_server registers on the servers.
data_0 = F.tensor([[1., 1.]] * 6, F.float32)
data_0_1 = F.tensor([1., 2., 3., 4., 5., 6.], F.float32)
data_0_2 = F.tensor([1, 2, 3, 4, 5, 6], F.int32)
data_0_3 = F.tensor([1, 2, 3, 4, 5, 6], F.int64)
# Templates whose shape and dtype start_client uses when creating tensors:
# data_1 has seven rows (one per edge), data_2 has six rows (one per node).
data_1 = F.tensor([[2., 2.]] * 7, F.float32)
data_2 = F.tensor([[0., 0.]] * 6, F.float32)

def init_zero_func(shape, dtype):
    """Return a zero-filled tensor of ``shape`` and ``dtype`` created with the CPU context.

    Passed as ``init_func`` when the client creates new KVStore tensors.
    """
    cpu_ctx = F.cpu()
    return F.zeros(shape, dtype, cpu_ctx)

def udf_push(target, name, id_tensor, data_tensor):
    """Push handler that stores the element-wise square of the pushed data.

    Parameters
    ----------
    target : mapping
        Maps a data name to the tensor that holds its values.
    name : str
        Name of the tensor to update.
    id_tensor : tensor
        Row indices of ``target[name]`` to overwrite.
    data_tensor : tensor
        Pushed values; ``data_tensor * data_tensor`` is written to the rows.
    """
    target[name][id_tensor] = data_tensor * data_tensor

def add_push(target, name, id_tensor, data_tensor):
    """Push handler that accumulates the pushed data into the stored rows.

    ``data_tensor`` is added to the rows of ``target[name]`` selected by
    ``id_tensor``; all other rows are left untouched.
    """
    stored = target[name]
    stored[id_tensor] += data_tensor
64
65
66
67
68
69
70
71
72
73
74
75
76

@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_partition_policy():
    """Check the node and edge partition policies of the one-partition setup.

    With a single partition, every global ID is expected to map to the same
    local ID and to partition 0, and the partition size is expected to equal
    the length of the corresponding node/edge map.
    """
    nids = F.tensor([0,1,2,3,4,5], F.int64)
    eids = F.tensor([0,1,2,3,4,5,6], F.int64)

    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0

    # Global -> local ID translation.
    local_nid = node_policy.to_local(nids)
    local_eid = edge_policy.to_local(eids)
    assert_array_equal(F.asnumpy(local_nid), F.asnumpy(nids))
    assert_array_equal(F.asnumpy(local_eid), F.asnumpy(eids))

    # Global ID -> owning partition.
    nid_partid = node_policy.to_partid(nids)
    eid_partid = edge_policy.to_partid(eids)
    assert_array_equal(F.asnumpy(nid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0], F.int64)))
    assert_array_equal(F.asnumpy(eid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0,0], F.int64)))

    assert node_policy.get_part_size() == len(node_map)
    assert edge_policy.get_part_size() == len(edge_map)
79

80
def start_server(server_id, num_clients, num_servers):
    """Run one KVStore server process for ``test_kv_store``.

    Parameters
    ----------
    server_id : int
        ID of this server.
    num_clients : int
        Number of clients, forwarded to the server.
    num_servers : int
        Server count forwarded as ``num_servers`` to KVServer and start_server.
    """
    ip_config = 'kv_ip_config.txt'
    # Delay start-up on purpose so that clients have to retry their connection.
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
    # Init kvserver
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config=ip_config,
                                        num_servers=num_servers,
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
    node_data = (('data_0', data_0),
                 ('data_0_1', data_0_1),
                 ('data_0_2', data_0_2),
                 ('data_0_3', data_0_3))
    if kvserver.is_backup_server():
        # A backup server registers the names only, without a data tensor.
        for name, _ in node_data:
            kvserver.init_data(name, 'node:_N')
    else:
        for name, tensor in node_data:
            kvserver.init_data(name, 'node:_N', tensor)
    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config=ip_config,
                                 num_servers=num_servers,
                                 num_clients=num_clients,
                                 server_state=server_state)

108
def start_server_mul_role(server_id, num_clients, num_servers):
    """Run one KVStore server process for ``test_kv_multi_role``.

    Only the node policy and the ``data_0`` tensor are registered; the
    parameters have the same meaning as in ``start_server``.
    """
    ip_config = 'kv_ip_mul_config.txt'
    # Init kvserver
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config=ip_config,
                                        num_servers=num_servers,
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    if kvserver.is_backup_server():
        # A backup server registers the name only, without a data tensor.
        kvserver.init_data('data_0', 'node:_N')
    else:
        kvserver.init_data('data_0', 'node:_N', data_0)
    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config=ip_config,
                                 num_servers=num_servers,
                                 num_clients=num_clients,
                                 server_state=server_state)

127
def start_client(num_clients, num_servers):
    """Run one KVStore client process for ``test_kv_store``.

    The client connects to the servers, creates ``data_1`` and ``data_2``,
    then checks the name list, the tensor metadata, push/pull with the
    default handler, push/pull with ``udf_push``, data deletion, and finally
    accumulation through ``add_push`` on a new tensor ``data_3``.

    Parameters
    ----------
    num_clients : int
        Expected total number of clients; also the factor applied to the
        pushed values when checking the accumulated result.
    num_servers : int
        Server count forwarded as ``num_servers`` to KVClient.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # Note: connect to server first !
    dgl.distributed.initialize(ip_config='kv_ip_config.txt')
    # Init kvclient
    kvclient = dgl.distributed.KVClient(ip_config='kv_ip_config.txt', num_servers=num_servers)
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client() == num_clients
    kvclient.init_data(name='data_1',
                       shape=F.shape(data_1),
                       dtype=F.dtype(data_1),
                       part_policy=edge_policy,
                       init_func=init_zero_func)
    kvclient.init_data(name='data_2',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)

    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    for name in ('data_0', 'data_0_1', 'data_0_2', 'data_0_3', 'data_1', 'data_2'):
        assert name in name_list

    # Test get_data_meta: (name, tensor with the expected dtype/shape, policy string)
    expected_meta = (('data_0', data_0, 'node:_N'),
                     ('data_0_1', data_0_1, 'node:_N'),
                     ('data_0_2', data_0_2, 'node:_N'),
                     ('data_0_3', data_0_3, 'node:_N'),
                     ('data_1', data_1, 'edge:_E'),
                     ('data_2', data_2, 'node:_N'))
    for name, reference, policy_str in expected_meta:
        dtype, shape, policy = kvclient.get_data_meta(name)
        assert dtype == F.dtype(reference)
        assert shape == F.shape(reference)
        assert policy.policy_str == policy_str

    # Test push and pull with the default push handler
    pushed_names = ('data_0', 'data_1', 'data_2')
    id_tensor = F.tensor([0,2,4], F.int64)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    for name in pushed_names:
        kvclient.push(name=name,
                      id_tensor=id_tensor,
                      data_tensor=data_tensor)
    for name in pushed_names:
        res = kvclient.pull(name=name, id_tensor=id_tensor)
        assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Register new push handler
    for name in pushed_names:
        kvclient.register_push_handler(name, udf_push)
    # Test push and pull: udf_push stores the square of the pushed values
    for name in pushed_names:
        kvclient.push(name=name,
                      id_tensor=id_tensor,
                      data_tensor=data_tensor)
    kvclient.barrier()
    data_tensor = data_tensor * data_tensor
    for name in pushed_names:
        res = kvclient.pull(name=name, id_tensor=id_tensor)
        assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Test delete data
    for name in pushed_names:
        kvclient.delete_data(name)

    # Create a fresh zero tensor and test the accumulating push handler on it
    kvclient.init_data(name='data_3',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)
    kvclient.register_push_handler('data_3', add_push)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    kvclient.barrier()
    # Each client waits a different time before pushing (presumably so the
    # additions from different clients do not arrive at the same moment).
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name='data_3',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name='data_3', id_tensor=id_tensor)
    # Every client added the same values once, starting from zeros.
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
256

257
def start_client_mul_role(i):
    """Run one trainer process for ``test_kv_multi_role``.

    Parameters
    ----------
    i : int
        Index of the trainer process; trainer 0 is delayed by five seconds
        before it reaches the barrier.
    """
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # Initialize creates kvstore !
    dgl.distributed.initialize(ip_config='kv_ip_mul_config.txt')
    if i == 0: # block one trainer
        time.sleep(5)
    kvclient = dgl.distributed.kvstore.get_kvstore()
    kvclient.barrier()
    print("i: %d role: %s" % (i, kvclient.role))

    # The test launches exactly two trainers, so ranks must be 0 or 1.
    assert dgl.distributed.role.get_num_trainers() == 2
    assert dgl.distributed.role.get_trainer_rank() < 2
    print('trainer rank: %d, global rank: %d' % (dgl.distributed.role.get_trainer_rank(),
                                                 dgl.distributed.role.get_global_rank()))
    dgl.distributed.exit_client()

273
274
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_store():
    """Launch two KVStore servers and two clients and run the client checks.

    Servers run ``start_server`` and clients run ``start_client``, each in
    its own spawned process. The test fails if any client process exits
    with a non-zero code.
    """
    reset_envs()
    num_servers = 2
    num_clients = 2
    generate_ip_config("kv_ip_config.txt", 1, num_servers)
    ctx = mp.get_context('spawn')
    os.environ['DGL_NUM_SERVER'] = str(num_servers)

    pserver_list = []
    for server_id in range(num_servers):
        pserver = ctx.Process(target=start_server, args=(server_id, num_clients, num_servers))
        pserver.start()
        pserver_list.append(pserver)

    pclient_list = []
    for _ in range(num_clients):
        pclient = ctx.Process(target=start_client, args=(num_clients, num_servers))
        pclient.start()
        pclient_list.append(pclient)

    for pclient in pclient_list:
        pclient.join()
    for pserver in pserver_list:
        pserver.join()

    # An assertion that fails inside a child process does not propagate to
    # this process; it is only visible through the child's exit code.
    for pclient in pclient_list:
        assert pclient.exitcode == 0
295

296
297
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_multi_role():
    """Launch two servers and two trainers configured with two samplers each.

    Servers run ``start_server_mul_role`` and trainers run
    ``start_client_mul_role``, each in its own spawned process. The test
    fails if any trainer process exits with a non-zero code.
    """
    reset_envs()
    num_servers = 2
    num_trainers = 2
    num_samplers = 2
    generate_ip_config("kv_ip_mul_config.txt", 1, num_servers)
    # There are two trainer processes and each trainer process has two sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
    ctx = mp.get_context('spawn')
    # Only trainers are spawned below; the sampler count is passed through
    # the environment.
    os.environ['DGL_NUM_SAMPLER'] = str(num_samplers)
    os.environ['DGL_NUM_SERVER'] = str(num_servers)

    pserver_list = []
    for server_id in range(num_servers):
        pserver = ctx.Process(target=start_server_mul_role, args=(server_id, num_clients, num_servers))
        pserver.start()
        pserver_list.append(pserver)

    pclient_list = []
    for trainer_id in range(num_trainers):
        pclient = ctx.Process(target=start_client_mul_role, args=(trainer_id,))
        pclient.start()
        pclient_list.append(pclient)

    for pclient in pclient_list:
        pclient.join()
    for pserver in pserver_list:
        pserver.join()

    # An assertion that fails inside a child process does not propagate to
    # this process; it is only visible through the child's exit code.
    for pclient in pclient_list:
        assert pclient.exitcode == 0

323
324
# Allow running the three tests directly as a script, in file order.
if __name__ == '__main__':
    test_partition_policy()
    test_kv_store()
    test_kv_multi_role()