client.py 3.18 KB
Newer Older
Chao Ma's avatar
Chao Ma committed
1
2
3
4
5
# This is a simple PyTorch client demo that shows how to use the DGL distributed kvstore.
# In this demo, we initialize two embeddings on the servers and push/pull data to/from them.
import dgl
import time
import argparse
6
import torch as th
Chao Ma's avatar
Chao Ma committed
7
8
9
10
11
12
13
14
15
16
17
18
19

# Read the server and client namebooks from 'config.txt'. This runs at import
# time. client_namebook is indexed by client ID in start_client() to obtain
# that client's address; server_namebook is passed to the KVClient as-is.
# NOTE(review): the path is relative, so presumably the script must be run
# from the directory containing config.txt -- confirm.
server_namebook, client_namebook = dgl.contrib.ReadNetworkConfigure('config.txt')

def _pull_and_print(client, name, ids_per_server):
    """Pull rows of ``name`` from each server in turn and print them concatenated.

    Args:
        client: Connected KVClient used to issue the pull requests.
        name: Name of the data to pull; also printed as the section header.
        ids_per_server: One list of row IDs per server. The list at position
            ``k`` is requested from the server whose ID is ``k``.
    """
    chunks = []
    for server_id, ids in enumerate(ids_per_server):
        client.pull(name=name, server_id=server_id, id_tensor=th.tensor(ids))
        reply = client.pull_wait()
        # Sanity check: the reply should come from the server just queried.
        assert reply.rank == server_id
        chunks.append(reply.data)
    print(name + ":")
    print(th.cat(chunks))


def start_client(args):
    """Run the kvstore demo client.

    Connects to the servers, initializes 'embed_0' and 'embed_1' on servers
    0 and 1, pushes sample data five times, waits at a barrier, and then --
    on the client whose ID is 0 only -- pulls and prints the stored data and
    shuts the servers down.

    Args:
        args: Parsed command-line arguments; only ``args.id`` (this client's
            ID) is read.
    """
    # Initialize client and connect to server
    client = dgl.contrib.KVClient(
        client_id=args.id,
        server_namebook=server_namebook,
        client_addr=client_namebook[args.id])
    client.connect()

    # Initialize data on server
    client.init_data(name='embed_0', server_id=0, shape=[5, 3], init_type='zero')
    client.init_data(name='embed_0', server_id=1, shape=[6, 3], init_type='zero')
    # NOTE(review): low == high == 0.0, so the 'uniform' init presumably
    # yields all zeros -- confirm this is intended.
    client.init_data(name='embed_1', server_id=0, shape=[5], init_type='uniform', low=0.0, high=0.0)
    client.init_data(name='embed_1', server_id=1, shape=[6], init_type='uniform', low=0.0, high=0.0)

    data_0 = th.tensor([[0., 0., 0.], [1., 1., 1.], [2., 2., 2.]])
    data_1 = th.tensor([0., 1., 2.])

    # Push the same rows five times.
    for _ in range(5):
        client.push(name='embed_0', server_id=0, id_tensor=th.tensor([0, 2, 4]), data_tensor=data_0)
        client.push(name='embed_0', server_id=1, id_tensor=th.tensor([1, 3, 5]), data_tensor=data_0)
        client.push(name='embed_1', server_id=0, id_tensor=th.tensor([0, 2, 4]), data_tensor=data_1)
        client.push(name='embed_1', server_id=1, id_tensor=th.tensor([1, 3, 5]), data_tensor=data_1)
        # NOTE(review): 'server_embed' is never initialized by this client;
        # presumably it is created on the server side -- confirm.
        client.push(name='server_embed', server_id=0, id_tensor=th.tensor([0, 2, 4]), data_tensor=data_1)
        client.push(name='server_embed', server_id=1, id_tensor=th.tensor([0, 2, 4]), data_tensor=data_1)

    # NOTE(review): presumably synchronizes all clients so every push has
    # been issued before anything is pulled -- confirm against KVClient docs.
    client.barrier()

    # Only the client with ID 0 reads the results back and stops the servers.
    if client.get_id() == 0:
        _pull_and_print(client, 'embed_0', [[0, 1, 2, 3, 4], [0, 1, 2, 3, 4, 5]])
        _pull_and_print(client, 'embed_1', [[0, 1, 2, 3, 4], [0, 1, 2, 3, 4, 5]])
        _pull_and_print(client, 'server_embed', [[0, 1, 2, 3, 4], [0, 1, 2, 3, 4]])

        # Shut-down all the servers
        client.shut_down()

if __name__ == '__main__':
    # Command-line interface: a single optional --id flag selecting the node ID.
    cli = argparse.ArgumentParser(description='kvstore')
    cli.add_argument("--id", type=int, default=0, help="node ID")
    parsed = cli.parse_args()

    # Pause briefly so the servers have time to start before we connect.
    time.sleep(2)
    start_client(parsed)