test_new_kvstore.py 12.5 KB
Newer Older
1
import multiprocessing as mp
2
3
import os
import time
4
5
6
import unittest

import backend as F
7
8

import dgl
9
10
11
from numpy.testing import assert_array_equal
from utils import generate_ip_config, reset_envs

12

13
# Build a one-part graph: partition 0 owns all 6 nodes and all 7 edges.
node_map = {"_N": F.tensor([[0, 6]], F.int64)}
edge_map = {("_N", "_E", "_N"): F.tensor([[0, 7]], F.int64)}

global_nid = F.tensor([0, 1, 2, 3, 4, 5], F.int64)
global_eid = F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)

g = dgl.graph([])
g.add_nodes(6)
# Add the seven edges one at a time so edge ids follow insertion order 0..6.
for src, dst in [(0, 1), (0, 2), (0, 3), (2, 3), (1, 1), (0, 4), (2, 5)]:
    g.add_edges(src, dst)

g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

# Single-partition range partition book over the graph above.
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
    part_id=0,
    num_parts=1,
    node_map=node_map,
    edge_map=edge_map,
    ntypes={ntype: i for i, ntype in enumerate(g.ntypes)},
    etypes={etype: i for i, etype in enumerate(g.canonical_etypes)},
)

node_policy = dgl.distributed.PartitionPolicy(
    policy_str="node~_N", partition_book=gpb
)

edge_policy = dgl.distributed.PartitionPolicy(
    policy_str="edge~_N:_E:_N", partition_book=gpb
)
# Reference payloads used by the kvstore tests below.
# data_0*: one row/value per node (6); data_1: one row per edge (7).
data_0 = F.tensor([[1.0, 1.0]] * 6, F.float32)
data_0_1 = F.tensor([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], F.float32)
data_0_2 = F.tensor([1, 2, 3, 4, 5, 6], F.int32)
data_0_3 = F.tensor([1, 2, 3, 4, 5, 6], F.int64)
data_1 = F.tensor([[2.0, 2.0]] * 7, F.float32)
data_2 = F.tensor([[0.0, 0.0]] * 6, F.float32)
def init_zero_func(shape, dtype):
    """Initializer handed to the kvstore: a zero tensor of the given
    shape/dtype on CPU."""
    device = F.cpu()
    return F.zeros(shape, dtype, device)
def udf_push(target, name, id_tensor, data_tensor):
    """Push handler that stores the element-wise square of the pushed data."""
    squared = data_tensor * data_tensor
    target[name][id_tensor] = squared
def add_push(target, name, id_tensor, data_tensor):
    """Push handler that accumulates (adds) the pushed data in place."""
    store = target[name]
    store[id_tensor] += data_tensor
@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_partition_policy():
    """With a single partition, local ids equal global ids and every id
    maps to partition 0."""
    for policy in (node_policy, edge_policy):
        assert policy.part_id == 0

    expected_nid = F.tensor([0, 1, 2, 3, 4, 5], F.int64)
    expected_eid = F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)

    local_nid = node_policy.to_local(F.tensor([0, 1, 2, 3, 4, 5]))
    local_eid = edge_policy.to_local(F.tensor([0, 1, 2, 3, 4, 5, 6]))
    assert_array_equal(F.asnumpy(local_nid), F.asnumpy(expected_nid))
    assert_array_equal(F.asnumpy(local_eid), F.asnumpy(expected_eid))

    nid_partid = node_policy.to_partid(expected_nid)
    eid_partid = edge_policy.to_partid(expected_eid)
    assert_array_equal(
        F.asnumpy(nid_partid), F.asnumpy(F.tensor([0] * 6, F.int64))
    )
    assert_array_equal(
        F.asnumpy(eid_partid), F.asnumpy(F.tensor([0] * 7, F.int64))
    )

    # The single partition holds everything.
    assert node_policy.get_part_size() == len(local_nid)
    assert edge_policy.get_part_size() == len(local_eid)
