test_kvstore.py 4.71 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import backend as F
import numpy as np
import scipy as sp
import dgl
from dgl import utils
from dgl.contrib import KVServer
from dgl.contrib import KVClient
from numpy.testing import assert_array_equal

import os
import time

num_entries = 10
dim_size = 3

server_namebook = {0:[0, '127.0.0.1', 30070, 1]}

data_0 = F.zeros((num_entries, dim_size), F.float32, F.cpu())
g2l_0 = F.arange(0, num_entries)
partition_0 = F.zeros(num_entries, F.int64, F.cpu())

data_1 = F.zeros((num_entries*2, dim_size), F.float32, F.cpu())
g2l_1 = F.arange(0, num_entries*2)
partition_1 = F.zeros(num_entries*2, F.int64, F.cpu())

Chao Ma's avatar
Chao Ma committed
26
27
28
29
data_3 = F.zeros((num_entries, dim_size), F.int64, F.cpu())
data_4 = F.zeros((num_entries, dim_size), F.float64, F.cpu())
data_5 = F.zeros((num_entries, dim_size), F.int32, F.cpu())

30
31
32
33
def start_server():
    my_server = KVServer(server_id=0, server_namebook=server_namebook, num_client=1)
    my_server.set_global2local(name='data_0', global2local=g2l_0)
    my_server.set_global2local(name='data_1', global2local=g2l_1)
Chao Ma's avatar
Chao Ma committed
34
35
36
    my_server.set_global2local(name='data_3', global2local=g2l_0)
    my_server.set_global2local(name='data_4', global2local=g2l_0)
    my_server.set_global2local(name='data_5', global2local=g2l_0)
37
38
    my_server.set_partition_book(name='data_0', partition_book=partition_0)
    my_server.set_partition_book(name='data_1', partition_book=partition_1)
Chao Ma's avatar
Chao Ma committed
39
40
41
    my_server.set_partition_book(name='data_3', partition_book=partition_0)
    my_server.set_partition_book(name='data_4', partition_book=partition_0)
    my_server.set_partition_book(name='data_5', partition_book=partition_0)
42
43
    my_server.init_data(name='data_0', data_tensor=data_0)
    my_server.init_data(name='data_1', data_tensor=data_1)
Chao Ma's avatar
Chao Ma committed
44
45
46
    my_server.init_data(name='data_3', data_tensor=data_3)
    my_server.init_data(name='data_4', data_tensor=data_4)
    my_server.init_data(name='data_5', data_tensor=data_5)
47
48
49
50
51
52
53
54

    my_server.start()


def start_client():
    my_client = KVClient(server_namebook=server_namebook)
    my_client.connect()

Chao Ma's avatar
Chao Ma committed
55
56
    my_client.init_data(name='data_2', shape=(num_entries, dim_size), dtype=F.float32, target_name='data_0')
    print("Init data from client..")
57

58
    name_list = my_client.get_data_name_list()
Chao Ma's avatar
Chao Ma committed
59
    assert len(name_list) == 6
60
61
    assert 'data_0' in name_list
    assert 'data_1' in name_list
62
    assert 'data_2' in name_list
Chao Ma's avatar
Chao Ma committed
63
64
65
    assert 'data_3' in name_list
    assert 'data_4' in name_list
    assert 'data_5' in name_list
66
67
68
69
70
71
72
73
74

    meta_0 = my_client.get_data_meta('data_0')
    assert meta_0[0] == F.float32
    assert_array_equal(meta_0[2], partition_0)

    meta_1 = my_client.get_data_meta('data_1')
    assert meta_1[0] == F.float32
    assert_array_equal(meta_1[2], partition_1)

75
76
77
    meta_2 = my_client.get_data_meta('data_2')
    assert meta_2[0] == F.float32
    assert_array_equal(meta_2[2], partition_0)
78

Chao Ma's avatar
Chao Ma committed
79
80
81
82
83
84
85
86
87
88
89
90
    meta_3 = my_client.get_data_meta('data_3')
    assert meta_3[0] == F.int64
    assert_array_equal(meta_3[2], partition_0)    

    meta_4 = my_client.get_data_meta('data_4')
    assert meta_4[0] == F.float64
    assert_array_equal(meta_3[2], partition_0)  

    meta_5 = my_client.get_data_meta('data_5')
    assert meta_5[0] == F.int32
    assert_array_equal(meta_3[2], partition_0)  

91
92
    my_client.push(name='data_0', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]]))
    my_client.push(name='data_2', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]]))
Chao Ma's avatar
Chao Ma committed
93
94
95
    my_client.push(name='data_3', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1,1,1],[2,2,2],[3,3,3]]))
    my_client.push(name='data_4', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]], F.float64))
    my_client.push(name='data_5', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1,1,1],[2,2,2],[3,3,3]], F.int32))
96
97
98

    target = F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]])

99
100
101
102
    res = my_client.pull(name='data_0', id_tensor=F.tensor([0, 1, 2]))
    assert_array_equal(res, target)

    res = my_client.pull(name='data_2', id_tensor=F.tensor([0, 1, 2]))
103
104
    assert_array_equal(res, target)

Chao Ma's avatar
Chao Ma committed
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
    target = F.tensor([[1,1,1],[2,2,2],[3,3,3]])

    res = my_client.pull(name='data_3', id_tensor=F.tensor([0, 1, 2]))
    assert_array_equal(res, target)

    target = F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]], F.float64)

    res = my_client.pull(name='data_4', id_tensor=F.tensor([0, 1, 2]))
    assert_array_equal(res, target)

    target = F.tensor([[1,1,1],[2,2,2],[3,3,3]], F.int32)

    res = my_client.pull(name='data_5', id_tensor=F.tensor([0, 1, 2]))
    assert_array_equal(res, target)

120
121
122
123
124
125
126
127
128
129
    my_client.shut_down()


if __name__ == '__main__':
    pid = os.fork()
    if pid == 0:
        start_server()
    else:
        time.sleep(2) # wait trainer start
        start_client()