test_new_kvstore.py 12.5 KB
Newer Older
1
import multiprocessing as mp
2
import os
3
import socket
4
import time
5
6
7
import unittest

import backend as F
8
import numpy as np
9
from numpy.testing import assert_array_equal
10
from scipy import sparse as spsp
11
12
from utils import generate_ip_config, reset_envs

13
14
15
import dgl
from dgl.graph_index import create_graph_index

16
if os.name != "nt":
17
18
19
    import fcntl
    import struct

20
# Create a one-part graph: 6 nodes and 7 edges, all owned by partition 0.
NUM_NODES = 6
NUM_EDGES = 7

node_map = F.tensor([0] * NUM_NODES, F.int64)
edge_map = F.tensor([0] * NUM_EDGES, F.int64)
global_nid = F.tensor(list(range(NUM_NODES)), F.int64)
global_eid = F.tensor(list(range(NUM_EDGES)), F.int64)

g = dgl.DGLGraph()
g.add_nodes(NUM_NODES)
# Edge ids 0..6 are assigned in insertion order.
for src, dst in [(0, 1), (0, 2), (0, 3), (2, 3), (1, 1), (0, 4), (2, 5)]:
    g.add_edges(src, dst)

g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(
    part_id=0, num_parts=1, node_map=node_map, edge_map=edge_map, part_graph=g
)

# One policy per entity type, both backed by the same partition book.
node_policy = dgl.distributed.PartitionPolicy(
    policy_str="node:_N", partition_book=gpb
)
edge_policy = dgl.distributed.PartitionPolicy(
    policy_str="edge:_E", partition_book=gpb
)

# Fixtures: server-side node data in several dtypes (data_0*) plus
# client-side tensors (data_1 on edges, data_2 on nodes).
data_0 = F.tensor([[1.0, 1.0]] * NUM_NODES, F.float32)
data_0_1 = F.tensor([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], F.float32)
data_0_2 = F.tensor([1, 2, 3, 4, 5, 6], F.int32)
data_0_3 = F.tensor([1, 2, 3, 4, 5, 6], F.int64)
data_1 = F.tensor([[2.0, 2.0]] * NUM_EDGES, F.float32)
data_2 = F.tensor([[0.0, 0.0]] * NUM_NODES, F.float32)
74
75
76
77
78


def init_zero_func(shape, dtype):
    """Initializer passed to ``init_data``: an all-zeros CPU tensor."""
    ctx = F.cpu()
    return F.zeros(shape, dtype, ctx)

79

80
def udf_push(target, name, id_tensor, data_tensor):
    """Custom push handler: store the elementwise square of the pushed data."""
    squared = data_tensor * data_tensor
    target[name][id_tensor] = squared

83

84
85
def add_push(target, name, id_tensor, data_tensor):
    """Custom push handler: accumulate pushed data into the existing entries."""
    store = target[name]
    store[id_tensor] += data_tensor
86

87
88
89
90
91

@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_partition_policy():
    """With a single partition, global ids map to themselves and part 0 owns all."""
    cases = [
        (node_policy, 6, node_map),
        (edge_policy, 7, edge_map),
    ]
    for policy, num_items, part_map in cases:
        assert policy.part_id == 0
        ids = list(range(num_items))
        # Identity mapping: local id == global id when there is one partition.
        local_ids = policy.to_local(F.tensor(ids))
        assert_array_equal(
            F.asnumpy(local_ids), F.asnumpy(F.tensor(ids, F.int64))
        )
        # Every id must be assigned to partition 0.
        part_ids = policy.to_partid(F.tensor(ids, F.int64))
        assert_array_equal(
            F.asnumpy(part_ids),
            F.asnumpy(F.tensor([0] * num_items, F.int64)),
        )
        assert policy.get_part_size() == len(part_map)
115

116

117
def start_server(server_id, num_clients, num_servers):
    """Run one KVServer process for the single-role KVStore test.

    Parameters
    ----------
    server_id : int
        Global id of this server process.
    num_clients : int
        Number of client processes the server expects.
    num_servers : int
        Total number of server processes sharing the ip config.
    """
    # Deliberate startup delay so clients exercise their re-connect path.
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
    # Init kvserver
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config="kv_ip_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
    )
    # Both node and edge policies are served by every server.
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
    if kvserver.is_backup_server():
        # Backup servers call init_data without values — presumably attaching
        # to the tensors created by the main server (verify in KVServer docs).
        kvserver.init_data("data_0", "node:_N")
        kvserver.init_data("data_0_1", "node:_N")
        kvserver.init_data("data_0_2", "node:_N")
        kvserver.init_data("data_0_3", "node:_N")
    else:
        # The main server creates the shared tensors with initial values.
        kvserver.init_data("data_0", "node:_N", data_0)
        kvserver.init_data("data_0_1", "node:_N", data_0_1)
        kvserver.init_data("data_0_2", "node:_N", data_0_2)
        kvserver.init_data("data_0_3", "node:_N", data_0_3)
    # Start the server loop; no local graph or partition book is attached
    # because this test only exercises the KV store.
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config="kv_ip_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )

151

152
def start_server_mul_role(server_id, num_clients, num_servers):
    """Run one KVServer process for the multi-role (trainer+sampler) test.

    Same shape as ``start_server`` but uses a separate ip config and serves
    only the node policy with a single data tensor.

    Parameters
    ----------
    server_id : int
        Global id of this server process.
    num_clients : int
        Number of client processes the server expects.
    num_servers : int
        Total number of server processes sharing the ip config.
    """
    # Init kvserver
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config="kv_ip_mul_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
    )
    kvserver.add_part_policy(node_policy)
    if kvserver.is_backup_server():
        # No initial value: backup servers attach to the main server's tensor
        # (presumably via shared memory — confirm against KVServer docs).
        kvserver.init_data("data_0", "node:_N")
    else:
        kvserver.init_data("data_0", "node:_N", data_0)
    # Start the server loop; KV-store-only test, so no graph state attached.
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config="kv_ip_mul_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )

177

