# test_kvstore.py
import os
import time

4
5
6
7
8
import backend as F
import numpy as np
import scipy as sp
from numpy.testing import assert_array_equal

9
10
11
import dgl
from dgl import utils
from dgl.contrib import KVClient, KVServer
12
13
14
15

# Dimensions of the default test tensors: `num_entries` rows, `dim_size` columns.
num_entries = 10
dim_size = 3

# Server table handed to both KVServer and KVClient, keyed by server id.
# NOTE(review): the entry layout is presumably
# [server_id, ip, port, group_count] -- confirm against the KVServer docs.
server_namebook = {0: [0, "127.0.0.1", 30070, 1]}

# Fixture for "data_0": zero-initialised float32 tensor plus the
# global-to-local id mapping and partition book registered with it.
data_0 = F.zeros((num_entries, dim_size), F.float32, F.cpu())
g2l_0 = F.arange(0, num_entries)
partition_0 = F.zeros(num_entries, F.int64, F.cpu())

# Fixture for "data_1": same layout with twice as many rows.
data_1 = F.zeros((num_entries * 2, dim_size), F.float32, F.cpu())
g2l_1 = F.arange(0, num_entries * 2)
partition_1 = F.zeros(num_entries * 2, F.int64, F.cpu())

# Same shape as data_0 but with other dtypes (int64, float64, int32); the
# server registers them with g2l_0 / partition_0.
data_3 = F.zeros((num_entries, dim_size), F.int64, F.cpu())
data_4 = F.zeros((num_entries, dim_size), F.float64, F.cpu())
data_5 = F.zeros((num_entries, dim_size), F.int32, F.cpu())

30

31
def start_server():
    """Create the test KVServer, register every server-side tensor, start it.

    Five tensors ("data_0", "data_1", "data_3", "data_4", "data_5") are
    registered with their global-to-local mapping, partition book and
    initial values, after which ``KVServer.start()`` is called. "data_2" is
    deliberately absent here: the client creates it.
    """
    my_server = KVServer(
        server_id=0, server_namebook=server_namebook, num_client=1
    )

    # (name, global-to-local mapping, partition book, initial tensor)
    registrations = (
        ("data_0", g2l_0, partition_0, data_0),
        ("data_1", g2l_1, partition_1, data_1),
        ("data_3", g2l_0, partition_0, data_3),
        ("data_4", g2l_0, partition_0, data_4),
        ("data_5", g2l_0, partition_0, data_5),
    )

    # Keep the call grouping of the original test: all mappings first, then
    # all partition books, then the data tensors.
    for name, g2l, _, _ in registrations:
        my_server.set_global2local(name=name, global2local=g2l)
    for name, _, partition, _ in registrations:
        my_server.set_partition_book(name=name, partition_book=partition)
    for name, _, _, tensor in registrations:
        my_server.init_data(name=name, data_tensor=tensor)

    my_server.start()


def start_client():
    """Connect a KVClient to the test server and exercise its API.

    Steps, in order:

    1. Connect and create "data_2" from the client side, using "data_0" as
       the ``target_name``.
    2. Check that the server reports exactly the six expected tensor names.
    3. Check dtype, shape and partition book of every tensor's metadata.
    4. Push three rows (ids 0, 1, 2) into five tensors, pull them back and
       compare with what was pushed.
    5. Shut the client down.

    Raises:
        AssertionError: if any of the checks above fails.
    """
    my_client = KVClient(server_namebook=server_namebook)
    my_client.connect()

    my_client.init_data(
        name="data_2",
        shape=(num_entries, dim_size),
        dtype=F.float32,
        target_name="data_0",
    )
    print("Init data from client..")

    name_list = my_client.get_data_name_list()
    assert len(name_list) == 6
    for name in ("data_0", "data_1", "data_2", "data_3", "data_4", "data_5"):
        assert name in name_list

    # (name, expected dtype, tensor with the expected shape, expected
    # partition book). "data_2" was created from "data_0", so it is compared
    # against data_0's shape and partition book.
    expected_meta = (
        ("data_0", F.float32, data_0, partition_0),
        ("data_1", F.float32, data_1, partition_1),
        ("data_2", F.float32, data_0, partition_0),
        ("data_3", F.int64, data_3, partition_0),
        ("data_4", F.float64, data_4, partition_0),
        ("data_5", F.int32, data_5, partition_0),
    )
    for name, dtype, like, partition in expected_meta:
        meta = my_client.get_data_meta(name)
        assert meta[0] == dtype
        assert meta[1] == tuple(F.shape(like))
        # Compare this tensor's own partition book; the earlier version
        # re-checked meta_3 for data_4 and data_5 by mistake.
        assert_array_equal(meta[2], partition)

    float_rows = [[1.0, 1.0, 1.0], [2.0, 2.0, 2.0], [3.0, 3.0, 3.0]]
    int_rows = [[1, 1, 1], [2, 2, 2], [3, 3, 3]]

    def make_rows(rows, dtype):
        # dtype=None means "let the backend pick its default dtype".
        return F.tensor(rows) if dtype is None else F.tensor(rows, dtype)

    # (name, row values, explicit dtype or None)
    payloads = (
        ("data_0", float_rows, None),
        ("data_2", float_rows, None),
        ("data_3", int_rows, None),
        ("data_4", float_rows, F.float64),
        ("data_5", int_rows, F.int32),
    )

    # All pushes are issued before the first pull, as in the original test.
    for name, rows, dtype in payloads:
        my_client.push(
            name=name,
            id_tensor=F.tensor([0, 1, 2]),
            data_tensor=make_rows(rows, dtype),
        )

    for name, rows, dtype in payloads:
        res = my_client.pull(name=name, id_tensor=F.tensor([0, 1, 2]))
        assert_array_equal(res, make_rows(rows, dtype))

    my_client.shut_down()


165
if __name__ == "__main__":
    # Run server and client in separate processes: the forked child acts as
    # the server, the parent as the client.
    pid = os.fork()
    if pid == 0:
        start_server()
    else:
        time.sleep(2)  # give the server process time to start
        start_client()