import os
import time
import numpy as np
import socket
from scipy import sparse as spsp
import dgl
import backend as F
import unittest, pytest
from dgl.graph_index import create_graph_index
import multiprocessing as mp
from numpy.testing import assert_array_equal

if os.name != 'nt':
    import fcntl
    import struct

def get_local_usable_addr():
    """Get local usable IP and port

    Returns
    -------
    str
        IP address and port separated by a space, e.g., '192.168.8.12 50051'
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        sock.connect(('10.255.255.255', 1))
        ip_addr = sock.getsockname()[0]
    except ValueError:
        ip_addr = '127.0.0.1'
    finally:
        sock.close()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", 0))
    sock.listen(1)
    port = sock.getsockname()[1]
    sock.close()

    return ip_addr + ' ' + str(port)

def create_random_graph(n):
    arr = (spsp.random(n, n, density=0.001, format='coo') != 0).astype(np.int64)
    ig = create_graph_index(arr, readonly=True)
    return dgl.DGLGraph(ig)

# Create a one-part graph
node_map = F.tensor([0,0,0,0,0,0], F.int64)
edge_map = F.tensor([0,0,0,0,0,0,0], F.int64)
global_nid = F.tensor([0,1,2,3,4,5], F.int64)
global_eid = F.tensor([0,1,2,3,4,5,6], F.int64)

g = dgl.DGLGraph()
g.add_nodes(6)
g.add_edge(0, 1) # 0
g.add_edge(0, 2) # 1
g.add_edge(0, 3) # 2
g.add_edge(2, 3) # 3
g.add_edge(1, 1) # 4
g.add_edge(0, 4) # 5
g.add_edge(2, 5) # 6
g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

gpb = dgl.distributed.GraphPartitionBook(part_id=0,
                                         num_parts=1,
                                         node_map=node_map,
                                         edge_map=edge_map,
                                         part_graph=g)

node_policy = dgl.distributed.PartitionPolicy(policy_str='node',
                                              partition_book=gpb)

edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge',
                                              partition_book=gpb)

data_0 = F.tensor([[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.],[1.,1.]], F.float32)
data_0_1 = F.tensor([1.,2.,3.,4.,5.,6.], F.float32)
data_0_2 = F.tensor([1,2,3,4,5,6], F.int32)
data_0_3 = F.tensor([1,2,3,4,5,6], F.int64)
data_1 = F.tensor([[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.],[2.,2.]], F.float32)
data_2 = F.tensor([[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.],[0.,0.]], F.float32)

def init_zero_func(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

def udf_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] = data_tensor * data_tensor

def add_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] += data_tensor

@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow',
                 reason='Do not support windows and TF yet')
def test_partition_policy():
    assert node_policy.policy_str == 'node'
    assert edge_policy.policy_str == 'edge'
    assert node_policy.part_id == 0
    assert edge_policy.part_id == 0
    local_nid = node_policy.to_local(F.tensor([0,1,2,3,4,5]))
    local_eid = edge_policy.to_local(F.tensor([0,1,2,3,4,5,6]))
    assert_array_equal(F.asnumpy(local_nid), F.asnumpy(F.tensor([0,1,2,3,4,5], F.int64)))
    assert_array_equal(F.asnumpy(local_eid), F.asnumpy(F.tensor([0,1,2,3,4,5,6], F.int64)))
    nid_partid = node_policy.to_partid(F.tensor([0,1,2,3,4,5], F.int64))
    eid_partid = edge_policy.to_partid(F.tensor([0,1,2,3,4,5,6], F.int64))
    assert_array_equal(F.asnumpy(nid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0], F.int64)))
    assert_array_equal(F.asnumpy(eid_partid), F.asnumpy(F.tensor([0,0,0,0,0,0,0], F.int64)))
    assert node_policy.get_data_size() == len(node_map)
    assert edge_policy.get_data_size() == len(edge_map)

def start_server(server_id, num_clients):
    # Init kvserver
    kvserver = dgl.distributed.KVServer(server_id=server_id,
                                        ip_config='kv_ip_config.txt',
                                        num_clients=num_clients)
    kvserver.add_part_policy(node_policy)
    kvserver.add_part_policy(edge_policy)
    if kvserver.is_backup_server():
        kvserver.init_data('data_0', 'node')
        kvserver.init_data('data_0_1', 'node')
        kvserver.init_data('data_0_2', 'node')
        kvserver.init_data('data_0_3', 'node')
    else:
        kvserver.init_data('data_0', 'node', data_0)
        kvserver.init_data('data_0_1', 'node', data_0_1)
        kvserver.init_data('data_0_2', 'node', data_0_2)
        kvserver.init_data('data_0_3', 'node', data_0_3)
    # start server
    server_state = dgl.distributed.ServerState(kv_store=kvserver,
                                               local_g=None,
                                               partition_book=None)
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config='kv_ip_config.txt',
                                 num_clients=num_clients,
                                 server_state=server_state)

def start_client(num_clients):
    # Note: connect to server first!
    dgl.distributed.connect_to_server(ip_config='kv_ip_config.txt')
    # Init kvclient
    kvclient = dgl.distributed.KVClient(ip_config='kv_ip_config.txt')
    assert dgl.distributed.get_num_client() == num_clients
    kvclient.init_data(name='data_1',
                       shape=F.shape(data_1),
                       dtype=F.dtype(data_1),
                       part_policy=edge_policy,
                       init_func=init_zero_func)
    kvclient.init_data(name='data_2',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)
    kvclient.map_shared_data(partition_book=gpb)
    # Test data_name_list
    name_list = kvclient.data_name_list()
    print(name_list)
    assert 'data_0' in name_list
    assert 'data_0_1' in name_list
    assert 'data_0_2' in name_list
    assert 'data_0_3' in name_list
    assert 'data_1' in name_list
    assert 'data_2' in name_list
    # Test get_meta_data
    meta = kvclient.get_data_meta('data_0')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0)
    assert shape == F.shape(data_0)
    assert policy.policy_str == 'node'
    meta = kvclient.get_data_meta('data_0_1')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_1)
    assert shape == F.shape(data_0_1)
    assert policy.policy_str == 'node'
    meta = kvclient.get_data_meta('data_0_2')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_2)
    assert shape == F.shape(data_0_2)
    assert policy.policy_str == 'node'
    meta = kvclient.get_data_meta('data_0_3')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0_3)
    assert shape == F.shape(data_0_3)
    assert policy.policy_str == 'node'
    meta = kvclient.get_data_meta('data_1')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_1)
    assert shape == F.shape(data_1)
    assert policy.policy_str == 'edge'
    meta = kvclient.get_data_meta('data_2')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_2)
    assert shape == F.shape(data_2)
    assert policy.policy_str == 'node'
    # Test push and pull
    id_tensor = F.tensor([0,2,4], F.int64)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_2',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    res = kvclient.pull(name='data_0', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_1', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_2', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Register new push handler
    kvclient.register_push_handler('data_0', udf_push)
    kvclient.register_push_handler('data_1', udf_push)
    kvclient.register_push_handler('data_2', udf_push)
    # Test push and pull
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_2',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    data_tensor = data_tensor * data_tensor
    res = kvclient.pull(name='data_0', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_1', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    res = kvclient.pull(name='data_2', id_tensor=id_tensor)
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # Init a new tensor and register an additive push handler for it
    kvclient.init_data(name='data_3',
                       shape=F.shape(data_2),
                       dtype=F.dtype(data_2),
                       part_policy=node_policy,
                       init_func=init_zero_func)
    kvclient.register_push_handler('data_3', add_push)
    kvclient.map_shared_data(partition_book=gpb)
    data_tensor = F.tensor([[6.,6.],[6.,6.],[6.,6.]], F.float32)
    # stagger the clients before pushing
    time.sleep(kvclient.client_id + 1)
    print("add...")
    kvclient.push(name='data_3',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.barrier()
    res = kvclient.pull(name='data_3', id_tensor=id_tensor)
    # every client pushed data_tensor once and add_push accumulates the values
    data_tensor = data_tensor * num_clients
    assert_array_equal(F.asnumpy(res), F.asnumpy(data_tensor))
    # clean up
    kvclient.barrier()
    dgl.distributed.shutdown_servers()
    dgl.distributed.finalize_client()

@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow',
                 reason='Do not support windows and TF yet')
def test_kv_store():
    ip_config = open("kv_ip_config.txt", "w")
    num_servers = 2
    num_clients = 2
    ip_addr = get_local_usable_addr()
    ip_config.write('{} {}\n'.format(ip_addr, num_servers))
    ip_config.close()
    ctx = mp.get_context('spawn')
    pserver_list = []
    pclient_list = []
    for i in range(num_servers):
        pserver = ctx.Process(target=start_server, args=(i, num_clients))
        pserver.start()
        pserver_list.append(pserver)
    # wait for the server processes to start
    time.sleep(2)
    for i in range(num_clients):
        pclient = ctx.Process(target=start_client, args=(num_clients,))
        pclient.start()
        pclient_list.append(pclient)
    for i in range(num_clients):
        pclient_list[i].join()
    for i in range(num_servers):
        pserver_list[i].join()

if __name__ == '__main__':
    test_partition_policy()
    test_kv_store()