178
def start_client(num_clients, num_servers):
    """Run one KVClient process: exercise init/meta/push/pull/delete APIs.

    Parameters
    ----------
    num_clients : int
        Total number of client processes (used to verify connections and to
        compute the expected accumulated value for the add_push handler).
    num_servers : int
        Number of server processes per machine.
    """
    os.environ["DGL_DIST_MODE"] = "distributed"
    # Note: connect to server first !
    dgl.distributed.initialize(ip_config="kv_ip_config.txt")
    # Init kvclient
    kvclient = dgl.distributed.KVClient(
        ip_config="kv_ip_config.txt", num_servers=num_servers
    )
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client() == num_clients
    # Client-side data creation: data_1 on edges, data_2 on nodes, zeros.
    kvclient.init_data(
        name="data_1",
        shape=F.shape(data_1),
        dtype=F.dtype(data_1),
        part_policy=edge_policy,
        init_func=init_zero_func,
    )
    kvclient.init_data(
        name="data_2",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )
    # Test data_name_list: both server-created and client-created tensors
    # must be visible.
    name_list = kvclient.data_name_list()
    print(name_list)
    assert "data_0" in name_list
    assert "data_0_1" in name_list
    assert "data_0_2" in name_list
    assert "data_0_3" in name_list
    assert "data_1" in name_list
    assert "data_2" in name_list
    # Test get_meta_data: (dtype, shape, policy) must match what was stored.
    meta = kvclient.get_data_meta("data_0")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0)
    assert shape == F.shape(data_0)
    assert policy.policy_str == "node:_N"

    meta = kvclient.get_data_meta("data_0_1")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_1)
    assert shape == F.shape(data_0_1)
    assert policy.policy_str == "node:_N"

    meta = kvclient.get_data_meta("data_0_2")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_2)
    assert shape == F.shape(data_0_2)
    assert policy.policy_str == "node:_N"

    meta = kvclient.get_data_meta("data_0_3")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_3)
    assert shape == F.shape(data_0_3)
    assert policy.policy_str == "node:_N"

    meta = kvclient.get_data_meta("data_1")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_1)
    assert shape == F.shape(data_1)
    assert policy.policy_str == "edge:_E"

    meta = kvclient.get_data_meta("data_2")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_2)
    assert shape == F.shape(data_2)
    assert policy.policy_str == "node:_N"

    # Test push and pull with the default (overwrite) handler: the pulled
    # values must equal exactly what was pushed.
    id_tensor = F.tensor([0, 2, 4], F.int64)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
    kvclient.push(name="data_0", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_1", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_2", id_tensor=id_tensor, data_tensor=data_tensor)
    res = kvclient.pull(name="data_0", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_1", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_2", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Register new push handler (udf_push squares the pushed data).
    kvclient.register_push_handler("data_0", udf_push)
    kvclient.register_push_handler("data_1", udf_push)
    kvclient.register_push_handler("data_2", udf_push)
    # Test push and pull again: pulled values must now be the squares.
    kvclient.push(name="data_0", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_1", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_2", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
    data_tensor = data_tensor * data_tensor
    res = kvclient.pull(name="data_0", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_1", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_2", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Test delete data
    kvclient.delete_data("data_0")
    kvclient.delete_data("data_1")
    kvclient.delete_data("data_2")

    # Register new push handler (add_push accumulates instead of overwriting).
    kvclient.init_data(
        name="data_3",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )
    kvclient.register_push_handler("data_3", add_push)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
    kvclient.barrier()
    # Stagger the clients (sleep depends on client_id) before pushing.
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name="data_3", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name="data_3", id_tensor=id_tensor)
    # Every client pushed once, so the accumulated value is data * num_clients.
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
301

302

303
def start_client_mul_role(i):
    """Run one trainer process for the multi-role test.

    Parameters
    ----------
    i : int
        Index of this trainer process; trainer 0 sleeps to test that the
        barrier holds until every client arrives.
    """
    os.environ["DGL_DIST_MODE"] = "distributed"
    # Initialize creates kvstore !
    dgl.distributed.initialize(ip_config="kv_ip_mul_config.txt")
    if i == 0:  # block one trainer
        time.sleep(5)
    kvclient = dgl.distributed.kvstore.get_kvstore()
    kvclient.barrier()
    print("i: %d role: %s" % (i, kvclient.role))
    # The launching test spawns exactly two trainers.
    assert dgl.distributed.role.get_num_trainers() == 2
    assert dgl.distributed.role.get_trainer_rank() < 2
    print(
        "trainer rank: %d, global rank: %d"
        % (
            dgl.distributed.role.get_trainer_rank(),
            dgl.distributed.role.get_global_rank(),
        )
    )
    # Shut down the connection (and, with it, the servers once all clients exit).
    dgl.distributed.exit_client()

324
325
326
327
328

@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_store():
    """End-to-end KVStore test: 2 servers and 2 clients in spawned processes."""
    reset_envs()
    num_servers = 2
    num_clients = 2
    generate_ip_config("kv_ip_config.txt", 1, num_servers)
    ctx = mp.get_context("spawn")
    os.environ["DGL_NUM_SERVER"] = str(num_servers)

    # Servers go up first so clients have something to connect to.
    servers = [
        ctx.Process(target=start_server, args=(i, num_clients, num_servers))
        for i in range(num_servers)
    ]
    for proc in servers:
        proc.start()

    clients = [
        ctx.Process(target=start_client, args=(num_clients, num_servers))
        for _ in range(num_clients)
    ]
    for proc in clients:
        proc.start()

    # Clients drive the shutdown, so join them before the servers.
    for proc in clients:
        proc.join()
    for proc in servers:
        proc.join()
354

355
356
357
358
359

@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_multi_role():
    """KVStore test with mixed roles: trainer processes plus their samplers."""
    reset_envs()
    num_servers = 2
    num_trainers = 2
    num_samplers = 2
    generate_ip_config("kv_ip_mul_config.txt", 1, num_servers)
    # There are two trainer processes and each trainer process has two sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
    ctx = mp.get_context("spawn")
    os.environ["DGL_NUM_SAMPLER"] = str(num_samplers)
    os.environ["DGL_NUM_SERVER"] = str(num_servers)

    # Servers go up first so trainers have something to connect to.
    servers = [
        ctx.Process(
            target=start_server_mul_role, args=(i, num_clients, num_servers)
        )
        for i in range(num_servers)
    ]
    for proc in servers:
        proc.start()

    trainers = [
        ctx.Process(target=start_client_mul_role, args=(i,))
        for i in range(num_trainers)
    ]
    for proc in trainers:
        proc.start()

    # Trainers drive the shutdown, so join them before the servers.
    for proc in trainers:
        proc.join()
    for proc in servers:
        proc.join()

388
389

if __name__ == "__main__":
    # Run the suite directly (outside any test runner), in the original order.
    for case in (test_partition_policy, test_kv_store, test_kv_multi_role):
        case()