def start_server(server_id, num_clients, num_servers):
    """Run a KVServer process serving the four node-feature tensors."""
    ip_config = "kv_ip_config.txt"
    # Delay startup so the clients exercise their re-connect path.
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config=ip_config,
        num_servers=num_servers,
        num_clients=num_clients,
    )
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
    # A backup server registers each name without a payload; the main
    # server supplies the actual tensor.
    is_backup = kvserver.is_backup_server()
    for name, tensor in (
        ("data_0", data_0),
        ("data_0_1", data_0_1),
        ("data_0_2", data_0_2),
        ("data_0_3", data_0_3),
    ):
        if is_backup:
            kvserver.init_data(name, "node~_N")
        else:
            kvserver.init_data(name, "node~_N", tensor)
    # Enter the serving loop (blocks until the clients shut it down).
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config=ip_config,
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )
def start_server_mul_role(server_id, num_clients, num_servers):
    """Run a KVServer process for the multi-role test (one node tensor)."""
    ip_config = "kv_ip_mul_config.txt"
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config=ip_config,
        num_servers=num_servers,
        num_clients=num_clients,
    )
    kvserver.add_part_policy(node_policy)
    # Backup servers register the name only; the main server owns the data.
    if kvserver.is_backup_server():
        kvserver.init_data("data_0", "node~_N")
    else:
        kvserver.init_data("data_0", "node~_N", data_0)
    # Enter the serving loop (blocks until the clients shut it down).
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config=ip_config,
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )
def _check_meta(kvclient, name, ref, policy_str):
    # Verify the stored (dtype, shape, partition policy) metadata for `name`
    # against a reference tensor and expected policy string.
    dtype, shape, policy = kvclient.get_data_meta(name)
    assert dtype == F.dtype(ref)
    assert shape == F.shape(ref)
    assert policy.policy_str == policy_str


def start_client(num_clients, num_servers):
    """Exercise the KVClient API end to end against running KVServers.

    Covers client-side init_data, data_name_list, metadata queries,
    push/pull with the default handler, a squaring UDF handler,
    delete_data, and an accumulating (add) handler pushed by every client.
    """
    os.environ["DGL_DIST_MODE"] = "distributed"
    # Note: connect to server first !
    dgl.distributed.initialize(ip_config="kv_ip_config.txt")
    # Init kvclient
    kvclient = dgl.distributed.KVClient(
        ip_config="kv_ip_config.txt", num_servers=num_servers
    )
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client() == num_clients

    # Client-side initialization: edge data (data_1) and node data (data_2).
    kvclient.init_data(
        name="data_1",
        shape=F.shape(data_1),
        dtype=F.dtype(data_1),
        part_policy=edge_policy,
        init_func=init_zero_func,
    )
    kvclient.init_data(
        name="data_2",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )

    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    for name in (
        "data_0",
        "data_0_1",
        "data_0_2",
        "data_0_3",
        "data_1",
        "data_2",
    ):
        assert name in name_list

    # Test get_meta_data
    _check_meta(kvclient, "data_0", data_0, "node~_N")
    _check_meta(kvclient, "data_0_1", data_0_1, "node~_N")
    _check_meta(kvclient, "data_0_2", data_0_2, "node~_N")
    _check_meta(kvclient, "data_0_3", data_0_3, "node~_N")
    _check_meta(kvclient, "data_1", data_1, "edge~_N:_E:_N")
    _check_meta(kvclient, "data_2", data_2, "node~_N")

    # Test push and pull with the default (overwrite) handler.
    id_tensor = F.tensor([0, 2, 4], F.int64)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
    for name in ("data_0", "data_1", "data_2"):
        kvclient.push(name=name, id_tensor=id_tensor, data_tensor=data_tensor)
    for name in ("data_0", "data_1", "data_2"):
        res = kvclient.pull(name=name, id_tensor=id_tensor)
        assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Register a squaring push handler and verify pushed values get squared.
    for name in ("data_0", "data_1", "data_2"):
        kvclient.register_push_handler(name, udf_push)
    for name in ("data_0", "data_1", "data_2"):
        kvclient.push(name=name, id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
    data_tensor = data_tensor * data_tensor
    for name in ("data_0", "data_1", "data_2"):
        res = kvclient.pull(name=name, id_tensor=id_tensor)
        assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Test delete data
    for name in ("data_0", "data_1", "data_2"):
        kvclient.delete_data(name)

    # Accumulating handler: every client adds data_tensor once, so after
    # the final barrier the stored value is data_tensor * num_clients.
    kvclient.init_data(
        name="data_3",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )
    kvclient.register_push_handler("data_3", add_push)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
    kvclient.barrier()
    # Stagger the clients (client_id + 1 seconds apart) before pushing.
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name="data_3", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name="data_3", id_tensor=id_tensor)
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
def start_client_mul_role(i):
    """Trainer-role client: checks trainer/global rank bookkeeping."""
    os.environ["DGL_DIST_MODE"] = "distributed"
    # Initialize creates the kvstore; connect before anything else.
    dgl.distributed.initialize(ip_config="kv_ip_mul_config.txt")
    if i == 0:
        # Block one trainer so the others wait at the barrier.
        time.sleep(5)
    kvclient = dgl.distributed.kvstore.get_kvstore()
    kvclient.barrier()
    print("i: %d role: %s" % (i, kvclient.role))

    assert dgl.distributed.role.get_num_trainers() == 2
    trainer_rank = dgl.distributed.role.get_trainer_rank()
    assert trainer_rank < 2
    global_rank = dgl.distributed.role.get_global_rank()
    print(
        "trainer rank: %d, global rank: %d" % (trainer_rank, global_rank)
    )
    dgl.distributed.exit_client()
