Unverified commit e3b6ac8e, authored by Serge Panev, committed by GitHub

[Dist][Test] Add tests for multi-node DistTensor (#4122)


Signed-off-by: Serge Panev <spanev@nvidia.com>
Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
parent 28b09047
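
The new test is configured entirely through DIST_DGL_TEST_* environment variables and launches its worker processes over ssh. As a minimal sketch (not part of the commit), assuming the repository is checked out at the same path on every machine and that ip_config.txt lists one host per line, the pytest driver could be invoked like this:

# Hypothetical invocation; the variable names come from the test files below,
# while the paths and values are only examples.
import os
import subprocess

env = dict(os.environ,
           DIST_DGL_TEST_IP_CONFIG='ip_config.txt',
           DIST_DGL_TEST_WORKSPACE='/shared_workspace/dgl_dist_tensor_test/',
           DIST_DGL_TEST_PY_BIN_DIR='tests/dist')
subprocess.run(['python3', '-m', 'pytest', '-v', 'tests/dist/test_dist_objects.py'],
               env=env, check=True)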
.gitignore
@@ -28,6 +28,9 @@ wheels/
*.egg
MANIFEST
# Whitelist some distribution / package non-related directories
!tests/dist
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
...
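
tests/dist/run_dist_objects.py

# Launched on every machine by the pytest driver below, either as a graph server
# (DIST_DGL_TEST_MODE=server) or as a client that exercises DistTensor
# (DIST_DGL_TEST_MODE=client). All configuration is passed via environment variables.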
import dgl
import torch
import os
import numpy as np
import dgl.backend as F
from dgl.distributed import load_partition_book
import time
mode = os.environ.get('DIST_DGL_TEST_MODE', "")
graph_name = os.environ.get('DIST_DGL_TEST_GRAPH_NAME', 'random_test_graph')
num_part = int(os.environ.get('DIST_DGL_TEST_NUM_PART'))
num_servers_per_machine = int(os.environ.get('DIST_DGL_TEST_NUM_SERVER'))
num_client_per_machine = int(os.environ.get('DIST_DGL_TEST_NUM_CLIENT'))
shared_workspace = os.environ.get('DIST_DGL_TEST_WORKSPACE')
graph_path = os.environ.get('DIST_DGL_TEST_GRAPH_PATH')
part_id = int(os.environ.get('DIST_DGL_TEST_PART_ID'))
net_type = os.environ.get('DIST_DGL_TEST_NET_TYPE')
ip_config = os.environ.get('DIST_DGL_TEST_IP_CONFIG', 'ip_config.txt')
os.environ['DGL_DIST_MODE'] = 'distributed'


def zeros_init(shape, dtype):
    return F.zeros(shape, dtype=dtype, ctx=F.cpu())


def run_server(graph_name, server_id, server_count, num_clients, shared_mem, keep_alive=False):
    # server_count = num_servers_per_machine
    g = dgl.distributed.DistGraphServer(server_id, ip_config,
                                        server_count, num_clients,
                                        graph_path + '/{}.json'.format(graph_name),
                                        disable_shared_mem=not shared_mem,
                                        graph_format=['csc', 'coo'], keep_alive=keep_alive,
                                        net_type=net_type)
    print('start server', server_id)
    g.start()
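

# Clients on partition 0 each write a distinct slice of the tensor; after a
# barrier, clients on the other partitions read those slices back and verify them.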
def dist_tensor_test_sanity(data_shape, rank, name=None):
    dist_ten = dgl.distributed.DistTensor(data_shape,
                                          F.int32,
                                          init_func=zeros_init,
                                          name=name)
    # arbitrary value
    stride = 3
    if part_id == 0:
        dist_ten[rank*stride:(rank+1)*stride] = F.ones((stride, 2), dtype=F.int32, ctx=F.cpu()) * (rank+1)
        dgl.distributed.client_barrier()
    else:
        dgl.distributed.client_barrier()
        original_rank = rank % num_client_per_machine
        assert F.allclose(dist_ten[original_rank*stride:(original_rank+1)*stride],
                          F.ones((stride, 2), dtype=F.int32, ctx=F.cpu()) * (original_rank+1))
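

# Deleting a non-persistent DistTensor frees its name, so a tensor with the same
# name but a different shape can be created afterwards.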
def dist_tensor_test_destroy_recreate(data_shape, name):
    dist_ten = dgl.distributed.DistTensor(data_shape, F.float32, name, init_func=zeros_init)
    del dist_ten
    dgl.distributed.client_barrier()
    new_shape = (data_shape[0], 4)
    dist_ten = dgl.distributed.DistTensor(new_shape, F.float32, name, init_func=zeros_init)
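

# Create a persistent DistTensor, drop the Python handle, then try to recreate a
# tensor under the same name.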
def dist_tensor_test_persistent(data_shape):
    dist_ten_name = 'persistent_dist_tensor'
    dist_ten = dgl.distributed.DistTensor(data_shape, F.float32, dist_ten_name,
                                          init_func=zeros_init, persistent=True)
    del dist_ten
    try:
        dist_ten = dgl.distributed.DistTensor(data_shape, F.float32, dist_ten_name)
        raise Exception('')
    except Exception:
        pass
def test_dist_tensor(g, rank):
    first_type = g.ntypes[0]
    data_shape = (g.number_of_nodes(first_type), 2)
    dist_tensor_test_sanity(data_shape, rank)
    dist_tensor_test_sanity(data_shape, rank, name="DistTensorSanity")
    dist_tensor_test_destroy_recreate(data_shape, name="DistTensorRecreate")
    dist_tensor_test_persistent(data_shape)
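

# Entry point: the launcher sets DIST_DGL_TEST_MODE so this script runs either as a
# graph server or as a test client on each machine.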
if mode == "server":
    shared_mem = bool(int(os.environ.get('DIST_DGL_TEST_SHARED_MEM')))
    server_id = int(os.environ.get('DIST_DGL_TEST_SERVER_ID'))
    run_server(graph_name, server_id, server_count=num_servers_per_machine,
               num_clients=num_part*num_client_per_machine, shared_mem=shared_mem,
               keep_alive=False)
elif mode == "client":
    os.environ['DGL_NUM_SERVER'] = str(num_servers_per_machine)
    dgl.distributed.initialize(ip_config, net_type=net_type)
    global_rank = dgl.distributed.get_rank()
    gpb, graph_name, _, _ = load_partition_book(graph_path + '/{}.json'.format(graph_name), part_id, None)
    g = dgl.distributed.DistGraph(graph_name, gpb=gpb)

    target = os.environ.get('DIST_DGL_TEST_OBJECT_TYPE', 'DistTensor')
    if target == "DistTensor":
        test_dist_tensor(g, global_rank)
    elif target == "DistEmbedding":
        # TODO: implement DistEmbedding
        pass
    else:
        print(target + " is not a valid DIST_DGL_TEST_OBJECT_TYPE")
        exit(1)
else:
    print("DIST_DGL_TEST_MODE has to be either 'server' or 'client'")
    exit(1)
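
tests/dist/test_dist_objects.py

# Pytest driver: partitions a random (optionally heterogeneous) graph into one part
# per machine listed in the IP config, then launches the server and client processes
# of run_dist_objects.py on every machine and checks that they all exit cleanly.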
import os
import unittest
import pytest
import multiprocessing as mp
import subprocess
import utils
import dgl
import numpy as np
import dgl.backend as F
from dgl.distributed import partition_graph
graph_name = os.environ.get('DIST_DGL_TEST_GRAPH_NAME', 'random_test_graph')
shared_workspace = os.environ.get('DIST_DGL_TEST_WORKSPACE')


def create_graph(num_part, dist_graph_path, hetero):
    if not hetero:
        g = dgl.rand_graph(10000, 42000)
        g.ndata['feat'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
        g.edata['feat'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
        partition_graph(g, graph_name, num_part, dist_graph_path)
    else:
        from scipy import sparse as spsp
        num_nodes = {'n1': 10000, 'n2': 10010, 'n3': 10020}
        etypes = [('n1', 'r1', 'n2'),
                  ('n1', 'r2', 'n3'),
                  ('n2', 'r3', 'n3')]
        edges = {}
        for etype in etypes:
            src_ntype, _, dst_ntype = etype
            arr = spsp.random(num_nodes[src_ntype], num_nodes[dst_ntype], density=0.001,
                              format='coo', random_state=100)
            edges[etype] = (arr.row, arr.col)
        g = dgl.heterograph(edges, num_nodes)
        g.nodes['n1'].data['feat'] = F.unsqueeze(F.arange(0, g.number_of_nodes('n1')), 1)
        g.edges['r1'].data['feat'] = F.unsqueeze(F.arange(0, g.number_of_edges('r1')), 1)
        partition_graph(g, graph_name, num_part, dist_graph_path)


@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@pytest.mark.parametrize("target", ['DistTensor'])
@pytest.mark.parametrize("net_type", ['tensorpipe', 'socket'])
@pytest.mark.parametrize("num_servers", [1, 4])
@pytest.mark.parametrize("num_clients", [1, 4])
@pytest.mark.parametrize("hetero", [False, True])
@pytest.mark.parametrize("shared_mem", [False, True])
def test_dist_objects(target, net_type, num_servers, num_clients, hetero, shared_mem):
    if not shared_mem and num_servers > 1:
        pytest.skip("Backup servers are not supported when shared memory is disabled")

    ip_config = os.environ.get('DIST_DGL_TEST_IP_CONFIG', 'ip_config.txt')
    workspace = os.environ.get('DIST_DGL_TEST_WORKSPACE', '/shared_workspace/dgl_dist_tensor_test/')
    ips = utils.get_ips(ip_config)
    num_part = len(ips)
    test_bin = os.path.join(os.environ.get(
        'DIST_DGL_TEST_PY_BIN_DIR', '.'), 'run_dist_objects.py')
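    # Partition the input graph once into the shared workspace; subsequent
    # parametrizations reuse the existing partitions.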
    dist_graph_path = os.path.join(workspace, 'hetero_dist_graph' if hetero else 'dist_graph')
    if not os.path.isdir(dist_graph_path):
        create_graph(num_part, dist_graph_path, hetero)

    base_envs = f"DIST_DGL_TEST_WORKSPACE={workspace} " \
                f"DIST_DGL_TEST_NUM_PART={num_part} " \
                f"DIST_DGL_TEST_NUM_SERVER={num_servers} " \
                f"DIST_DGL_TEST_NUM_CLIENT={num_clients} " \
                f"DIST_DGL_TEST_NET_TYPE={net_type} " \
                f"DIST_DGL_TEST_GRAPH_PATH={dist_graph_path} " \
                f"DIST_DGL_TEST_IP_CONFIG={ip_config} "
    procs = []
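    # Mode-specific variables (server ID, partition ID, shared-memory flag, object
    # type) are appended to base_envs for every launched process.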
    # Start server processes
    server_id = 0
    for part_id, ip in enumerate(ips):
        for _ in range(num_servers):
            cmd_envs = base_envs + \
                f"DIST_DGL_TEST_SERVER_ID={server_id} " \
                f"DIST_DGL_TEST_PART_ID={part_id} " \
                f"DIST_DGL_TEST_SHARED_MEM={str(int(shared_mem))} " \
                f"DIST_DGL_TEST_MODE=server "
            procs.append(utils.execute_remote(
                f"{cmd_envs} python3 {test_bin}", ip))
            server_id += 1

    # Start client processes
    for part_id, ip in enumerate(ips):
        for _ in range(num_clients):
            cmd_envs = base_envs + \
                f"DIST_DGL_TEST_PART_ID={part_id} " \
                f"DIST_DGL_TEST_OBJECT_TYPE={target} " \
                f"DIST_DGL_TEST_MODE=client "
            procs.append(utils.execute_remote(
                f"{cmd_envs} python3 {test_bin}", ip))

    for p in procs:
        p.join()
        assert p.exitcode == 0
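
The test imports a local utils module (under tests/dist) whose contents are not shown in this commit. As a rough sketch of the two helpers it relies on, assuming get_ips reads one machine address per line from the IP config file and execute_remote runs a command over ssh inside a background multiprocessing.Process (the actual helpers may differ):

import multiprocessing as mp
import subprocess


def get_ips(ip_config):
    """Read one machine address per line from the IP config file."""
    with open(ip_config) as f:
        # Each line may be "ip" or "ip port"; keep only the address.
        return [line.split()[0] for line in f if line.strip()]


def _run_ssh(cmd, ip):
    # Run the command on the remote machine and turn a failure into a
    # non-zero exit code of this process.
    ret = subprocess.run(['ssh', '-o', 'StrictHostKeyChecking=no', ip, cmd])
    if ret.returncode != 0:
        raise RuntimeError(f'command failed on {ip}: {cmd}')


def execute_remote(cmd, ip):
    """Launch cmd on ip in a background process and return the process handle."""
    proc = mp.Process(target=_run_ssh, args=(cmd, ip))
    proc.start()
    return proc

Returning a multiprocessing.Process is what lets test_dist_objects join all remote commands at the end and assert that every exitcode is 0.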