import multiprocessing as mp
import os
import socket
import time

import unittest

import backend as F
import numpy as np
from numpy.testing import assert_array_equal
from scipy import sparse as spsp

from utils import generate_ip_config, reset_envs

import dgl
from dgl.graph_index import create_graph_index

if os.name != "nt":
    import fcntl
    import struct

# Create a one-part graph
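# node_map / edge_map give the per-type global ID range ([start, end)) owned
# by this single partition; global_nid / global_eid are the global IDs
# assigned to the toy graph's 6 nodes and 7 edges.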
node_map = {"_N": F.tensor([[0, 6]], F.int64)}
edge_map = {("_N", "_E", "_N"): F.tensor([[0, 7]], F.int64)}
global_nid = F.tensor([0, 1, 2, 3, 4, 5], F.int64)
global_eid = F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)

g = dgl.DGLGraph()
g.add_nodes(6)
g.add_edges(0, 1)  # 0
g.add_edges(0, 2)  # 1
g.add_edges(0, 3)  # 2
g.add_edges(2, 3)  # 3
g.add_edges(1, 1)  # 4
g.add_edges(0, 4)  # 5
g.add_edges(2, 5)  # 6

g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

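# A single-partition RangePartitionBook: every node and edge maps to part 0.
# The PartitionPolicy objects below bind kvstore tensors to its node
# ("node~_N") and edge ("edge~_N:_E:_N") ID spaces.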
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
    part_id=0,
    num_parts=1,
    node_map=node_map,
    edge_map=edge_map,
    ntypes={ntype: i for i, ntype in enumerate(g.ntypes)},
    etypes={etype: i for i, etype in enumerate(g.canonical_etypes)},
)

node_policy = dgl.distributed.PartitionPolicy(
    policy_str="node~_N", partition_book=gpb
)

edge_policy = dgl.distributed.PartitionPolicy(
    policy_str="edge~_N:_E:_N", partition_book=gpb
)

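# data_0 / data_0_1 / data_0_2 / data_0_3 hold one entry per node and are
# loaded by the main server; data_1 (one row per edge) and data_2 only
# provide shapes and dtypes for the client-side init_data() calls below.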
data_0 = F.tensor(
    [[1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0]],
    F.float32,
)
data_0_1 = F.tensor([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], F.float32)
data_0_2 = F.tensor([1, 2, 3, 4, 5, 6], F.int32)
data_0_3 = F.tensor([1, 2, 3, 4, 5, 6], F.int64)
data_1 = F.tensor(
    [
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
    ],
    F.float32,
)
data_2 = F.tensor(
    [[0.0, 0.0], [0.0, 0.0], [0.0, 0.0], [0.0, 0.0], [0.0, 0.0], [0.0, 0.0]],
    F.float32,
)


def init_zero_func(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())


def udf_push(target, name, id_tensor, data_tensor):
    # UDF push handler: store the square of the pushed data.
    target[name][id_tensor] = data_tensor * data_tensor


def add_push(target, name, id_tensor, data_tensor):
    # UDF push handler: accumulate the pushed data into the stored values.
    target[name][id_tensor] += data_tensor


@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_partition_policy():
    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0
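    # With a single partition, to_local() is the identity and to_partid()
    # always returns partition 0.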
    local_nid = node_policy.to_local(F.tensor([0, 1, 2, 3, 4, 5]))
    local_eid = edge_policy.to_local(F.tensor([0, 1, 2, 3, 4, 5, 6]))
    assert_array_equal(
        F.asnumpy(local_nid), F.asnumpy(F.tensor([0, 1, 2, 3, 4, 5], F.int64))
    )
    assert_array_equal(
        F.asnumpy(local_eid),
        F.asnumpy(F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)),
    )
    nid_partid = node_policy.to_partid(F.tensor([0, 1, 2, 3, 4, 5], F.int64))
    eid_partid = edge_policy.to_partid(F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64))
    assert_array_equal(
        F.asnumpy(nid_partid), F.asnumpy(F.tensor([0, 0, 0, 0, 0, 0], F.int64))
    )
    assert_array_equal(
        F.asnumpy(eid_partid),
        F.asnumpy(F.tensor([0, 0, 0, 0, 0, 0, 0], F.int64)),
    )
    assert node_policy.get_part_size() == len(local_nid)
    assert edge_policy.get_part_size() == len(local_eid)


def start_server(server_id, num_clients, num_servers):
    # Init kvserver
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config="kv_ip_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
    )
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
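    # Backup servers don't provide the data themselves; only the main server
    # passes the actual tensors to init_data().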
    if kvserver.is_backup_server():
        kvserver.init_data("data_0", "node~_N")
        kvserver.init_data("data_0_1", "node~_N")
        kvserver.init_data("data_0_2", "node~_N")
        kvserver.init_data("data_0_3", "node~_N")
    else:
        kvserver.init_data("data_0", "node~_N", data_0)
        kvserver.init_data("data_0_1", "node~_N", data_0_1)
        kvserver.init_data("data_0_2", "node~_N", data_0_2)
        kvserver.init_data("data_0_3", "node~_N", data_0_3)
    # start server
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config="kv_ip_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )


def start_server_mul_role(server_id, num_clients, num_servers):
    # Init kvserver
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config="kv_ip_mul_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
    )
    kvserver.add_part_policy(node_policy)
    if kvserver.is_backup_server():
        kvserver.init_data("data_0", "node~_N")
    else:
        kvserver.init_data("data_0", "node~_N", data_0)
    # start server
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config="kv_ip_mul_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )


def start_client(num_clients, num_servers):
    os.environ["DGL_DIST_MODE"] = "distributed"
    # Note: connect to the servers first!
    dgl.distributed.initialize(ip_config="kv_ip_config.txt")
    # Init kvclient
    kvclient = dgl.distributed.KVClient(
        ip_config="kv_ip_config.txt", num_servers=num_servers
    )
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client() == num_clients
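    # Create two distributed tensors from the client side; data_1 / data_2
    # only supply the shape, dtype, and partition policy.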
    kvclient.init_data(
        name="data_1",
        shape=F.shape(data_1),
        dtype=F.dtype(data_1),
        part_policy=edge_policy,
        init_func=init_zero_func,
    )
    kvclient.init_data(
        name="data_2",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )

    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    assert "data_0" in name_list
    assert "data_0_1" in name_list
    assert "data_0_2" in name_list
    assert "data_0_3" in name_list
    assert "data_1" in name_list
    assert "data_2" in name_list
    # Test get_data_meta
    meta = kvclient.get_data_meta("data_0")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0)
    assert shape == F.shape(data_0)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_0_1")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_1)
    assert shape == F.shape(data_0_1)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_0_2")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_2)
    assert shape == F.shape(data_0_2)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_0_3")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_3)
    assert shape == F.shape(data_0_3)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_1")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_1)
    assert shape == F.shape(data_1)
    assert policy.policy_str == "edge~_N:_E:_N"

    meta = kvclient.get_data_meta("data_2")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_2)
    assert shape == F.shape(data_2)
    assert policy.policy_str == "node~_N"

    # Test push and pull
    id_tensor = F.tensor([0, 2, 4], F.int64)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
    kvclient.push(name="data_0", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_1", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_2", id_tensor=id_tensor, data_tensor=data_tensor)
    res = kvclient.pull(name="data_0", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_1", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_2", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Register new push handler
    kvclient.register_push_handler("data_0", udf_push)
    kvclient.register_push_handler("data_1", udf_push)
    kvclient.register_push_handler("data_2", udf_push)
    # Test push and pull
    kvclient.push(name="data_0", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_1", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_2", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
    data_tensor = data_tensor * data_tensor
    res = kvclient.pull(name="data_0", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_1", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_2", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Test delete data
    kvclient.delete_data("data_0")
    kvclient.delete_data("data_1")
    kvclient.delete_data("data_2")

    # Register new push handler
    kvclient.init_data(
        name="data_3",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )
    kvclient.register_push_handler("data_3", add_push)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
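    # Every client pushes once with add_push, so after the final barrier the
    # pulled values should equal data_tensor * num_clients.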
    kvclient.barrier()
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name="data_3", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name="data_3", id_tensor=id_tensor)
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))


def start_client_mul_role(i):
    os.environ["DGL_DIST_MODE"] = "distributed"
    # dgl.distributed.initialize() creates the kvstore client.
    dgl.distributed.initialize(ip_config="kv_ip_mul_config.txt")
    if i == 0:  # block one trainer
        time.sleep(5)
    kvclient = dgl.distributed.kvstore.get_kvstore()
    kvclient.barrier()
    print("i: %d role: %s" % (i, kvclient.role))

    assert dgl.distributed.role.get_num_trainers() == 2
    assert dgl.distributed.role.get_trainer_rank() < 2
    print(
        "trainer rank: %d, global rank: %d"
        % (
            dgl.distributed.role.get_trainer_rank(),
            dgl.distributed.role.get_global_rank(),
        )
    )
    dgl.distributed.exit_client()


@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_store():
    reset_envs()
    num_servers = 2
    num_clients = 2
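    # One machine runs `num_servers` server processes and `num_clients` client
    # processes; servers with a non-zero ID are expected to act as backup
    # servers (see start_server).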
    generate_ip_config("kv_ip_config.txt", 1, num_servers)
    ctx = mp.get_context("spawn")
    pserver_list = []
    pclient_list = []
    os.environ["DGL_NUM_SERVER"] = str(num_servers)
    for i in range(num_servers):
        pserver = ctx.Process(
            target=start_server, args=(i, num_clients, num_servers)
        )
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_clients):
        pclient = ctx.Process(
            target=start_client, args=(num_clients, num_servers)
        )
        pclient.start()
        pclient_list.append(pclient)
    for i in range(num_clients):
        pclient_list[i].join()
    for i in range(num_servers):
        pserver_list[i].join()


@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_multi_role():
    reset_envs()
    num_servers = 2
    num_trainers = 2
    num_samplers = 2
    generate_ip_config("kv_ip_mul_config.txt", 1, num_servers)
    # There are two trainer processes and each trainer process has two sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
    ctx = mp.get_context("spawn")
    pserver_list = []
    pclient_list = []
    os.environ["DGL_NUM_SAMPLER"] = str(num_samplers)
    os.environ["DGL_NUM_SERVER"] = str(num_servers)
    for i in range(num_servers):
        pserver = ctx.Process(
            target=start_server_mul_role, args=(i, num_clients, num_servers)
        )
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_trainers):
        pclient = ctx.Process(target=start_client_mul_role, args=(i,))
        pclient.start()
        pclient_list.append(pclient)
    for i in range(num_trainers):
        pclient_list[i].join()
    for i in range(num_servers):
        pserver_list[i].join()


if __name__ == "__main__":
    test_partition_policy()
    test_kv_store()
    test_kv_multi_role()