@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_store():
    """End-to-end KVStore test: 2 servers and 2 clients in spawned
    processes, all on one machine."""
    reset_envs()
    num_servers = 2
    num_clients = 2
    generate_ip_config("kv_ip_config.txt", 1, num_servers)
    ctx = mp.get_context("spawn")
    pserver_list = []
    pclient_list = []
    os.environ["DGL_NUM_SERVER"] = str(num_servers)
    for i in range(num_servers):
        pserver = ctx.Process(
            target=start_server, args=(i, num_clients, num_servers)
        )
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_clients):
        pclient = ctx.Process(
            target=start_client, args=(num_clients, num_servers)
        )
        pclient.start()
        pclient_list.append(pclient)
    for pclient in pclient_list:
        pclient.join()
        # A child that raised (e.g. a failed assert in start_client) exits
        # non-zero; without this check the test would pass silently.
        assert pclient.exitcode == 0
    for pserver in pserver_list:
        pserver.join()
        assert pserver.exitcode == 0
@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_multi_role():
    """KVStore with mixed client roles: 2 trainers, each declaring 2
    sampler processes, against 2 servers."""
    reset_envs()
    num_servers = 2
    num_trainers = 2
    num_samplers = 2
    generate_ip_config("kv_ip_mul_config.txt", 1, num_servers)
    # There are two trainer processes and each trainer process has two
    # sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
    ctx = mp.get_context("spawn")
    pserver_list = []
    pclient_list = []
    os.environ["DGL_NUM_SAMPLER"] = str(num_samplers)
    os.environ["DGL_NUM_SERVER"] = str(num_servers)
    for i in range(num_servers):
        pserver = ctx.Process(
            target=start_server_mul_role, args=(i, num_clients, num_servers)
        )
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_trainers):
        pclient = ctx.Process(target=start_client_mul_role, args=(i,))
        pclient.start()
        pclient_list.append(pclient)
    for pclient in pclient_list:
        pclient.join()
        # Fail loudly if a trainer process crashed or an assert fired there;
        # join() alone swallows child failures.
        assert pclient.exitcode == 0
    for pserver in pserver_list:
        pserver.join()
        assert pserver.exitcode == 0
if __name__ == "__main__":
    # Allow running the suite directly without a test runner.
    for case in (test_partition_policy, test_kv_store, test_kv_multi_role):
        case()