Unverified Commit b133abb8 authored by Chao Ma's avatar Chao Ma Committed by GitHub
Browse files

[KVStore] New kvstore used by DGL-KE (#1263)

* new kvstore

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* test warning

* update

* update

* udpate

* update

* update

* update

* update

* small fix

* small fix

* get group count

* update

* update

* make file

* update

* use addr

* get id

* partition book

* update

* partition

* barrier

* update

* loop count

* update

* update

* update

* update

* update

* update

* update

* update

* update

* add mxnet demo

* update ip

* update

* update

* update

* random

* update

* update

* update

* update

* update

* update

* fix lint

* fix lint

* fix lint
parent 49fe5b3c
## Usage of DGL distributed KVStore
This is a simple example shows how to use DGL distributed KVStore on MXNet locally. In this example, we start 4 servers and 4 clients, and you can first run the command:
./run_server.sh
And when you see the message
start server 1 on 127.0.0.1:50051
start server 2 on 127.0.0.1:50052
start server 0 on 127.0.0.1:50050
start server 3 on 127.0.0.1:50053
you can start client by:
./run_client.sh
# This is a simple MXNet server demo shows how to use DGL distributed kvstore. import os
import dgl
import argparse import argparse
import mxnet as mx
import time import time
import dgl
from dgl.contrib import KVClient
import mxnet as mx
partition = mx.nd.array([0,0,1,1,2,2,3,3], dtype='int64')
ID = [] ID = []
ID.append(mx.nd.array([0,1], dtype='int64')) ID.append(mx.nd.array([0,1], dtype='int64'))
ID.append(mx.nd.array([2,3], dtype='int64')) ID.append(mx.nd.array([2,3], dtype='int64'))
...@@ -16,44 +21,60 @@ DATA.append(mx.nd.array([[2.,2.,2.,],[2.,2.,2.,]])) ...@@ -16,44 +21,60 @@ DATA.append(mx.nd.array([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(mx.nd.array([[3.,3.,3.,],[3.,3.,3.,]])) DATA.append(mx.nd.array([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(mx.nd.array([[4.,4.,4.,],[4.,4.,4.,]])) DATA.append(mx.nd.array([[4.,4.,4.,],[4.,4.,4.,]]))
edata_partition_book = {'edata':mx.nd.array([0,0,1,1,2,2,3,3], dtype='int64')}
ndata_partition_book = {'ndata':mx.nd.array([0,0,1,1,2,2,3,3], dtype='int64')}
def start_client(): class ArgParser(argparse.ArgumentParser):
def __init__(self):
super(ArgParser, self).__init__()
self.add_argument('--ip_config', type=str, default='ip_config.txt',
help='IP configuration file of kvstore.')
self.add_argument('--num_worker', type=int, default=2,
help='Number of worker (client nodes) on single-machine.')
def start_client(args):
"""Start client
"""
server_namebook = dgl.contrib.read_ip_config(filename=args.ip_config)
my_client = KVClient(server_namebook=server_namebook)
my_client.connect()
if my_client.get_id() % args.num_worker == 0:
my_client.set_partition_book(name='entity_embed', partition_book=partition)
else:
time.sleep(3) time.sleep(3)
my_client.set_partition_book(name='entity_embed')
client = dgl.contrib.start_client(ip_config='ip_config.txt', my_client.print()
ndata_partition_book=ndata_partition_book,
edata_partition_book=edata_partition_book,
close_shared_mem=True)
my_client.barrier()
tensor_edata = client.pull(name='edata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64')) print("send request...")
tensor_ndata = client.pull(name='ndata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
print(tensor_edata) for i in range(4):
client.barrier() my_client.push(name='entity_embed', id_tensor=ID[i], data_tensor=DATA[i])
print(tensor_ndata) my_client.barrier()
client.barrier()
client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()]) if my_client.get_id() % args.num_worker == 0:
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()]) res = my_client.pull(name='entity_embed', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
print(res)
client.barrier() my_client.barrier()
tensor_edata = client.pull(name='edata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64')) my_client.push(name='entity_embed', id_tensor=ID[my_client.get_machine_id()], data_tensor=mx.nd.array([[0.,0.,0.],[0.,0.,0.]]))
tensor_ndata = client.pull(name='ndata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
print(tensor_edata) my_client.barrier()
client.barrier()
print(tensor_ndata) if my_client.get_id() % args.num_worker == 0:
client.barrier() res = my_client.pull(name='entity_embed', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
print(res)
if client.get_id() == 0: my_client.shut_down()
client.shut_down()
if __name__ == '__main__':
start_client() if __name__ == '__main__':
\ No newline at end of file args = ArgParser().parse_args()
start_client(args)
\ No newline at end of file
0 127.0.0.1 50050 0 172.31.6.94 30050 2
1 127.0.0.1 50051 1 172.31.4.10 30050 2
2 127.0.0.1 50052 2 172.31.11.99 30050 2
3 127.0.0.1 50053 3 172.31.2.252 30050 2
\ No newline at end of file \ No newline at end of file
DGLBACKEND=mxnet python3 client.py &
DGLBACKEND=mxnet python3 client.py &
DGLBACKEND=mxnet python3 client.py &
DGLBACKEND=mxnet python3 client.py
\ No newline at end of file
DGLBACKEND=mxnet python3 server.py --id 0 &
DGLBACKEND=mxnet python3 server.py --id 1 &
DGLBACKEND=mxnet python3 server.py --id 2 &
DGLBACKEND=mxnet python3 server.py --id 3
\ No newline at end of file
# This is a simple MXNet server demo shows how to use DGL distributed kvstore. import os
import dgl
import argparse import argparse
import time
import dgl
from dgl.contrib import KVServer
import mxnet as mx import mxnet as mx
ndata_g2l = [] g2l = []
edata_g2l = [] g2l.append(mx.nd.array([0,1,0,0,0,0,0,0], dtype='int64'))
g2l.append(mx.nd.array([0,0,0,1,0,0,0,0], dtype='int64'))
g2l.append(mx.nd.array([0,0,0,0,0,1,0,0], dtype='int64'))
g2l.append(mx.nd.array([0,0,0,0,0,0,0,1], dtype='int64'))
data = []
data.append(mx.nd.array([[4.,4.,4.],[4.,4.,4.]]))
data.append(mx.nd.array([[3.,3.,3.],[3.,3.,3.]]))
data.append(mx.nd.array([[2.,2.,2.],[2.,2.,2.]]))
data.append(mx.nd.array([[1.,1.,1.],[1.,1.,1.]]))
ndata_g2l.append({'ndata':mx.nd.array([0,1,0,0,0,0,0,0], dtype='int64')})
ndata_g2l.append({'ndata':mx.nd.array([0,0,0,1,0,0,0,0], dtype='int64')})
ndata_g2l.append({'ndata':mx.nd.array([0,0,0,0,0,1,0,0], dtype='int64')})
ndata_g2l.append({'ndata':mx.nd.array([0,0,0,0,0,0,0,1], dtype='int64')})
edata_g2l.append({'edata':mx.nd.array([0,1,0,0,0,0,0,0], dtype='int64')}) class ArgParser(argparse.ArgumentParser):
edata_g2l.append({'edata':mx.nd.array([0,0,0,1,0,0,0,0], dtype='int64')}) def __init__(self):
edata_g2l.append({'edata':mx.nd.array([0,0,0,0,0,1,0,0], dtype='int64')}) super(ArgParser, self).__init__()
edata_g2l.append({'edata':mx.nd.array([0,0,0,0,0,0,0,1], dtype='int64')})
self.add_argument('--server_id', type=int, default=0,
help='Unique ID of each server.')
self.add_argument('--ip_config', type=str, default='ip_config.txt',
help='IP configuration file of kvstore.')
self.add_argument('--num_client', type=int, default=1,
help='Total number of client nodes.')
DATA = []
DATA.append(mx.nd.array([[4.,4.,4.,],[4.,4.,4.,]]))
DATA.append(mx.nd.array([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(mx.nd.array([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(mx.nd.array([[1.,1.,1.,],[1.,1.,1.,]]))
def start_server(args): def start_server(args):
"""Start kvstore service
"""
server_namebook = dgl.contrib.read_ip_config(filename=args.ip_config)
dgl.contrib.start_server( my_server = KVServer(server_id=args.server_id, server_namebook=server_namebook, num_client=args.num_client)
server_id=args.id,
ip_config='ip_config.txt',
num_client=4,
ndata={'ndata':DATA[args.id]},
edata={'edata':DATA[args.id]},
ndata_g2l=ndata_g2l[args.id],
edata_g2l=edata_g2l[args.id])
if my_server.get_id() % my_server.get_group_count() == 0: # master server
my_server.set_global2local(name='entity_embed', global2local=g2l[my_server.get_machine_id()])
my_server.init_data(name='entity_embed', data_tensor=data[my_server.get_machine_id()])
else:
time.sleep(3)
my_server.set_global2local(name='entity_embed')
my_server.init_data(name='entity_embed')
if __name__ == '__main__': my_server.print()
parser = argparse.ArgumentParser(description='kvstore')
parser.add_argument("--id", type=int, default=0, help="node ID") my_server.start()
args = parser.parse_args()
if __name__ == '__main__':
args = ArgParser().parse_args()
start_server(args) start_server(args)
\ No newline at end of file
## Usage of DGL distributed KVStore
This is a simple example shows how to use DGL distributed KVStore on Pytorch locally. In this example, we start 4 servers and 4 clients, and you can first run the command:
./run_server.sh
And when you see the message
start server 1 on 127.0.0.1:40051
start server 2 on 127.0.0.1:40052
start server 0 on 127.0.0.1:40050
start server 3 on 127.0.0.1:40053
you can start client by:
./run_client.sh
\ No newline at end of file
# This is a simple MXNet server demo shows how to use DGL distributed kvstore. import os
import dgl
import argparse import argparse
import torch as th
import time import time
import dgl
from dgl.contrib import KVClient
import torch as th
partition = th.tensor([0,0,1,1,2,2,3,3])
ID = [] ID = []
ID.append(th.tensor([0,1])) ID.append(th.tensor([0,1]))
ID.append(th.tensor([2,3])) ID.append(th.tensor([2,3]))
...@@ -16,43 +21,60 @@ DATA.append(th.tensor([[2.,2.,2.,],[2.,2.,2.,]])) ...@@ -16,43 +21,60 @@ DATA.append(th.tensor([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(th.tensor([[3.,3.,3.,],[3.,3.,3.,]])) DATA.append(th.tensor([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(th.tensor([[4.,4.,4.,],[4.,4.,4.,]])) DATA.append(th.tensor([[4.,4.,4.,],[4.,4.,4.,]]))
edata_partition_book = {'edata':th.tensor([0,0,1,1,2,2,3,3])}
ndata_partition_book = {'ndata':th.tensor([0,0,1,1,2,2,3,3])}
def start_client(): class ArgParser(argparse.ArgumentParser):
def __init__(self):
super(ArgParser, self).__init__()
self.add_argument('--ip_config', type=str, default='ip_config.txt',
help='IP configuration file of kvstore.')
self.add_argument('--num_worker', type=int, default=2,
help='Number of worker (client nodes) on single-machine.')
def start_client(args):
"""Start client
"""
server_namebook = dgl.contrib.read_ip_config(filename=args.ip_config)
my_client = KVClient(server_namebook=server_namebook)
my_client.connect()
if my_client.get_id() % args.num_worker == 0:
my_client.set_partition_book(name='entity_embed', partition_book=partition)
else:
time.sleep(3) time.sleep(3)
my_client.set_partition_book(name='entity_embed')
client = dgl.contrib.start_client(ip_config='ip_config.txt', my_client.print()
ndata_partition_book=ndata_partition_book,
edata_partition_book=edata_partition_book,
close_shared_mem=True)
tensor_edata = client.pull(name='edata', id_tensor=th.tensor([0,1,2,3,4,5,6,7])) my_client.barrier()
tensor_ndata = client.pull(name='ndata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
print(tensor_edata) print("send request...")
client.barrier()
print(tensor_ndata) for i in range(4):
client.barrier() my_client.push(name='entity_embed', id_tensor=ID[i], data_tensor=DATA[i])
client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()]) my_client.barrier()
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])
client.barrier() if my_client.get_id() % args.num_worker == 0:
res = my_client.pull(name='entity_embed', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
print(res)
tensor_edata = client.pull(name='edata', id_tensor=th.tensor([0,1,2,3,4,5,6,7])) my_client.barrier()
tensor_ndata = client.pull(name='ndata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
print(tensor_edata) my_client.push(name='entity_embed', id_tensor=ID[my_client.get_machine_id()], data_tensor=th.tensor([[0.,0.,0.],[0.,0.,0.]]))
client.barrier()
print(tensor_ndata) my_client.barrier()
client.barrier()
if client.get_id() == 0: if my_client.get_id() % args.num_worker == 0:
client.shut_down() res = my_client.pull(name='entity_embed', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
print(res)
if __name__ == '__main__': my_client.shut_down()
start_client()
\ No newline at end of file if __name__ == '__main__':
args = ArgParser().parse_args()
start_client(args)
\ No newline at end of file
0 127.0.0.1 50050 0 172.31.6.94 30050 2
1 127.0.0.1 50051 1 172.31.4.10 30050 2
2 127.0.0.1 50052 2 172.31.11.99 30050 2
3 127.0.0.1 50053 3 172.31.2.252 30050 2
\ No newline at end of file \ No newline at end of file
python3 client.py &
python3 client.py &
python3 client.py &
python3 client.py
\ No newline at end of file
python3 server.py --id 0 &
python3 server.py --id 1 &
python3 server.py --id 2 &
python3 server.py --id 3
\ No newline at end of file
# This is a simple MXNet server demo shows how to use DGL distributed kvstore. import os
import dgl
import argparse import argparse
import time
import dgl
from dgl.contrib import KVServer
import torch as th import torch as th
ndata_g2l = [] g2l = []
edata_g2l = [] g2l.append(th.tensor([0,1,0,0,0,0,0,0]))
g2l.append(th.tensor([0,0,0,1,0,0,0,0]))
g2l.append(th.tensor([0,0,0,0,0,1,0,0]))
g2l.append(th.tensor([0,0,0,0,0,0,0,1]))
data = []
data.append(th.tensor([[4.,4.,4.],[4.,4.,4.]]))
data.append(th.tensor([[3.,3.,3.],[3.,3.,3.]]))
data.append(th.tensor([[2.,2.,2.],[2.,2.,2.]]))
data.append(th.tensor([[1.,1.,1.],[1.,1.,1.]]))
ndata_g2l.append({'ndata':th.tensor([0,1,0,0,0,0,0,0])})
ndata_g2l.append({'ndata':th.tensor([0,0,0,1,0,0,0,0])})
ndata_g2l.append({'ndata':th.tensor([0,0,0,0,0,1,0,0])})
ndata_g2l.append({'ndata':th.tensor([0,0,0,0,0,0,0,1])})
edata_g2l.append({'edata':th.tensor([0,1,0,0,0,0,0,0])}) class ArgParser(argparse.ArgumentParser):
edata_g2l.append({'edata':th.tensor([0,0,0,1,0,0,0,0])}) def __init__(self):
edata_g2l.append({'edata':th.tensor([0,0,0,0,0,1,0,0])}) super(ArgParser, self).__init__()
edata_g2l.append({'edata':th.tensor([0,0,0,0,0,0,0,1])})
self.add_argument('--server_id', type=int, default=0,
help='Unique ID of each server.')
self.add_argument('--ip_config', type=str, default='ip_config.txt',
help='IP configuration file of kvstore.')
self.add_argument('--num_client', type=int, default=1,
help='Total number of client nodes.')
DATA = []
DATA.append(th.tensor([[4.,4.,4.,],[4.,4.,4.,]]))
DATA.append(th.tensor([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(th.tensor([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(th.tensor([[1.,1.,1.,],[1.,1.,1.,]]))
def start_server(args): def start_server(args):
"""Start kvstore service
"""
server_namebook = dgl.contrib.read_ip_config(filename=args.ip_config)
dgl.contrib.start_server( my_server = KVServer(server_id=args.server_id, server_namebook=server_namebook, num_client=args.num_client)
server_id=args.id,
ip_config='ip_config.txt',
num_client=4,
ndata={'ndata':DATA[args.id]},
edata={'edata':DATA[args.id]},
ndata_g2l=ndata_g2l[args.id],
edata_g2l=edata_g2l[args.id])
if my_server.get_id() % my_server.get_group_count() == 0: # master server
my_server.set_global2local(name='entity_embed', global2local=g2l[my_server.get_machine_id()])
my_server.init_data(name='entity_embed', data_tensor=data[my_server.get_machine_id()])
else:
time.sleep(3)
my_server.set_global2local(name='entity_embed')
my_server.init_data(name='entity_embed')
if __name__ == '__main__': my_server.print()
parser = argparse.ArgumentParser(description='kvstore')
parser.add_argument("--id", type=int, default=0, help="node ID") my_server.start()
args = parser.parse_args()
if __name__ == '__main__':
args = ArgParser().parse_args()
start_server(args) start_server(args)
\ No newline at end of file
...@@ -2,4 +2,3 @@ from . import sampling ...@@ -2,4 +2,3 @@ from . import sampling
from . import graph_store from . import graph_store
from .dis_kvstore import KVClient, KVServer from .dis_kvstore import KVClient, KVServer
from .dis_kvstore import read_ip_config from .dis_kvstore import read_ip_config
from .dis_kvstore import start_server, start_client
\ No newline at end of file
...@@ -11,179 +11,117 @@ from .. import backend as F ...@@ -11,179 +11,117 @@ from .. import backend as F
from .._ffi.ndarray import empty_shared_mem from .._ffi.ndarray import empty_shared_mem
import os import os
import time
import random
import numpy as np import numpy as np
import socket import socket
if os.name != 'nt': if os.name != 'nt':
import fcntl import fcntl
import struct import struct
GARBAGE_COLLECTION_COUNT = 2000 # Perform grabage collection when message count is large than 2000
def read_ip_config(filename): def read_ip_config(filename):
"""Read networking configuration from file. """Read network configuration information of kvstore from file.
The format of configuration file should be:
Format: [machine_id] [ip] [base_port] [server_count]
[server_id] [ip] [port] 0 172.31.40.143 30050 2
1 172.31.36.140 30050 2
2 172.31.47.147 30050 2
3 172.31.30.180 30050 2
0 172.31.40.143 50050 Note that, DGL KVStore supports multiple servers that can shared data with each other
1 172.31.36.140 50050 on the same machine via shared-tensor. So the server_count should be >= 1.
2 172.31.47.147 50050
3 172.31.30.180 50050
Parameters Parameters
---------- ----------
filename : str filename : str
name of target configure file. name of configuration file.
Returns Returns
------- -------
dict dict
server namebook, e.g., server namebook. e.g.,
{0:'172.31.40.143:50050', [server_id]:[machine_id, ip, port]
1:'172.31.36.140:50050',
2:'172.31.47.147:50050', {0:'[0, 172.31.40.143, 30050],
3:'172.31.30.180:50050'} 1:'[0, 172.31.40.143, 30051],
2:'[1, 172.31.36.140, 30050],
3:'[1, 172.31.36.140, 30051],
4:'[2, 172.31.47.147, 30050],
5:'[2, 172.31.47.147, 30051],
6:'[3, 172.31.30.180, 30050],
7:'[3, 172.31.30.180, 30051]}
""" """
assert len(filename) > 0, 'filename cannot be empty.' assert len(filename) > 0, 'filename cannot be empty.'
server_namebook = {} server_namebook = {}
try: try:
server_id = 0
lines = [line.rstrip('\n') for line in open(filename)] lines = [line.rstrip('\n') for line in open(filename)]
for line in lines: for line in lines:
ID, ip, port = line.split(' ') machine_id, ip, port, server_count = line.split(' ')
server_namebook[int(ID)] = ip+':'+port for s_count in range(int(server_count)):
server_namebook[server_id] = [int(machine_id), ip, int(port)+s_count]
server_id += 1
except: except:
print("Incorrect format IP configure file, the data format on each line should be: [server_id] [ip] [port]") print("Error: data format on each line should be: [machine_id] [ip] [base_port] [server_count]")
return server_namebook return server_namebook
def start_server(server_id, ip_config, num_client, ndata, edata, ndata_g2l=None, edata_g2l=None, msg_queue_size=2*1024*1024*1024):
"""Start a kvserver node.
This function will be blocked by server.start() api.
Parameters
----------
server_id : int
ID of current server node (start from 0)
ip_config : str
Filename of server IP configure file.
num_client : int
Total number of client nodes
ndata : dict of tensor (mx.ndarray or torch.tensor)
node data
edata : dict of tensor (mx.ndarray or torch.tensor)
edge data
ndata_g2l : dict of tensor (mx.ndarray or torch.tensor)
global2local mapping of node data
edata_g2l : dict of tensor (mx.ndarray or torch.tensor)
global2local mapping of edge data
msg_queue_size : int
Size of message queue (2GB by default)
"""
assert server_id >= 0, 'server_id (%d) cannot be a negative number.' % server_id
assert len(ip_config) > 0, 'ip_config cannot be empty.'
assert num_client > 0, 'num_client (%d) cnanot be a negative number.' % num_client
server_namebook = read_ip_config(ip_config)
server = KVServer(
server_id=server_id,
server_addr=server_namebook[server_id],
num_client=num_client,
msg_queue_size=msg_queue_size)
for name, data in ndata.items():
server.init_data(name=name, data_tensor=data)
for name, data in edata.items():
server.init_data(name=name, data_tensor=data)
if ndata_g2l is not None:
for name, data in ndata_g2l.items():
server.set_global2local(name=name, global2local=data)
if edata_g2l is not None:
for name, data in edata_g2l.items():
server.set_global2local(name=name, global2local=data)
print("start server %d on %s" % (server.get_id(), server.get_addr()))
server.start()
def start_client(ip_config, ndata_partition_book, edata_partition_book, close_shared_mem=False, msg_queue_size=2*1024*1024*1024):
"""Start a kvclient node.
Parameters
----------
ip_config : str
Filename of server IP configure file.
ndata_partition_book : dict of tensor (mx.ndarray or torch.tensor)
Data mapping of node ID to server ID
edata_partition_book : dict of tensor (mx.ndarray or torch.tensor)
Data mapping of edge ID to server ID
close_shared_mem : bool
Close local shared-memory tensor access.
msg_queue_size : int
Size of message queue (2GB by default)
Returns
-------
KVClient
client handle
"""
assert len(ip_config) > 0, 'ip_config cannot be empty.'
assert len(ndata_partition_book) > 0, 'ndata_partition_book cannot be empty.'
assert len(edata_partition_book) > 0, 'edata_partition_book cannot be empty.'
server_namebook = read_ip_config(ip_config)
client = KVClient(server_namebook=server_namebook, close_shared_mem=close_shared_mem, msg_queue_size=msg_queue_size)
for name, data in ndata_partition_book.items():
client.set_partition_book(name=name, partition_book=data)
for name, data in edata_partition_book.items():
client.set_partition_book(name=name, partition_book=data)
client.connect()
print("Client %d (%s) connected to kvstore ..." % (client.get_id(), client.get_addr()))
return client
class KVServer(object): class KVServer(object):
"""KVServer is a lightweight key-value store service for DGL distributed training. """KVServer is a lightweight key-value store service for DGL distributed training.
In practice, developers can use KVServer to hold large graph features or graph embeddings In practice, developers can use KVServer to hold large-scale graph features or
across machines in a distributed setting. User can re-wriite _push_handler and _pull_handler graph embeddings across machines in a distributed setting. Also, user can re-wriite _push_handler()
to support flexibale algorithms. and _pull_handler() API to support flexibale algorithms.
DGL kvstore supports multiple-servers on single-machine. That means we can lunach many servers on the same machine and all of
these servers will share the same shared-memory tensor.
Note that, DO NOT use KVServer in multiple threads on Python because this behavior is not defined. Note that, DO NOT use KVServer in multiple threads on Python because this behavior is not defined.
For now, KVServer can only run in CPU, and we will support GPU KVServer in the future. For now, KVServer can only run in CPU. We will support GPU KVServer in the future.
Parameters Parameters
---------- ----------
server_id : int server_id : int
ID of current kvserver node (start from 0). KVServer's ID (start from 0).
server_addr : str server_namebook: dict
IP address and port of current KVServer node, e.g., '127.0.0.1:50051'. IP address namebook of KVServer, where key is the KVServer's ID
(start from 0) and value is the server's machine_id, IP address and port, e.g.,
{0:'[0, 172.31.40.143, 30050],
1:'[0, 172.31.40.143, 30051],
2:'[1, 172.31.36.140, 30050],
3:'[1, 172.31.36.140, 30051],
4:'[2, 172.31.47.147, 30050],
5:'[2, 172.31.47.147, 30051],
6:'[3, 172.31.30.180, 30050],
7:'[3, 172.31.30.180, 30051]}
num_client : int num_client : int
Total number of clients connecting to server. Total number of client nodes.
msg_queue_size : int queue_size : int
Size of message queue (2GB by default) Sise (bytes) of kvstore message queue buffer (~20 GB on default).
Note that the 20 GB is just an upper-bound and system will not allocate 20GB memory at once.
net_type : str net_type : str
networking type, e.g., 'socket' (default) or 'mpi' (do not support yet). networking type, e.g., 'socket' (default) or 'mpi' (do not support yet).
""" """
def __init__(self, server_id, server_addr, num_client, msg_queue_size=2 * 1024 * 1024 * 1024, net_type='socket'): def __init__(self, server_id, server_namebook, num_client, queue_size=2*1024*1024*1024, net_type='socket'):
assert server_id >= 0, 'server_id (%d) cannot be a negative number.' % server_id assert server_id >= 0, 'server_id (%d) cannot be a negative number.' % server_id
assert len(server_addr.split(':')) == 2, 'Incorrect IP format: %s' % server_addr assert len(server_namebook) > 0, 'server_namebook cannot be empty.'
assert num_client >= 0, 'num_client (%d) cannot be a negative number.' % num_client assert num_client >= 0, 'num_client (%d) cannot be a negative number.' % num_client
assert queue_size > 0, 'queue_size cannot be a negative number.'
assert net_type == 'socket' or net_type == 'mpi', 'net_type (%s) can only be \'socket\' or \'mpi\'.' % net_type assert net_type == 'socket' or net_type == 'mpi', 'net_type (%s) can only be \'socket\' or \'mpi\'.' % net_type
# check if target data has been initialized # check if target data has been initialized
...@@ -192,17 +130,24 @@ class KVServer(object): ...@@ -192,17 +130,24 @@ class KVServer(object):
self._data_store = {} self._data_store = {}
# Used for barrier() API on KVClient # Used for barrier() API on KVClient
self._barrier_count = 0 self._barrier_count = 0
# Server ID starts from zero # Server information
self._server_id = server_id self._server_id = server_id
self._addr = server_addr self._server_namebook = server_namebook
# client_namebook will be received from client nodes self._machine_id = server_namebook[server_id][0]
self._ip = server_namebook[server_id][1]
self._port = server_namebook[server_id][2]
self._group_count = self._get_group_count()
# client_namebook will be sent from remote client nodes
self._client_namebook = {} self._client_namebook = {}
self._client_count = num_client self._client_count = num_client
# Create C communicator of sender and receiver # Create C communicator of sender and receiver
self._sender = _create_sender(net_type, msg_queue_size) self._sender = _create_sender(net_type, queue_size)
self._receiver = _create_receiver(net_type, msg_queue_size) self._receiver = _create_receiver(net_type, queue_size)
# A naive garbage collocetion for kvstore # A naive garbage collocetion for kvstore
self._garbage_msg = [] self._garbage_msg = []
self._open_file_list = []
# record for total message count
self._msg_count = 0
def __del__(self): def __del__(self):
...@@ -211,34 +156,47 @@ class KVServer(object): ...@@ -211,34 +156,47 @@ class KVServer(object):
# Finalize C communicator of sender and receiver # Finalize C communicator of sender and receiver
_finalize_sender(self._sender) _finalize_sender(self._sender)
_finalize_receiver(self._receiver) _finalize_receiver(self._receiver)
# Delete temp file
for file in self._open_file_list:
if(os.path.exists(file)):
os.remove(file)
def set_global2local(self, name, global2local): def set_global2local(self, name, global2local=None):
"""Set a data mapping of global ID to local ID. """Set data of global ID to local ID.
Parameters Parameters
---------- ----------
name : str name : str
data name data name
global2local : list or tensor (mx.ndarray or torch.tensor) global2local : list or tensor (mx.ndarray or torch.tensor)
A data mapping of global ID to local ID. KVStore will use global ID automatically A data mapping of global ID to local ID. KVStore will use global ID by default
if this global2local is not been set. if the global2local is not been set.
Note that, if the global2local is None KVServer will read shared-tensor.
""" """
assert len(name) > 0, 'name cannot be empty.' assert len(name) > 0, 'name cannot be empty.'
assert len(global2local) > 0, 'global2local cannot be empty.'
if global2local is not None: # Create shared-tensor
if isinstance(global2local, list): if isinstance(global2local, list):
global2local = F.tensor(global2local) global2local = F.tensor(global2local)
shared_data = empty_shared_mem(name+'-g2l-', True, global2local.shape, 'int64')
shared_data = empty_shared_mem(name+'-g2l-'+str(self._server_id), True, global2local.shape, 'int64')
dlpack = shared_data.to_dlpack() dlpack = shared_data.to_dlpack()
self._data_store[name+'-g2l-'] = F.zerocopy_from_dlpack(dlpack) self._data_store[name+'-g2l-'] = F.zerocopy_from_dlpack(dlpack)
self._data_store[name+'-g2l-'][:] = global2local[:] self._data_store[name+'-g2l-'][:] = global2local[:]
self._write_data_shape(name+'-g2l-shape', global2local)
self._open_file_list.append(name+'-g2l-shape')
else: # Read shared-tensor
data_shape = self._read_data_shape(name+'-g2l-shape')
shared_data = empty_shared_mem(name+'-g2l-', False, data_shape, 'int64')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-g2l-'] = F.zerocopy_from_dlpack(dlpack)
self._has_data.add(name+'-g2l-') self._has_data.add(name+'-g2l-')
def init_data(self, name, data_tensor): def init_data(self, name, data_tensor=None):
"""Initialize data on KVServer with data name. """Initialize data tensor on KVServe.
Parameters Parameters
---------- ----------
...@@ -246,19 +204,29 @@ class KVServer(object): ...@@ -246,19 +204,29 @@ class KVServer(object):
data name data name
data_tensor : tensor (mx.ndarray or torch.tensor) data_tensor : tensor (mx.ndarray or torch.tensor)
data tensor data tensor
Note that, if the data_tensor is None KVServer will read shared-tensor.
""" """
assert len(name) > 0, 'name cannot be empty.' assert len(name) > 0, 'name cannot be empty.'
assert len(data_tensor) > 0, 'data_tensor cannot be empty.'
shared_data = empty_shared_mem(name+'-data-'+str(self._server_id), True, data_tensor.shape, 'float32') if data_tensor is not None: # Create shared-tensor
shared_data = empty_shared_mem(name+'-data-', True, data_tensor.shape, 'float32')
dlpack = shared_data.to_dlpack() dlpack = shared_data.to_dlpack()
self._data_store[name+'-data-'] = F.zerocopy_from_dlpack(dlpack) self._data_store[name+'-data-'] = F.zerocopy_from_dlpack(dlpack)
self._data_store[name+'-data-'][:] = data_tensor[:] self._data_store[name+'-data-'][:] = data_tensor[:]
self._write_data_shape(name+'-data-shape', data_tensor)
self._open_file_list.append(name+'-data-shape')
else: # Read shared-tensor
data_shape = self._read_data_shape(name+'-data-shape')
shared_data = empty_shared_mem(name+'-data-', False, data_shape, 'float32')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-data-'] = F.zerocopy_from_dlpack(dlpack)
self._has_data.add(name+'-data-') self._has_data.add(name+'-data-')
def get_id(self): def get_id(self):
"""Get current server id. """Get current server id
Return Return
------ ------
...@@ -269,31 +237,84 @@ class KVServer(object): ...@@ -269,31 +237,84 @@ class KVServer(object):
def get_addr(self): def get_addr(self):
"""Get current server IP address """Get current server IP address and port
Return Return
------ ------
str str
IP address IP address and port
""" """
return self._addr return self._ip + ':' + str(self._port)
def get_machine_id(self):
"""Get local machine ID
Return
-------
int
machine ID
"""
return self._machine_id
def get_group_count(self):
"""Get count of server inside a machine
Return
------
int
count of server
"""
return self._group_count
def get_message_count(self):
"""Get total message count on current KVServer
Return
------
int
count of message
"""
return self._msg_count
def print(self):
"""Print server information (Used by debug)
"""
print("----------")
print("server id: %d" % self.get_id())
print("data:")
for name, data in self._data_store.items():
print(name)
print(data)
print("---------")
def start(self): def start(self):
"""Start service of KVServer """Start service of KVServer
The start() api performs the following things:
1. Get connected with all client nodes.
2. Recv client address information.
3. assign client ID to each client node.
4. send shared-tensor information to each client node.
5. Service loop for listening requests from client nodes.
""" """
# Get connected with all client nodes # Get connected with all client nodes
server_ip, server_port = self._addr.split(':') _receiver_wait(self._receiver, self._ip, self._port, self._client_count)
_receiver_wait(self._receiver, server_ip, int(server_port), self._client_count)
# recv client addr and assign ID for clients # recv client address information
addr_list = [] addr_list = []
for i in range(self._client_count): for i in range(self._client_count):
msg = _recv_kv_msg(self._receiver) msg = _recv_kv_msg(self._receiver)
assert msg.type == KVMsgType.IP_ID assert msg.type == KVMsgType.IP_ID
addr_list.append(msg.name) addr_list.append(msg.name)
self._sort_addr(addr_list) # Assign client ID to each client node
addr_list.sort()
for ID in range(len(addr_list)): for ID in range(len(addr_list)):
self._client_namebook[ID] = addr_list[ID] self._client_namebook[ID] = addr_list[ID]
...@@ -306,8 +327,7 @@ class KVServer(object): ...@@ -306,8 +327,7 @@ class KVServer(object):
_sender_connect(self._sender) _sender_connect(self._sender)
if self._server_id == 0: if self._server_id == 0:
# assign ID to client nodes for client_id in range(len(self._client_namebook)):
for client_id, addr in self._client_namebook.items():
msg = KVStoreMsg( msg = KVStoreMsg(
type=KVMsgType.IP_ID, type=KVMsgType.IP_ID,
rank=self._server_id, rank=self._server_id,
...@@ -317,14 +337,14 @@ class KVServer(object): ...@@ -317,14 +337,14 @@ class KVServer(object):
c_ptr=None) c_ptr=None)
_send_kv_msg(self._sender, msg, client_id) _send_kv_msg(self._sender, msg, client_id)
# send serilaized shared-memory tensor information to clients # Send shared-tensor information to each client node
if self._server_id == 0:
shared_tensor = '' shared_tensor = ''
for name in self._has_data: for name in self._has_data:
shared_tensor += self._serialize_shared_tensor( shared_tensor += self._serialize_shared_tensor(
name, name,
F.shape(self._data_store[name]), F.shape(self._data_store[name]),
F.dtype(self._data_store[name])) F.dtype(self._data_store[name]))
shared_tensor += '|' shared_tensor += '|'
msg = KVStoreMsg( msg = KVStoreMsg(
...@@ -338,17 +358,19 @@ class KVServer(object): ...@@ -338,17 +358,19 @@ class KVServer(object):
for client_id in range(len(self._client_namebook)): for client_id in range(len(self._client_namebook)):
_send_kv_msg(self._sender, msg, client_id) _send_kv_msg(self._sender, msg, client_id)
print('KVStore service %d start successfully! Listen for request ...' % self.get_id())
# Service loop # Service loop
while True: while True:
msg = _recv_kv_msg(self._receiver) msg = _recv_kv_msg(self._receiver)
# PUSH message # Push message
if msg.type == KVMsgType.PUSH: if msg.type == KVMsgType.PUSH:
if (msg.name+'-g2l-' in self._has_data) == True: if (msg.name+'-g2l-' in self._has_data) == True:
local_id = self._data_store[msg.name+'-g2l-'][msg.id] local_id = self._data_store[msg.name+'-g2l-'][msg.id]
else: else:
local_id = msg.id local_id = msg.id
self._push_handler(msg.name+'-data-', local_id, msg.data, self._data_store) self._push_handler(msg.name+'-data-', local_id, msg.data, self._data_store)
# PULL message # Pull message
elif msg.type == KVMsgType.PULL: elif msg.type == KVMsgType.PULL:
if (msg.name+'-g2l-' in self._has_data) == True: if (msg.name+'-g2l-' in self._has_data) == True:
local_id = self._data_store[msg.name+'-g2l-'][msg.id] local_id = self._data_store[msg.name+'-g2l-'][msg.id]
...@@ -374,26 +396,132 @@ class KVServer(object): ...@@ -374,26 +396,132 @@ class KVServer(object):
id=None, id=None,
data=None, data=None,
c_ptr=None) c_ptr=None)
for i in range(self._client_count): for client_id in range(self._client_count):
_send_kv_msg(self._sender, back_msg, i) _send_kv_msg(self._sender, back_msg, client_id)
self._barrier_count = 0 self._barrier_count = 0
# FINAL message # Final message
elif msg.type == KVMsgType.FINAL: elif msg.type == KVMsgType.FINAL:
print("Exit KVStore service, server ID: %d" % self._server_id) print("Exit KVStore service %d, solved message count: %d" % (self.get_id(), self.get_message_count()))
break # exit loop break # exit loop
else: else:
raise RuntimeError('Unknown type of kvstore message: %d' % msg.type.value) raise RuntimeError('Unknown type of kvstore message: %d' % msg.type.value)
# garbage collection
self._garbage_msg.append(msg) self._garbage_msg.append(msg)
if len(self._garbage_msg) > 1000: if len(self._garbage_msg) > GARBAGE_COLLECTION_COUNT:
_clear_kv_msg(self._garbage_msg) _clear_kv_msg(self._garbage_msg)
self._garbage_msg = [] self._garbage_msg = []
self._msg_count += 1
def _serialize_shared_tensor(self, name, shape, dtype):
"""Serialize shared tensor information.
Parameters
----------
name : str
tensor name
shape : tuple of int
tensor shape
dtype : str
data type
Returns
-------
str
serialized string
"""
assert len(name) > 0, 'data name cannot be empty.'
assert len(shape) > 0, 'data shape cannot be empty.'
str_data = name
str_data += '/'
for s in shape:
str_data += str(s)
str_data += '/'
if 'float32' in str(dtype):
str_data += 'float32'
elif 'int64' in str(dtype):
str_data += 'int64'
else:
raise RuntimeError('We can only process int64 and float32 shared-memory tensor now.')
return str_data
def _write_data_shape(self, filename, data):
    """Write the shape of ``data`` to a temp file as '|'-separated integers.

    Parameters
    ----------
    filename : str
        name of temp file.
    data : tensor (mx.ndarray or torch.tensor)
        data tensor
    """
    if os.path.exists(filename):
        os.remove(filename)
    # Build "3|4|" for a (3, 4) tensor in one pass; the trailing '|' is
    # expected by _read_data_shape(), which drops the last (empty) field.
    str_data = ''.join(str(dim) + '|' for dim in F.shape(data))
    # 'with' guarantees the handle is closed even if write() raises; the
    # original opened in append mode with a stray ';' and leaked on error.
    with open(filename, "w") as f:
        f.write(str_data)
def _read_data_shape(self, filename):
"""Read data shape from a tmp file.
Parameters
----------
filename : str
name of temp file
Return
------
tuple
data shape
"""
f = open(filename, "r")
str_data = f.read()
data_list = str_data.split('|')
data_shape = []
for i in range(len(data_list)-1):
data_shape.append(int(data_list[i]))
f.close()
return data_shape
def _get_group_count(self):
"""Get count of backup server
Return
------
int
count of backup server
"""
group_count = 0
pre_id = 0
for ID, data in self._server_namebook.items():
machine_id = data[0]
if machine_id != pre_id:
break
group_count += 1
pre_id = machine_id
return group_count
def _push_handler(self, name, ID, data, target): def _push_handler(self, name, ID, data, target):
"""Default handler for PUSH message. """Default handler for PUSH message.
On default, _push_handler perform SET operation on the target tensor. On default, _push_handler perform update operation for the tensor.
Parameters Parameters
---------- ----------
...@@ -412,7 +540,7 @@ class KVServer(object): ...@@ -412,7 +540,7 @@ class KVServer(object):
def _pull_handler(self, name, ID, target): def _pull_handler(self, name, ID, target):
"""Default handler for PULL operation. """Default handler for PULL operation.
On default, _pull_handler perform index select operation for the tensor. On default, _pull_handler perform get operation for the tensor.
Parameters Parameters
---------- ----------
...@@ -431,53 +559,9 @@ class KVServer(object): ...@@ -431,53 +559,9 @@ class KVServer(object):
return target[name][ID] return target[name][ID]
def _serialize_shared_tensor(self, name, shape, dtype):
"""Serialize shared tensor
Parameters
----------
name : str
tensor name
shape : tuple of int
tensor shape
dtype : str
data type
Returns
-------
str
serialized string
"""
str_data = name
str_data += '/'
for s in shape:
str_data += str(s)
str_data += '/'
if 'float32' in str(dtype):
str_data += 'float32'
elif 'int64' in str(dtype):
str_data += 'int64'
else:
raise RuntimeError('We can only process int64 and float32 shared-memory tensor now.')
return str_data
def _sort_addr(self, addr_list):
"""Sort client address list
Parameters
----------
addr_list : list of str
IP address list
"""
return addr_list.sort()
class KVClient(object): class KVClient(object):
"""KVClient is used to push/pull tensors to/from KVServer. If one server node and one client node """KVClient is used to push/pull tensors to/from KVServer. If the server node and client node are on the
are on the same machine, they can commuincated using shared-memory tensor (close_shared_mem=False), same machine, they can commuincate with each other using shared-memory tensor, instead of TCP/IP connections.
instead of TCP/IP connections.
Note that, DO NOT use KVClient in multiple threads on Python because this behavior is not defined. Note that, DO NOT use KVClient in multiple threads on Python because this behavior is not defined.
...@@ -487,43 +571,47 @@ class KVClient(object): ...@@ -487,43 +571,47 @@ class KVClient(object):
---------- ----------
server_namebook: dict server_namebook: dict
IP address namebook of KVServer, where key is the KVServer's ID IP address namebook of KVServer, where key is the KVServer's ID
(start from 0) and value is the server's IP address and port, e.g., (start from 0) and value is the server's machine_id, IP address and port, e.g.,
{ 0:'168.12.23.45:50051', {0:'[0, 172.31.40.143, 30050],
1:'168.12.23.21:50051', 1:'[0, 172.31.40.143, 30051],
2:'168.12.46.12:50051' } 2:'[1, 172.31.36.140, 30050],
close_shared_mem : bool 3:'[1, 172.31.36.140, 30051],
DO NOT use shared-memory access on local machine. 4:'[2, 172.31.47.147, 30050],
msg_queue_size : int 5:'[2, 172.31.47.147, 30051],
Size of message queue (2GB by default). 6:'[3, 172.31.30.180, 30050],
7:'[3, 172.31.30.180, 30051]}
queue_size : int
Sise (bytes) of kvstore message queue buffer (~20 GB on default).
net_type : str net_type : str
networking type, e.g., 'socket' (default) or 'mpi'. networking type, e.g., 'socket' (default) or 'mpi'.
""" """
def __init__(self, server_namebook, close_shared_mem=False, msg_queue_size=2 * 1024 * 1024 * 1024, net_type='socket'): def __init__(self, server_namebook, queue_size=2*1024*1024*1024, net_type='socket'):
assert len(server_namebook) > 0, 'server_namebook cannot be empty.' assert len(server_namebook) > 0, 'server_namebook cannot be empty.'
assert queue_size > 0, 'queue_size cannot be a negative number.'
assert net_type == 'socket' or net_type == 'mpi', 'net_type (%s) can only be \'socket\' or \'mpi\'.' % net_type assert net_type == 'socket' or net_type == 'mpi', 'net_type (%s) can only be \'socket\' or \'mpi\'.' % net_type
if close_shared_mem == True: # check if target data has been initialized
print("The shared-memory tensor has been closed, all data connections will go through TCP/IP network.")
# check if target data has a ID mapping for global ID to local ID
self._has_data = set() self._has_data = set()
# This is used to store local data, which can share memory with local KVServer. # This is used to store local data, which can share memory with local KVServer.
self._data_store = {} self._data_store = {}
# This is used to check if we can access server data locally
self._local_server_id = set()
# Server information # Server information
self._server_namebook = server_namebook self._server_namebook = server_namebook
self._server_count = len(server_namebook) self._server_count = len(server_namebook)
self._close_shared_mem = close_shared_mem self._group_count = self._get_group_count()
# client ID will be assign by server after connecting to server # client ID will be assign by server after connecting to server
self._client_id = -1 self._client_id = -1
# Get local machine id via server_namebook
self._machine_id = self._get_machine_id()
# create C communicator of sender and receiver # create C communicator of sender and receiver
self._sender = _create_sender(net_type, msg_queue_size) self._sender = _create_sender(net_type, queue_size)
self._receiver = _create_receiver(net_type, msg_queue_size) self._receiver = _create_receiver(net_type, queue_size)
# A naive garbage collocetion for kvstore # A naive garbage collocetion for kvstore
self._garbage_msg = [] self._garbage_msg = []
self._open_file_list = []
# Used load-balance
random.seed(time.time())
def __del__(self): def __del__(self):
...@@ -532,52 +620,69 @@ class KVClient(object): ...@@ -532,52 +620,69 @@ class KVClient(object):
# finalize C communicator of sender and receiver # finalize C communicator of sender and receiver
_finalize_sender(self._sender) _finalize_sender(self._sender)
_finalize_receiver(self._receiver) _finalize_receiver(self._receiver)
# Delete temp file
for file in self._open_file_list:
if(os.path.exists(file)):
os.remove(file)
def set_partition_book(self, name, partition_book): def set_partition_book(self, name, partition_book=None):
"""Set partition book for KVClient. """Partition book contains the mapping of global ID to machine ID.
Using partition book, client can know the corresponded server ID of each data.
Parameters Parameters
---------- ----------
name : str name : str
data name data name
partition_book : list or tensor (mx.ndarray or torch.tensor) partition_book : list or tensor (mx.ndarray or torch.tensor)
A book that maps global ID to target server ID. Mapping global ID to target machine ID.
Note that, if the partition_book is None KVClient will read shared-tensor.
""" """
assert len(name) > 0, 'name cannot be empty.' assert len(name) > 0, 'name connot be empty.'
assert len(partition_book) > 0, 'partition_book cannot be empty.'
if partition_book is not None: # Create shared-tensor
if isinstance(partition_book, list): if isinstance(partition_book, list):
self._data_store[name+'-part-'] = F.tensor(partition_book) partition_book = F.tensor(partition_book)
else: shared_data = empty_shared_mem(name+'-part-', True, partition_book.shape, 'int64')
self._data_store[name+'-part-'] = partition_book dlpack = shared_data.to_dlpack()
self._data_store[name+'-part-'] = F.zerocopy_from_dlpack(dlpack)
self._data_store[name+'-part-'][:] = partition_book[:]
self._write_data_shape(name+'-part-shape', partition_book)
self._open_file_list.append(name+'-part-shape')
else: # Read shared-tensor
data_shape = self._read_data_shape(name+'-part-shape')
shared_data = empty_shared_mem(name+'-part-', False, data_shape, 'int64')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-part-'] = F.zerocopy_from_dlpack(dlpack)
self._has_data.add(name+'-part-') self._has_data.add(name+'-part-')
def connect(self): def connect(self):
"""Connect to all the KVServer nodes """Connect to all the KVServer nodes
The connect() api performs the following things:
1. Get connected with all server nodes.
2. Send client address information to server.
3. Recv client ID from server.
4. Recv shared-tensor information from server.
""" """
# Get connected with all server nodes
for ID, addr in self._server_namebook.items(): for ID, addr in self._server_namebook.items():
server_ip, server_port = addr.split(':') server_ip = addr[1]
_add_receiver_addr(self._sender, server_ip, int(server_port), ID) server_port = addr[2]
_add_receiver_addr(self._sender, server_ip, server_port, ID)
_sender_connect(self._sender) _sender_connect(self._sender)
self._addr = self._get_local_addr() # Send client address to server nodes
self._addr = self._get_local_usable_addr()
client_ip, client_port = self._addr.split(':') client_ip, client_port = self._addr.split(':')
# find local server nodes
for ID, addr in self._server_namebook.items():
server_ip, server_port = addr.split(':')
if server_ip in self._ip4_addr_list():
self._local_server_id.add(ID)
# send addr to server nodes
msg = KVStoreMsg( msg = KVStoreMsg(
type=KVMsgType.IP_ID, type=KVMsgType.IP_ID,
rank=0, rank=0, # a tmp client ID
name=self._addr, name=self._addr,
id=None, id=None,
data=None, data=None,
...@@ -588,28 +693,73 @@ class KVClient(object): ...@@ -588,28 +693,73 @@ class KVClient(object):
_receiver_wait(self._receiver, client_ip, int(client_port), self._server_count) _receiver_wait(self._receiver, client_ip, int(client_port), self._server_count)
# recv client id # Recv client ID from server
msg = _recv_kv_msg(self._receiver) msg = _recv_kv_msg(self._receiver)
assert msg.rank == 0 assert msg.rank == 0
self._client_id = int(msg.name) self._client_id = int(msg.name)
# recv name of shared tensor from server 0 # Recv shared-tensor information from server
msg = _recv_kv_msg(self._receiver) msg = _recv_kv_msg(self._receiver)
assert msg.rank == 0 assert msg.rank == 0
data_str = msg.name.split('|') data_str = msg.name.split('|')
# open shared tensor on local machine
for data in data_str: for data in data_str:
if data != '' and self._close_shared_mem == False: if data != '':
tensor_name, shape, dtype = self._deserialize_shared_tensor(data) tensor_name, shape, dtype = self._deserialize_shared_tensor(data)
for server_id in self._local_server_id: shared_data = empty_shared_mem(tensor_name, False, shape, dtype)
shared_data = empty_shared_mem(tensor_name+str(server_id), False, shape, dtype)
dlpack = shared_data.to_dlpack() dlpack = shared_data.to_dlpack()
self._data_store[tensor_name+str(server_id)] = F.zerocopy_from_dlpack(dlpack) self._data_store[tensor_name] = F.zerocopy_from_dlpack(dlpack)
self._has_data.add(tensor_name+str(server_id)) self._has_data.add(tensor_name)
print("KVClient %d connect to kvstore successfully!" % self.get_id())
def print(self):
    """Dump client state to stdout (debugging helper)."""
    print("----------")
    print("client id: %d" % self.get_id())
    print("data:")
    # Dump every stored tensor, one name/value pair per entry.
    for key in self._data_store:
        print(key)
        print(self._data_store[key])
    print("----------")
def get_id(self):
    """Return the client ID assigned by the server at connect time.

    Return
    ------
    int
        KVClient ID
    """
    return self._client_id
def get_addr(self):
    """Return this client's IP address string.

    Return
    ------
    str
        IP address
    """
    return self._addr
def get_machine_id(self):
    """Return the ID of the machine hosting this client.

    Return
    -------
    int
        machine ID
    """
    return self._machine_id
def push(self, name, id_tensor, data_tensor): def push(self, name, id_tensor, data_tensor):
"""Push message to KVServer. """Push data to KVServer.
Note that push() is an async operation that will return immediately after calling. Note that push() is an async operation that will return immediately after calling.
...@@ -626,29 +776,30 @@ class KVClient(object): ...@@ -626,29 +776,30 @@ class KVClient(object):
assert F.ndim(id_tensor) == 1, 'ID must be a vector.' assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
assert F.shape(id_tensor)[0] == F.shape(data_tensor)[0], 'The data must has the same row size with ID.' assert F.shape(id_tensor)[0] == F.shape(data_tensor)[0], 'The data must has the same row size with ID.'
# partition data (we can move this part of code into C-api if needed) # partition data
server_id = self._data_store[name+'-part-'][id_tensor] machine_id = self._data_store[name+'-part-'][id_tensor]
# sort index by server id # sort index by machine id
sorted_id = F.tensor(np.argsort(F.asnumpy(server_id))) sorted_id = F.tensor(np.argsort(F.asnumpy(machine_id)))
id_tensor = id_tensor[sorted_id] id_tensor = id_tensor[sorted_id]
data_tensor = data_tensor[sorted_id] data_tensor = data_tensor[sorted_id]
server, count = np.unique(F.asnumpy(server_id), return_counts=True) machine, count = np.unique(F.asnumpy(machine_id), return_counts=True)
# push data to server by order # push data to server by order
start = 0 start = 0
for idx in range(len(server)): local_id = None
local_data = None
for idx in range(len(machine)):
end = start + count[idx] end = start + count[idx]
if start == end: # don't have any data for target server if start == end: # No data for target machine
continue continue
partial_id = id_tensor[start:end] partial_id = id_tensor[start:end]
partial_data = data_tensor[start:end] partial_data = data_tensor[start:end]
if machine[idx] == self._machine_id: # local push
if server[idx] in self._local_server_id and self._close_shared_mem == False: if (name+'-g2l-' in self._has_data) == True:
if (name+'-g2l-'+str(server[idx]) in self._has_data) == True: local_id = self._data_store[name+'-g2l-'][partial_id]
local_id = self._data_store[name+'-g2l-'+str(server[idx])][partial_id]
else: else:
local_id = partial_id local_id = partial_id
self._push_handler(name+'-data-'+str(server[idx]), local_id, data_tensor, self._data_store) local_data = partial_data
else: else: # push data to remote server
msg = KVStoreMsg( msg = KVStoreMsg(
type=KVMsgType.PUSH, type=KVMsgType.PUSH,
rank=self._client_id, rank=self._client_id,
...@@ -656,10 +807,15 @@ class KVClient(object): ...@@ -656,10 +807,15 @@ class KVClient(object):
id=partial_id, id=partial_id,
data=partial_data, data=partial_data,
c_ptr=None) c_ptr=None)
_send_kv_msg(self._sender, msg, server[idx]) # randomly select a server node in target machine for load-balance
s_id = random.randint(machine[idx]*self._group_count, (machine[idx]+1)*self._group_count-1)
_send_kv_msg(self._sender, msg, s_id)
start += count[idx] start += count[idx]
if local_id is not None:
self._push_handler(name+'-data-', local_id, local_data, self._data_store)
def pull(self, name, id_tensor): def pull(self, name, id_tensor):
"""Pull message from KVServer. """Pull message from KVServer.
...@@ -679,35 +835,33 @@ class KVClient(object): ...@@ -679,35 +835,33 @@ class KVClient(object):
assert len(name) > 0, 'name cannot be empty.' assert len(name) > 0, 'name cannot be empty.'
assert F.ndim(id_tensor) == 1, 'ID must be a vector.' assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
if len(self._garbage_msg) > 1000: if len(self._garbage_msg) > GARBAGE_COLLECTION_COUNT:
_clear_kv_msg(self._garbage_msg) _clear_kv_msg(self._garbage_msg)
self._garbage_msg = [] self._garbage_msg = []
# partition data (we can move this part of code into C-api if needed) # partition data
server_id = self._data_store[name+'-part-'][id_tensor] machine_id = self._data_store[name+'-part-'][id_tensor]
# sort index by server id # sort index by machine id
sorted_id = np.argsort(F.asnumpy(server_id)) sorted_id = F.tensor(np.argsort(F.asnumpy(machine_id)))
# we need return data with original order of ID back_sorted_id = F.tensor(np.argsort(F.asnumpy(sorted_id)))
back_sorted_id = F.tensor(np.argsort(sorted_id)) id_tensor = id_tensor[sorted_id]
id_tensor = id_tensor[F.tensor(sorted_id)] machine, count = np.unique(F.asnumpy(machine_id), return_counts=True)
server, count = np.unique(F.asnumpy(server_id), return_counts=True) # pull data from server by order
# pull data from server by server order
start = 0 start = 0
pull_count = 0 pull_count = 0
local_data = {} local_id = None
for idx in range(len(server)):
for idx in range(len(machine)):
end = start + count[idx] end = start + count[idx]
if start == end: # don't have any data in target server if start == end: # No data for target machine
continue continue
partial_id = id_tensor[start:end] partial_id = id_tensor[start:end]
if machine[idx] == self._machine_id: # local pull
if server[idx] in self._local_server_id and self._close_shared_mem == False: if (name+'-g2l-' in self._has_data) == True:
if (name+'-g2l-'+str(server[idx]) in self._has_data) == True: local_id = self._data_store[name+'-g2l-'][partial_id]
local_id = self._data_store[name+'-g2l-'+str(server[idx])][partial_id]
else: else:
local_id = partial_id local_id = partial_id
local_data[server[idx]] = self._pull_handler(name+'-data-'+str(server[idx]), local_id, self._data_store) else: # pull data from remote server
else:
msg = KVStoreMsg( msg = KVStoreMsg(
type=KVMsgType.PULL, type=KVMsgType.PULL,
rank=self._client_id, rank=self._client_id,
...@@ -715,19 +869,23 @@ class KVClient(object): ...@@ -715,19 +869,23 @@ class KVClient(object):
id=partial_id, id=partial_id,
data=None, data=None,
c_ptr=None) c_ptr=None)
_send_kv_msg(self._sender, msg, server[idx]) s_id = random.randint(machine[idx]*self._group_count, (machine[idx]+1)*self._group_count-1)
_send_kv_msg(self._sender, msg, s_id)
pull_count += 1 pull_count += 1
start += count[idx] start += count[idx]
msg_list = [] msg_list = []
for server_id, data in local_data.items():
if local_id is not None:
local_data = self._pull_handler(name+'-data-', local_id, self._data_store)
s_id = random.randint(self._machine_id*self._group_count, (self._machine_id+1)*self._group_count-1)
local_msg = KVStoreMsg( local_msg = KVStoreMsg(
type=KVMsgType.PULL_BACK, type=KVMsgType.PULL_BACK,
rank=server_id, rank=s_id,
name=name, name=name,
id=None, id=None,
data=data, data=local_data,
c_ptr=None) c_ptr=None)
msg_list.append(local_msg) msg_list.append(local_msg)
self._garbage_msg.append(local_msg) self._garbage_msg.append(local_msg)
...@@ -782,29 +940,7 @@ class KVClient(object): ...@@ -782,29 +940,7 @@ class KVClient(object):
_send_kv_msg(self._sender, msg, server_id) _send_kv_msg(self._sender, msg, server_id)
def get_id(self): def _get_local_usable_addr(self):
"""Get client id
Return
------
int
KVClient ID
"""
return self._client_id
def get_addr(self):
"""Get client IP address
Return
------
str
IP address
"""
return self._addr
def _get_local_addr(self):
"""Get local available IP and port """Get local available IP and port
Return Return
...@@ -831,41 +967,148 @@ class KVClient(object): ...@@ -831,41 +967,148 @@ class KVClient(object):
return IP + ':' + str(port) return IP + ':' + str(port)
def _get_ip_address(self, NICname): def _get_group_count(self):
"""Return IP by given a NIC name """Get count of backup server
Return
------
int
count of backup server
""" """
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) group_count = 0
return socket.inet_ntoa(fcntl.ioctl( pre_id = 0
s.fileno(), for ID, data in self._server_namebook.items():
0x8915, # SIOCGIFADDR machine_id = data[0]
struct.pack('256s', NICname[:15].encode("UTF-8")) if machine_id != pre_id:
)[20:24]) break
group_count += 1
pre_id = machine_id
return group_count
def _get_machine_id(self):
"""Get local machine ID from server_namebook
Return
------
int
local machine ID
"""
res = 0
for ID, data in self._server_namebook.items():
machine_id = data[0]
ip = data[1]
if ip in self._local_ip4_addr_list():
res = machine_id
break
return res
def _ip4_addr_list(self):
def _local_ip4_addr_list(self):
"""Return a set of IPv4 address """Return a set of IPv4 address
""" """
nic = set() nic = set()
for ix in socket.if_nameindex(): for ix in socket.if_nameindex():
name = ix[1] name = ix[1]
ip = self._get_ip_address(name) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ip = socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', name[:15].encode("UTF-8")))[20:24])
nic.add(ip) nic.add(ip)
return nic return nic
def _deserialize_shared_tensor(self, data):
"""Deserialize shared tensor information sent from server
Parameters
----------
data : str
serialized string
Returns
-------
str
tensor name
tuple of int
tensor shape
str
data type
"""
data_list = data.split('/')
tensor_name = data_list[0]
data_type = data_list[-1]
tensor_shape = []
for i in range(1, len(data_list)-1):
tensor_shape.append(int(data_list[i]))
tensor_shape = tuple(tensor_shape)
return tensor_name, tensor_shape, data_type
def _write_data_shape(self, filename, data):
    """Write the shape of ``data`` to a temp file as '|'-separated integers.

    Parameters
    ----------
    filename : str
        name of temp file.
    data : tensor (mx.ndarray or torch.tensor)
        data tensor
    """
    if os.path.exists(filename):
        os.remove(filename)
    # Build "3|4|" for a (3, 4) tensor in one pass; the trailing '|' is
    # expected by _read_data_shape(), which drops the last (empty) field.
    str_data = ''.join(str(dim) + '|' for dim in F.shape(data))
    # 'with' guarantees the handle is closed even if write() raises; the
    # original opened in append mode with a stray ';' and leaked on error.
    with open(filename, "w") as f:
        f.write(str_data)
def _read_data_shape(self, filename):
"""Read data shape from a tmp file.
Parameters
----------
filename : str
name of temp file
Return
------
tuple
data shape
"""
f = open(filename, "r")
str_data = f.read()
data_list = str_data.split('|')
data_shape = []
for i in range(len(data_list)-1):
data_shape.append(int(data_list[i]))
f.close()
return data_shape
def _takeId(self, elem): def _takeId(self, elem):
"""Used by sort """Used by sort message list
""" """
return elem.rank return elem.rank
def _push_handler(self, name, ID, data, target): def _push_handler(self, name, ID, data, target):
"""Default handler for local PUSH message. """Default handler for PUSH message.
On default, _push_handler perform SET operation for the tensor. On default, _push_handler perform update operation for the tensor.
Parameters Parameters
---------- ----------
...@@ -875,58 +1118,30 @@ class KVClient(object): ...@@ -875,58 +1118,30 @@ class KVClient(object):
a vector storing the ID list. a vector storing the ID list.
data : tensor (mx.ndarray or torch.tensor) data : tensor (mx.ndarray or torch.tensor)
a tensor with the same row size of id a tensor with the same row size of id
target : tensor (mx.ndarray or torch.tensor) target : dict of data
the target tensor self._data_store
""" """
target[name][ID] = data target[name][ID] = data
def _pull_handler(self, name, ID, target): def _pull_handler(self, name, ID, target):
"""Default handler for local PULL operation. """Default handler for PULL operation.
On default, _pull_handler perform index select operation for the tensor. On default, _pull_handler perform get operation for the tensor.
Parameters Parameters
---------- ----------
name : str name : str
data name data name
ID : tensor (mx.ndarray or torch.tensor) ID : tensor (mx.ndarray or torch.tensor)
a vector storing the IDs that has been re-mapped to local id. a vector storing the ID list.
target : tensor (mx.ndarray or torch.tensor) target : dict of data
the target tensor self._data_store
Return Return
------ ------
tensor tensor
a tensor with the same row size of ID a tensor with the same row size of ID.
""" """
return target[name][ID] return target[name][ID]
\ No newline at end of file
def _deserialize_shared_tensor(self, data):
"""Deserialize shared tensor information sent from server
Parameters
----------
data : str
serialized string
Returns
-------
str
tensor name
tuple of int
tensor shape
str
data type
"""
data_list = data.split('/')
tensor_name = data_list[0]
data_type = data_list[-1]
tensor_shape = []
for i in range(1, len(data_list)-1):
tensor_shape.append(int(data_list[i]))
tensor_shape = tuple(tensor_shape)
return tensor_name, tensor_shape, data_type
...@@ -23,6 +23,7 @@ def _network_wait(): ...@@ -23,6 +23,7 @@ def _network_wait():
""" """
time.sleep(_WAIT_TIME_SEC) time.sleep(_WAIT_TIME_SEC)
def _create_sender(net_type, msg_queue_size=2*1024*1024*1024): def _create_sender(net_type, msg_queue_size=2*1024*1024*1024):
"""Create a Sender communicator via C api """Create a Sender communicator via C api
...@@ -36,6 +37,7 @@ def _create_sender(net_type, msg_queue_size=2*1024*1024*1024): ...@@ -36,6 +37,7 @@ def _create_sender(net_type, msg_queue_size=2*1024*1024*1024):
assert net_type in ('socket', 'mpi'), 'Unknown network type.' assert net_type in ('socket', 'mpi'), 'Unknown network type.'
return _CAPI_DGLSenderCreate(net_type, msg_queue_size) return _CAPI_DGLSenderCreate(net_type, msg_queue_size)
def _create_receiver(net_type, msg_queue_size=2*1024*1024*1024): def _create_receiver(net_type, msg_queue_size=2*1024*1024*1024):
"""Create a Receiver communicator via C api """Create a Receiver communicator via C api
...@@ -49,6 +51,7 @@ def _create_receiver(net_type, msg_queue_size=2*1024*1024*1024): ...@@ -49,6 +51,7 @@ def _create_receiver(net_type, msg_queue_size=2*1024*1024*1024):
assert net_type in ('socket', 'mpi'), 'Unknown network type.' assert net_type in ('socket', 'mpi'), 'Unknown network type.'
return _CAPI_DGLReceiverCreate(net_type, msg_queue_size) return _CAPI_DGLReceiverCreate(net_type, msg_queue_size)
def _finalize_sender(sender): def _finalize_sender(sender):
"""Finalize Sender communicator """Finalize Sender communicator
...@@ -59,11 +62,13 @@ def _finalize_sender(sender): ...@@ -59,11 +62,13 @@ def _finalize_sender(sender):
""" """
_CAPI_DGLFinalizeSender(sender) _CAPI_DGLFinalizeSender(sender)
def _finalize_receiver(receiver): def _finalize_receiver(receiver):
"""Finalize Receiver Communicator """Finalize Receiver Communicator
""" """
_CAPI_DGLFinalizeReceiver(receiver) _CAPI_DGLFinalizeReceiver(receiver)
def _add_receiver_addr(sender, ip_addr, port, recv_id): def _add_receiver_addr(sender, ip_addr, port, recv_id):
"""Add Receiver IP address to namebook """Add Receiver IP address to namebook
...@@ -81,6 +86,7 @@ def _add_receiver_addr(sender, ip_addr, port, recv_id): ...@@ -81,6 +86,7 @@ def _add_receiver_addr(sender, ip_addr, port, recv_id):
assert recv_id >= 0, 'recv_id cannot be a negative number.' assert recv_id >= 0, 'recv_id cannot be a negative number.'
_CAPI_DGLSenderAddReceiver(sender, ip_addr, int(port), int(recv_id)) _CAPI_DGLSenderAddReceiver(sender, ip_addr, int(port), int(recv_id))
def _sender_connect(sender): def _sender_connect(sender):
"""Connect to all the Receiver """Connect to all the Receiver
...@@ -91,8 +97,9 @@ def _sender_connect(sender): ...@@ -91,8 +97,9 @@ def _sender_connect(sender):
""" """
_CAPI_DGLSenderConnect(sender) _CAPI_DGLSenderConnect(sender)
def _receiver_wait(receiver, ip_addr, port, num_sender): def _receiver_wait(receiver, ip_addr, port, num_sender):
"""Wait all Sender to connect.. """Wait all Sender to connect.
Parameters Parameters
---------- ----------
...@@ -186,7 +193,8 @@ class KVMsgType(Enum): ...@@ -186,7 +193,8 @@ class KVMsgType(Enum):
BARRIER = 6 BARRIER = 6
IP_ID = 7 IP_ID = 7
KVStoreMsg = namedtuple("KVStoreMsg", "type rank name id data, c_ptr")
KVStoreMsg = namedtuple("KVStoreMsg", "type rank name id data c_ptr")
"""Message of DGL kvstore """Message of DGL kvstore
Data Field Data Field
...@@ -201,6 +209,8 @@ id : tensor (mx.ndarray or torch.tensor) ...@@ -201,6 +209,8 @@ id : tensor (mx.ndarray or torch.tensor)
data vector storing the global IDs data vector storing the global IDs
data : tensor (mx.ndarray or torch.tensor) data : tensor (mx.ndarray or torch.tensor)
data matrix with the same row size of id data matrix with the same row size of id
c_ptr : void*
c pointer of message
""" """
def _send_kv_msg(sender, msg, recv_id): def _send_kv_msg(sender, msg, recv_id):
...@@ -249,6 +259,7 @@ def _send_kv_msg(sender, msg, recv_id): ...@@ -249,6 +259,7 @@ def _send_kv_msg(sender, msg, recv_id):
tensor_id, tensor_id,
data) data)
def _recv_kv_msg(receiver): def _recv_kv_msg(receiver):
"""Receive kvstore message. """Receive kvstore message.
...@@ -256,7 +267,6 @@ def _recv_kv_msg(receiver): ...@@ -256,7 +267,6 @@ def _recv_kv_msg(receiver):
---------- ----------
receiver : ctypes.c_void_p receiver : ctypes.c_void_p
C Receiver handle C Receiver handle
Return Return
------ ------
KVStoreMsg KVStoreMsg
...@@ -319,4 +329,3 @@ def _clear_kv_msg(garbage_msg): ...@@ -319,4 +329,3 @@ def _clear_kv_msg(garbage_msg):
if msg.c_ptr is not None: if msg.c_ptr is not None:
_CAPI_DeleteKVMsg(msg.c_ptr) _CAPI_DeleteKVMsg(msg.c_ptr)
garbage_msg = [] garbage_msg = []
\ No newline at end of file
...@@ -24,7 +24,6 @@ using namespace dgl::runtime; ...@@ -24,7 +24,6 @@ using namespace dgl::runtime;
namespace dgl { namespace dgl {
namespace network { namespace network {
static void NaiveDeleter(DLManagedTensor* managed_tensor) { static void NaiveDeleter(DLManagedTensor* managed_tensor) {
delete [] managed_tensor->dl_tensor.shape; delete [] managed_tensor->dl_tensor.shape;
delete [] managed_tensor->dl_tensor.strides; delete [] managed_tensor->dl_tensor.strides;
...@@ -607,7 +606,5 @@ DGL_REGISTER_GLOBAL("network._CAPI_DeleteKVMsg") ...@@ -607,7 +606,5 @@ DGL_REGISTER_GLOBAL("network._CAPI_DeleteKVMsg")
delete msg; delete msg;
}); });
} // namespace network } // namespace network
} // namespace dgl } // namespace dgl
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment