import os
import time
import numpy as np
import socket
from scipy import sparse as spsp
import dgl
import backend as F
import unittest, pytest
from dgl.graph_index import create_graph_index
import multiprocessing as mp
from numpy.testing import assert_array_equal

if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get local usable IP and port

    Returns
    -------
    str
        IP address and port separated by a space, e.g., '192.168.8.12 50051'
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        sock.connect(('10.255.255.255', 1))
        ip_addr = sock.getsockname()[0]
    except ValueError:
        ip_addr = '127.0.0.1'
    finally:
        sock.close()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", 0))
    sock.listen(1)
    port = sock.getsockname()[1]
    sock.close()

    return ip_addr + ' ' + str(port)

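# Helper that builds a random readonly DGLGraph from a sparse adjacency matrix.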
def create_random_graph(n):
    arr = (spsp.random(n, n, density=0.001, format='coo') != 0).astype(np.int64)
    ig = create_graph_index(arr, readonly=True)
    return dgl.DGLGraph(ig)

# Create a one-partition graph (all nodes and edges map to partition 0)
node_map = F.tensor([0,0,0,0,0,0], F.int64)
edge_map = F.tensor([0,0,0,0,0,0,0], F.int64)
global_nid = F.tensor([0,1,2,3,4,5], F.int64)
global_eid = F.tensor([0,1,2,3,4,5,6], F.int64)

g = dgl.DGLGraph()
g.add_nodes(6)
g.add_edge(0, 1) # 0
g.add_edge(0, 2) # 1
g.add_edge(0, 3) # 2
g.add_edge(2, 3) # 3
g.add_edge(1, 1) # 4
g.add_edge(0, 4) # 5
g.add_edge(2, 5) # 6

g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

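# Single-partition GraphPartitionBook and the node/edge partition policies
# shared by the server and client processes below.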
gpb = dgl.distributed.GraphPartitionBook(part_id=0,
                                         num_parts=1,
                                         node_map=node_map,
                                         edge_map=edge_map,
                                         part_graph=g)

node_policy = dgl.distributed.PartitionPolicy(policy_str='node',
                                              partition_book=gpb)

edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge',
                                              partition_book=gpb)

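# Tensors initialized on the servers (data_0*) and reference tensors for the
# client-initialized data (data_1, data_2).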
data_0 = F.tensor([[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.]], F.float32)
data_0_1 = F.tensor([1.,2.,3.,4.,5.,6.], F.float32)
data_0_2 = F.tensor([1,2,3,4,5,6], F.int32)
data_0_3 = F.tensor([1,2,3,4,5,6], F.int64)
data_1 = F.tensor([[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.]], F.float32)
data_2 = F.tensor([[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.]], F.float32)

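# Zero initializer and user-defined push handlers (square and accumulate)
# used by the KVStore tests below.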
def init_zero_func(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

def udf_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] = data_tensor * data_tensor

def add_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] += data_tensor

@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_partition_policy():
    assert node_policy.policy_str == 'node'
    assert edge_policy.policy_str == 'edge'
    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0
    local_nid = node_policy.to_local(F.tensor([0,1,2,3,4,5]))
    local_eid = edge_policy.to_local(F.tensor([0,1,2,3,4,5,6]))
    assert_array_equal(F.asnumpy(local_nid), F.asnumpy(F.tensor([0,1,2,3,4,5], F.int64)))
    assert_array_equal(F.asnumpy(local_eid), F.asnumpy(F.tensor([0,1,2,3,4,5,6], F.int64)))
    nid_partid = node_policy.to_partid(F.tensor([0,1,2,3,4,5], F.int64))
    eid_partid = edge_policy.to_partid(F.tensor([0,1,2,3,4,5,6], F.int64))
    assert_array_equal(F.asnumpy(nid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0], F.int64)))
    assert_array_equal(F.asnumpy(eid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0,0], F.int64)))
    assert node_policy.get_data_size() == len(node_map)
    assert edge_policy.get_data_size() == len(edge_map)

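# Server process: create a KVServer, register the partition policies, initialize
# node data (backup servers reuse the data created by the main server), and start serving.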
def start_server(server_id, num_clients):
    # Init kvserver
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config='kv_ip_config.txt',
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
    if kvserver.is_backup_server():
        kvserver.init_data('data_0', 'node')
        kvserver.init_data('data_0_1', 'node')
        kvserver.init_data('data_0_2', 'node')
        kvserver.init_data('data_0_3', 'node')
    else:
        kvserver.init_data('data_0', 'node', data_0)
        kvserver.init_data('data_0_1', 'node', data_0_1)
        kvserver.init_data('data_0_2', 'node', data_0_2)
        kvserver.init_data('data_0_3', 'node', data_0_3)
    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config='kv_ip_config.txt',
                                 num_clients=num_clients,
                                 server_state=server_state)

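# Client process: connect to the servers, create distributed tensors, then exercise
# metadata queries, push/pull, and user-defined push handlers.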
def start_client(num_clients):
    # Note: connect to the server first!
    dgl.distributed.connect_to_server(ip_config='kv_ip_config.txt')
    # Init kvclient
    kvclient = dgl.distributed.KVClient(ip_config='kv_ip_config.txt')
    assert dgl.distributed.get_num_client() == num_clients
    kvclient.init_data(name='data_1', 
                       shape=F.shape(data_1), 
                       dtype=F.dtype(data_1), 
                       part_policy=edge_policy,
                       init_func=init_zero_func)
    kvclient.init_data(name='data_2', 
                       shape=F.shape(data_2), 
                       dtype=F.dtype(data_2), 
                       part_policy=node_policy,
                       init_func=init_zero_func)

    kvclient.map_shared_data(partition_book=gpb)
    
    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    assert 'data_0' in name_list
    assert 'data_0_1' in name_list
    assert 'data_0_2' in name_list
    assert 'data_0_3' in name_list
    assert 'data_1' in name_list
    assert 'data_2' in name_list
    # Test get_meta_data
    meta = kvclient.get_data_meta('data_0')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0)
    assert shape == F.shape(data_0)
    assert policy.policy_str == 'node'

    meta = kvclient.get_data_meta('data_0_1')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_1)
    assert shape == F.shape(data_0_1)
    assert policy.policy_str == 'node'

    meta = kvclient.get_data_meta('data_0_2')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_2)
    assert shape == F.shape(data_0_2)
    assert policy.policy_str == 'node'

    meta = kvclient.get_data_meta('data_0_3')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_3)
    assert shape == F.shape(data_0_3)
    assert policy.policy_str == 'node'

    meta = kvclient.get_data_meta('data_1')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_1)
    assert shape == F.shape(data_1)
    assert policy.policy_str == 'edge'

    meta = kvclient.get_data_meta('data_2')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_2)
    assert shape == F.shape(data_2)
    assert policy.policy_str == 'node'

    # Test push and pull
    id_tensor = F.tensor([0,2,4], F.int64)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_2',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    res = kvclient.pull(name='data_0', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_1', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_2', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Register new push handler
    kvclient.register_push_handler('data_0', udf_push)
    kvclient.register_push_handler('data_1', udf_push)
    kvclient.register_push_handler('data_2', udf_push)
    # Test push and pull
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_2',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    data_tensor = data_tensor * data_tensor
    res = kvclient.pull(name='data_0', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_1', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_2', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Register an accumulating (add) push handler on a newly created tensor
    kvclient.init_data(name='data_3', 
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2), 
                       part_policy=node_policy,
                       init_func=init_zero_func)
    kvclient.register_push_handler('data_3', add_push)
    kvclient.map_shared_data(partition_book=gpb)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name='data_3',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name='data_3', id_tensor=id_tensor)
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # clean up
    kvclient.barrier()
    dgl.distributed.shutdown_servers()
    dgl.distributed.finalize_client()

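# End-to-end test: write an IP config file, then spawn KVServer and KVClient processes.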
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_store():
    ip_config = open("kv_ip_config.txt", "w")
    num_servers = 2
    num_clients = 2
    ip_addr = get_local_usable_addr()
    ip_config.write('{} {}\n'.format(ip_addr, num_servers))
    ip_config.close()
    ctx = mp.get_context('spawn')
    pserver_list = []
    pclient_list = []
    for i in range(num_servers):
        pserver = ctx.Process(target=start_server, args=(i, num_clients))
        pserver.start()
        pserver_list.append(pserver)
    time.sleep(2)
    for i in range(num_clients):
        pclient = ctx.Process(target=start_client, args=(num_clients,))
        pclient.start()
        pclient_list.append(pclient)
    for i in range(num_clients):
        pclient_list[i].join()
    for i in range(num_servers):
        pserver_list[i].join()

if __name__ == '__main__':
    test_partition_policy()
    test_kv_store()