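"""Unit tests for DGL's distributed KVStore: PartitionPolicy, KVServer and KVClient."""
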
import multiprocessing as mp
import os
import socket
import time
import unittest

import backend as F

import dgl
import numpy as np
from dgl.graph_index import create_graph_index
from numpy.testing import assert_array_equal
from scipy import sparse as spsp

from utils import generate_ip_config, reset_envs

if os.name != "nt":
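    # fcntl is POSIX-only, so these imports are skipped on Windows.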
    import fcntl
    import struct

# Create a one-part graph
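# The single partition owns node IDs [0, 6) and edge IDs [0, 7).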
node_map = {"_N": F.tensor([[0, 6]], F.int64)}
edge_map = {("_N", "_E", "_N"): F.tensor([[0, 7]], F.int64)}
global_nid = F.tensor([0, 1, 2, 3, 4, 5], F.int64)
global_eid = F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)
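# For a one-part graph, the global IDs equal the local IDs; they are attached to g below.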

g = dgl.DGLGraph()
g.add_nodes(6)
g.add_edges(0, 1)  # 0
g.add_edges(0, 2)  # 1
g.add_edges(0, 3)  # 2
g.add_edges(2, 3)  # 3
g.add_edges(1, 1)  # 4
g.add_edges(0, 4)  # 5
g.add_edges(2, 5)  # 6

g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

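# A RangePartitionBook maps contiguous ranges of global node/edge IDs to partitions.
# The partition policies below bind KVStore tensors to the node type and the edge type.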
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
    part_id=0,
    num_parts=1,
    node_map=node_map,
    edge_map=edge_map,
    ntypes={ntype: i for i, ntype in enumerate(g.ntypes)},
    etypes={etype: i for i, etype in enumerate(g.canonical_etypes)},
)

node_policy = dgl.distributed.PartitionPolicy(
    policy_str="node~_N", partition_book=gpb
)

edge_policy = dgl.distributed.PartitionPolicy(
    policy_str="edge~_N:_E:_N", partition_book=gpb
)

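# Reference tensors used to initialize KVStore data and to verify pushes and pulls.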
data_0 = F.tensor(
    [[1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0]],
    F.float32,
)
data_0_1 = F.tensor([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], F.float32)
data_0_2 = F.tensor([1, 2, 3, 4, 5, 6], F.int32)
data_0_3 = F.tensor([1, 2, 3, 4, 5, 6], F.int64)
data_1 = F.tensor(
    [
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
        [2.0, 2.0],
    ],
    F.float32,
)
data_2 = F.tensor(
    [[0.0, 0.0], [0.0, 0.0], [0.0, 0.0], [0.0, 0.0], [0.0, 0.0], [0.0, 0.0]],
    F.float32,
)


def init_zero_func(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())


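# UDF push handler: overwrite the target rows with the element-wise square of the pushed data.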
def udf_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] = data_tensor * data_tensor


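# UDF push handler: accumulate the pushed data into the target rows.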
def add_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] += data_tensor


@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_partition_policy():
    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0
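    # to_local converts global IDs into partition-local IDs; to_partid returns the owning partition.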
    local_nid = node_policy.to_local(F.tensor([0, 1, 2, 3, 4, 5]))
    local_eid = edge_policy.to_local(F.tensor([0, 1, 2, 3, 4, 5, 6]))
    assert_array_equal(
        F.asnumpy(local_nid), F.asnumpy(F.tensor([0, 1, 2, 3, 4, 5], F.int64))
    )
    assert_array_equal(
        F.asnumpy(local_eid),
        F.asnumpy(F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)),
    )
    nid_partid = node_policy.to_partid(F.tensor([0, 1, 2, 3, 4, 5], F.int64))
    eid_partid = edge_policy.to_partid(F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64))
    assert_array_equal(
        F.asnumpy(nid_partid), F.asnumpy(F.tensor([0, 0, 0, 0, 0, 0], F.int64))
    )
    assert_array_equal(
        F.asnumpy(eid_partid),
        F.asnumpy(F.tensor([0, 0, 0, 0, 0, 0, 0], F.int64)),
    )
    assert node_policy.get_part_size() == len(local_nid)
    assert edge_policy.get_part_size() == len(local_eid)


def start_server(server_id, num_clients, num_servers):
    # Init kvserver
    print("Sleep 5 seconds to test client re-connect.")
    time.sleep(5)
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config="kv_ip_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
    )
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
    if kvserver.is_backup_server():
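        # A backup server attaches to existing data rather than supplying tensors.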
        kvserver.init_data("data_0", "node~_N")
        kvserver.init_data("data_0_1", "node~_N")
        kvserver.init_data("data_0_2", "node~_N")
        kvserver.init_data("data_0_3", "node~_N")
    else:
        kvserver.init_data("data_0", "node~_N", data_0)
        kvserver.init_data("data_0_1", "node~_N", data_0_1)
        kvserver.init_data("data_0_2", "node~_N", data_0_2)
        kvserver.init_data("data_0_3", "node~_N", data_0_3)
    # start server
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config="kv_ip_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )


def start_server_mul_role(server_id, num_clients, num_servers):
    # Init kvserver
    kvserver = dgl.distributed.KVServer(
        server_id=server_id,
        ip_config="kv_ip_mul_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
    )
    kvserver.add_part_policy(node_policy)
    if kvserver.is_backup_server():
        kvserver.init_data("data_0", "node~_N")
    else:
        kvserver.init_data("data_0", "node~_N", data_0)
    # start server
    server_state = dgl.distributed.ServerState(
        kv_store=kvserver, local_g=None, partition_book=None
    )
    dgl.distributed.start_server(
        server_id=server_id,
        ip_config="kv_ip_mul_config.txt",
        num_servers=num_servers,
        num_clients=num_clients,
        server_state=server_state,
    )


def start_client(num_clients, num_servers):
    os.environ["DGL_DIST_MODE"] = "distributed"
    # Note: connect to the servers first!
    dgl.distributed.initialize(ip_config="kv_ip_config.txt")
    # Init kvclient
    kvclient = dgl.distributed.KVClient(
        ip_config="kv_ip_config.txt", num_servers=num_servers
    )
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client() == num_clients
    kvclient.init_data(
        name="data_1",
        shape=F.shape(data_1),
        dtype=F.dtype(data_1),
        part_policy=edge_policy,
        init_func=init_zero_func,
    )
    kvclient.init_data(
        name="data_2",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )

    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    assert "data_0" in name_list
    assert "data_0_1" in name_list
    assert "data_0_2" in name_list
    assert "data_0_3" in name_list
    assert "data_1" in name_list
    assert "data_2" in name_list
    # Test get_data_meta
    meta = kvclient.get_data_meta("data_0")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0)
    assert shape == F.shape(data_0)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_0_1")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_1)
    assert shape == F.shape(data_0_1)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_0_2")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_2)
    assert shape == F.shape(data_0_2)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_0_3")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_3)
    assert shape == F.shape(data_0_3)
    assert policy.policy_str == "node~_N"

    meta = kvclient.get_data_meta("data_1")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_1)
    assert shape == F.shape(data_1)
    assert policy.policy_str == "edge~_N:_E:_N"

    meta = kvclient.get_data_meta("data_2")
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_2)
    assert shape == F.shape(data_2)
    assert policy.policy_str == "node~_N"

    # Test push and pull
    id_tensor = F.tensor([0, 2, 4], F.int64)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
    kvclient.push(name="data_0", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_1", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_2", id_tensor=id_tensor, data_tensor=data_tensor)
    res = kvclient.pull(name="data_0", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_1", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_2", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Register new push handler
    kvclient.register_push_handler("data_0", udf_push)
    kvclient.register_push_handler("data_1", udf_push)
    kvclient.register_push_handler("data_2", udf_push)
    # Test push and pull with the UDF handler
    kvclient.push(name="data_0", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_1", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.push(name="data_2", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
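    # udf_push stores the square of the pushed data, so the expected result is data_tensor squared.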
    data_tensor = data_tensor * data_tensor
    res = kvclient.pull(name="data_0", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_1", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name="data_2", id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))

    # Test delete data
    kvclient.delete_data("data_0")
    kvclient.delete_data("data_1")
    kvclient.delete_data("data_2")

    # Create a new tensor and register an additive push handler
    kvclient.init_data(
        name="data_3",
        shape=F.shape(data_2),
        dtype=F.dtype(data_2),
        part_policy=node_policy,
        init_func=init_zero_func,
    )
    kvclient.register_push_handler("data_3", add_push)
    data_tensor = F.tensor([[6.0, 6.0], [6.0, 6.0], [6.0, 6.0]], F.float32)
    kvclient.barrier()
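    # Each client sleeps for (client_id + 1) seconds before pushing.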
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name="data_3", id_tensor=id_tensor, data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name="data_3", id_tensor=id_tensor)
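    # Every client pushed once through add_push, so the values accumulate num_clients times.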
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))


def start_client_mul_role(i):
    os.environ["DGL_DIST_MODE"] = "distributed"
    # dgl.distributed.initialize() creates the kvstore client
    dgl.distributed.initialize(ip_config="kv_ip_mul_config.txt")
    if i == 0:  # block one trainer
        time.sleep(5)
    kvclient = dgl.distributed.kvstore.get_kvstore()
    kvclient.barrier()
    print("i: %d role: %s" % (i, kvclient.role))

    assert dgl.distributed.role.get_num_trainers() == 2
    assert dgl.distributed.role.get_trainer_rank() < 2
    print(
        "trainer rank: %d, global rank: %d"
        % (
            dgl.distributed.role.get_trainer_rank(),
            dgl.distributed.role.get_global_rank(),
        )
    )
    dgl.distributed.exit_client()


@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_store():
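    # Servers are launched first; start_server sleeps 5 seconds to exercise client re-connect.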
    reset_envs()
    num_servers = 2
    num_clients = 2
    generate_ip_config("kv_ip_config.txt", 1, num_servers)
    ctx = mp.get_context("spawn")
    pserver_list = []
    pclient_list = []
    os.environ["DGL_NUM_SERVER"] = str(num_servers)
    for i in range(num_servers):
        pserver = ctx.Process(
            target=start_server, args=(i, num_clients, num_servers)
        )
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_clients):
        pclient = ctx.Process(
            target=start_client, args=(num_clients, num_servers)
        )
        pclient.start()
        pclient_list.append(pclient)
    for i in range(num_clients):
        pclient_list[i].join()
    for i in range(num_servers):
        pserver_list[i].join()


@unittest.skipIf(
    os.name == "nt" or os.getenv("DGLBACKEND") == "tensorflow",
    reason="Do not support windows and TF yet",
)
def test_kv_multi_role():
    reset_envs()
    num_servers = 2
    num_trainers = 2
    num_samplers = 2
    generate_ip_config("kv_ip_mul_config.txt", 1, num_servers)
    # There are two trainer processes and each trainer process has two sampler processes.
    num_clients = num_trainers * (1 + num_samplers)
    ctx = mp.get_context("spawn")
    pserver_list = []
    pclient_list = []
    os.environ["DGL_NUM_SAMPLER"] = str(num_samplers)
    os.environ["DGL_NUM_SERVER"] = str(num_servers)
    for i in range(num_servers):
        pserver = ctx.Process(
            target=start_server_mul_role, args=(i, num_clients, num_servers)
        )
        pserver.start()
        pserver_list.append(pserver)
    for i in range(num_trainers):
        pclient = ctx.Process(target=start_client_mul_role, args=(i,))
        pclient.start()
        pclient_list.append(pclient)
    for i in range(num_trainers):
        pclient_list[i].join()
    for i in range(num_servers):
        pserver_list[i].join()


if __name__ == "__main__":
    test_partition_policy()
    test_kv_store()
    test_kv_multi_role()