Unverified commit c5e83757, authored by Serge Panev and committed by GitHub
Browse files

[Dist][Test] Add tests for multi-node DistGraph (#4396)


Signed-off-by: Serge Panev <spanev@nvidia.com>
Signed-off-by: Serge Panev <spanev@nvidia.com>
parent 2caa6bd0
......@@ -4,7 +4,8 @@ import numpy as np
import dgl
import dgl.backend as F
from dgl.distributed import load_partition_book
import json
from dgl.distributed import load_partition_book, node_split, edge_split
mode = os.environ.get("DIST_DGL_TEST_MODE", "")
graph_name = os.environ.get("DIST_DGL_TEST_GRAPH_NAME", "random_test_graph")
......@@ -19,10 +20,19 @@ ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG", "ip_config.txt")
os.environ["DGL_DIST_MODE"] = "distributed"
def batched_assert_zero(tensor, size):
    """Assert, batch by batch, that the first ``size`` entries of ``tensor`` sum to zero.

    The tensor is read in windows of at most 2**16 indices rather than in a
    single access; each window is fetched through an index tensor built with
    ``F.arange`` and its sum along dimension 0 must equal zero.

    Parameters
    ----------
    tensor
        Object indexable by an index tensor (callers in this file pass a
        ``DistTensor``).
    size : int
        Number of leading entries to check.

    Raises
    ------
    AssertionError
        If any window has a non-zero sum.
    """
    BATCH_SIZE = 2**16
    for start in range(0, size, BATCH_SIZE):
        # Clamp the final window so it never reads past ``size``.
        stop = min(start + BATCH_SIZE, size)
        assert F.sum(tensor[F.arange(start, stop)], 0) == 0
def zeros_init(shape, dtype):
    """Initializer callback: build a zero-filled tensor of ``shape``/``dtype`` on the CPU context."""
    cpu_ctx = F.cpu()
    return F.zeros(shape, dtype=dtype, ctx=cpu_ctx)
def rand_init(shape, dtype):
    """Initializer callback: build a random mask tensor of ``shape`` cast to ``dtype``.

    Integers are drawn with ``np.random.randint(0, 100)``; an entry of the
    result is set where the draw is greater than 30 and cleared otherwise.
    """
    draws = np.random.randint(0, 100, size=shape)
    mask = draws > 30
    return F.tensor(mask, dtype=dtype)
def run_server(
graph_name,
......@@ -48,6 +58,156 @@ def run_server(
g.start()
##########################################
############### DistGraph ###############
##########################################
def node_split_test(g, force_even, ntype='_N'):
    """Check ``node_split`` on a randomly initialized mask over the nodes of ``ntype``.

    A uint8 ``DistTensor`` with one entry per node is filled by ``rand_init``.
    Each caller obtains its share of nodes from ``node_split`` and writes
    zeros at those positions; afterwards rank 0 asserts that the whole tensor
    sums to zero. Barriers separate the split, the write and the check.

    Parameters
    ----------
    g
        Distributed graph providing ``get_partition_book``, ``barrier``,
        ``rank`` and ``number_of_nodes``.
    force_even : bool
        Forwarded to ``node_split``.
    ntype : str
        Node type to test.
    """
    partition_book = g.get_partition_book()
    mask = dgl.distributed.DistTensor(
        [g.number_of_nodes(ntype)], F.uint8, init_func=rand_init
    )
    assigned = node_split(
        mask, partition_book, ntype=ntype, force_even=force_even
    )
    g.barrier()

    # Clear every entry this caller was assigned.
    cleared = F.astype(F.zeros_like(assigned), mask.dtype)
    mask[assigned] = cleared
    g.barrier()

    # Only rank 0 verifies; everyone waits on the final barrier.
    if g.rank() == 0:
        batched_assert_zero(mask, g.number_of_nodes(ntype))
    g.barrier()
def edge_split_test(g, force_even, etype='_E'):
    """Check ``edge_split`` on a randomly initialized mask over the edges of ``etype``.

    A uint8 ``DistTensor`` with one entry per edge is filled by ``rand_init``.
    Each caller obtains its share of edges from ``edge_split`` and writes
    zeros at those positions; afterwards rank 0 asserts that the whole tensor
    sums to zero. Barriers separate the split, the write and the check.

    Parameters
    ----------
    g
        Distributed graph providing ``get_partition_book``, ``barrier``,
        ``rank`` and ``number_of_edges``.
    force_even : bool
        Forwarded to ``edge_split``.
    etype : str
        Edge type to test.
    """
    partition_book = g.get_partition_book()
    mask = dgl.distributed.DistTensor(
        [g.number_of_edges(etype)], F.uint8, init_func=rand_init
    )
    assigned = edge_split(
        mask, partition_book, etype=etype, force_even=force_even
    )
    g.barrier()

    # Clear every entry this caller was assigned.
    cleared = F.astype(F.zeros_like(assigned), mask.dtype)
    mask[assigned] = cleared
    g.barrier()

    # Only rank 0 verifies; everyone waits on the final barrier.
    if g.rank() == 0:
        batched_assert_zero(mask, g.number_of_edges(etype))
    g.barrier()
def test_dist_graph(g):
    """Validate graph sizes against the partition metadata and run the split tests.

    Reads ``<graph_path>/<graph_name>.json``, checks that the graph's total
    node and edge counts match the ``num_nodes`` / ``num_edges`` entries, then
    runs ``node_split_test`` and ``edge_split_test`` for every node and edge
    type, once with ``force_even=False`` and once with ``force_even=True``.
    """
    metadata_file = graph_path + '/{}.json'.format(graph_name)
    with open(metadata_file) as conf_f:
        part_metadata = json.load(conf_f)

    for required_key in ('num_nodes', 'num_edges'):
        assert required_key in part_metadata
    expected_nodes = part_metadata['num_nodes']
    expected_edges = part_metadata['num_edges']
    assert g.number_of_nodes() == expected_nodes
    assert g.number_of_edges() == expected_edges

    nodes_per_type = {ntype : g.num_nodes(ntype) for ntype in g.ntypes}
    edges_per_type = {etype : g.num_edges(etype) for etype in g.etypes}

    for ntype, count in nodes_per_type.items():
        assert g.number_of_nodes(ntype) == count
        for even in (False, True):
            node_split_test(g, force_even=even, ntype=ntype)

    for etype, count in edges_per_type.items():
        assert g.number_of_edges(etype) == count
        for even in (False, True):
            edge_split_test(g, force_even=even, etype=etype)
##########################################
########### DistGraphServices ###########
##########################################
# NOTE(review): this run of statements sits between the "DistGraphServices"
# banner and find_edges_test. Indentation is lost in this view, so the scope
# these lines belong to (and where `g` comes from) cannot be determined here.
# Compare the degrees reported for node IDs 0..15 with the degrees stored as
# node data.
nids = F.arange(0, 16)
# Test in_degrees
orig_in_degrees = g.ndata['in_degrees']
local_in_degrees = g.in_degrees(nids)
# NOTE(review): the result of F.allclose is discarded (no assert), so the
# outcome of this comparison is never checked.
F.allclose(local_in_degrees, orig_in_degrees[nids])
# Test out_degrees
orig_out_degrees = g.ndata['out_degrees']
local_out_degrees = g.out_degrees(nids)
# NOTE(review): same as above, the comparison result is not asserted.
F.allclose(local_out_degrees, orig_out_degrees[nids])
# NOTE(review): find_edges_test and edge_subgraph_test are each defined in this
# file with two parameters; the one-argument calls below do not match those
# signatures. Confirm whether these lines are stale.
find_edges_test(g)
edge_subgraph_test(g)
sample_neighbors_test(g)
def find_edges_test(g, orig_nid_map):
    """Check ``g.find_edges`` against the endpoints stored as edge data.

    For every canonical edge type, 100 random edge IDs are drawn and resolved
    with ``g.find_edges``. The returned endpoints, mapped through
    ``orig_nid_map`` for the source and destination node types, must match the
    ``edge_u`` / ``edge_v`` edge data at those IDs.

    Parameters
    ----------
    g
        Distributed graph under test.
    orig_nid_map : dict
        Per node type, a tensor indexed by the IDs that ``find_edges`` returns.

    Returns
    -------
    dict
        Maps each edge type name to ``(eids, uv)``, where ``uv`` is the
        concatenation of the source and destination IDs found.
    """
    results = {}
    for src_type, etype, dst_type in g.canonical_etypes:
        edge_data = g.edges[etype].data
        stored_u = edge_data['edge_u']
        stored_v = edge_data['edge_v']

        picked = np.random.randint(g.number_of_edges(etype), size=100)
        eids = F.tensor(picked)
        found_u, found_v = g.find_edges(eids, etype=etype)

        assert F.allclose(orig_nid_map[src_type][found_u], stored_u[eids])
        assert F.allclose(orig_nid_map[dst_type][found_v], stored_v[eids])

        endpoints = F.cat([found_u, found_v], dim=0)
        results[etype] = (eids, endpoints)
    return results
def edge_subgraph_test(g, etype_eids_uv_map):
    """Check ``g.edge_subgraph`` using the edge IDs collected by ``find_edges_test``.

    Builds a subgraph from the per-type edge IDs, then verifies that each edge
    type has the expected number of edges, that the subgraph's ``dgl.EID``
    edge data matches the requested IDs, and that every endpoint recorded for
    an edge type appears among the ``dgl.NID`` values of the subgraph's source
    and destination node types.

    Parameters
    ----------
    g
        Distributed graph under test.
    etype_eids_uv_map : dict
        Maps edge type name to ``(eids, uv)`` as returned by
        ``find_edges_test``.
    """
    canonical = g.canonical_etypes

    # edge_subgraph is keyed by canonical type; the input map by type name.
    requested = {c_etype: etype_eids_uv_map[c_etype[1]][0] for c_etype in canonical}
    sg = g.edge_subgraph(requested)

    for c_etype in canonical:
        wanted = requested[c_etype]
        assert sg.number_of_edges(c_etype[1]) == len(wanted)
        assert F.allclose(sg.edges[c_etype].data[dgl.EID], wanted)

    for src_type, etype, dst_type in canonical:
        endpoints = etype_eids_uv_map[etype][1]
        src_nids = sg.nodes[src_type].data[dgl.NID]
        dst_nids = sg.nodes[dst_type].data[dgl.NID]
        sg_nids = F.cat([src_nids, dst_nids], dim=0)
        for node_id in endpoints:
            assert node_id in sg_nids
def sample_neighbors_with_args(g, size, fanout):
    """Sample neighbors for random seeds and cross-check the result with ``find_edges``.

    For each node type, ``size`` seed IDs are drawn at random below that
    type's node count and passed to ``g.sample_neighbors`` with ``fanout``.
    The sampled graph must report the same node count per type as ``g``, and
    for each canonical edge type its (src, dst) pairs must equal what
    ``g.find_edges`` returns for the sampled graph's ``dgl.EID`` values.

    Parameters
    ----------
    g
        Distributed graph under test.
    size : int
        Number of seed nodes drawn per node type.
    fanout : int
        Forwarded to ``g.sample_neighbors``.
    """
    nodes_per_type = {ntype : g.num_nodes(ntype) for ntype in g.ntypes}
    canonical = g.canonical_etypes

    seeds = {}
    for ntype, count in nodes_per_type.items():
        seeds[ntype] = np.random.randint(0, count, size=size)
    sampled = g.sample_neighbors(seeds, fanout)

    for ntype, count in nodes_per_type.items():
        assert sampled.number_of_nodes(ntype) == count

    for c_etype in canonical:
        src, dst = sampled.edges(etype=c_etype)
        eids = sampled.edges[c_etype].data[dgl.EID]
        expected_src, expected_dst = g.find_edges(eids, etype=c_etype[1])
        assert F.allclose(expected_src, src)
        assert F.allclose(expected_dst, dst)
def sample_neighbors_test(g):
    """Run ``sample_neighbors_with_args`` over a fixed list of (size, fanout) settings."""
    # (seed count, fanout) pairs, exercised in this order.
    settings = [
        (1024, 3),
        (1, 10),
        (1024, 2),
        (10, -1),
        (2**10, 1),
        (2**12, 1),
    ]
    for seed_count, fan in settings:
        sample_neighbors_with_args(g, size=seed_count, fanout=fan)
def test_dist_graph_services(g):
    """Run the find_edges, edge_subgraph and sample_neighbors checks on ``g``.

    For every node type, loads ``<graph_path>/orig_nid_array_<ntype>.npy`` into
    a tensor whose dtype matches the ``edge_u`` edge data of the first edge
    type, and passes the resulting map to ``find_edges_test``. The map that
    test returns feeds ``edge_subgraph_test``, and ``sample_neighbors_test``
    runs last.
    """
    nodes_per_type = {ntype : g.num_nodes(ntype) for ntype in g.ntypes}
    id_dtype = g.edges[g.etypes[0]].data['edge_u'].dtype

    orig_nid_map = dict()
    for ntype in nodes_per_type:
        saved_ids = np.load(graph_path + f'/orig_nid_array_{ntype}.npy')
        orig_nid_map[ntype] = F.tensor(saved_ids, id_dtype)

    eids_uv = find_edges_test(g, orig_nid_map)
    edge_subgraph_test(g, eids_uv)
    sample_neighbors_test(g)
##########################################
############### DistTensor ###############
##########################################
......@@ -254,6 +414,8 @@ elif mode == "client":
g = dgl.distributed.DistGraph(graph_name, gpb=gpb)
target_func_map = {
"DistGraph": test_dist_graph,
"DistGraphServices": test_dist_graph_services,
"DistTensor": test_dist_tensor,
"DistEmbedding": test_dist_embedding,
"DistOptimizer": test_dist_optimizer,
......
......@@ -6,6 +6,7 @@ import unittest
import numpy as np
import pytest
import utils
import shutil
import dgl
import dgl.backend as F
......@@ -13,7 +14,7 @@ from dgl.distributed import partition_graph
graph_name = os.environ.get("DIST_DGL_TEST_GRAPH_NAME", "random_test_graph")
target = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE", "")
shared_workspace = os.environ.get("DIST_DGL_TEST_WORKSPACE")
shared_workspace = os.environ.get('DIST_DGL_TEST_WORKSPACE', '/shared_workspace/dgl_dist_tensor_test/')
def create_graph(num_part, dist_graph_path, hetero):
......@@ -21,7 +22,23 @@ def create_graph(num_part, dist_graph_path, hetero):
g = dgl.rand_graph(10000, 42000)
g.ndata["feat"] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
g.edata["feat"] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
partition_graph(g, graph_name, num_part, dist_graph_path)
g.ndata["in_degrees"] = g.in_degrees()
g.ndata["out_degrees"] = g.out_degrees()
etype = g.etypes[0]
ntype = g.ntypes[0]
edge_u, edge_v = g.find_edges(F.arange(0, g.number_of_edges(etype)))
g.edges[etype].data["edge_u"] = edge_u
g.edges[etype].data["edge_v"] = edge_v
orig_nid, orig_eid = partition_graph(g, graph_name, num_part, dist_graph_path, return_mapping=True)
orig_nid_f = os.path.join(dist_graph_path, f"orig_nid_array_{ntype}.npy")
np.save(orig_nid_f, orig_nid.numpy())
orig_eid_f = os.path.join(dist_graph_path, f"orig_eid_array_{etype}.npy")
np.save(orig_eid_f, orig_eid.numpy())
else:
from scipy import sparse as spsp
......@@ -39,15 +56,29 @@ def create_graph(num_part, dist_graph_path, hetero):
)
edges[etype] = (arr.row, arr.col)
g = dgl.heterograph(edges, num_nodes)
g.nodes["n1"].data["feat"] = F.unsqueeze(
F.arange(0, g.number_of_nodes("n1")), 1
)
g.edges["r1"].data["feat"] = F.unsqueeze(
F.arange(0, g.number_of_edges("r1")), 1
)
partition_graph(g, graph_name, num_part, dist_graph_path)
for _, etype, _ in etypes:
edge_u, edge_v = g.find_edges(F.arange(0, g.number_of_edges(etype)))
g.edges[etype].data["edge_u"] = edge_u
g.edges[etype].data["edge_v"] = edge_v
orig_nid, orig_eid = partition_graph(g, graph_name, num_part, dist_graph_path, return_mapping=True)
for n_type, tensor in orig_nid.items():
orig_nid_f = os.path.join(dist_graph_path, f"orig_nid_array_{n_type}.npy")
np.save(orig_nid_f, tensor.numpy())
for e_type, tensor in orig_eid.items():
orig_eid_f = os.path.join(dist_graph_path, f"orig_eid_array_{e_type}.npy")
np.save(orig_eid_f, tensor.numpy())
@unittest.skipIf(os.name == "nt", reason="Do not support windows yet")
@pytest.mark.parametrize("net_type", ["tensorpipe", "socket"])
@pytest.mark.parametrize("num_servers", [1, 4])
......@@ -60,9 +91,6 @@ def test_dist_objects(net_type, num_servers, num_clients, hetero, shared_mem):
f"Backup servers are not supported when shared memory is disabled"
)
ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG", "ip_config.txt")
workspace = os.environ.get(
"DIST_DGL_TEST_WORKSPACE", "/shared_workspace/dgl_dist_tensor_test/"
)
ips = utils.get_ips(ip_config)
num_part = len(ips)
......@@ -72,13 +100,13 @@ def test_dist_objects(net_type, num_servers, num_clients, hetero, shared_mem):
)
dist_graph_path = os.path.join(
workspace, "hetero_dist_graph" if hetero else "dist_graph"
shared_workspace, "hetero_dist_graph" if hetero else "dist_graph"
)
if not os.path.isdir(dist_graph_path):
create_graph(num_part, dist_graph_path, hetero)
base_envs = (
f"DIST_DGL_TEST_WORKSPACE={workspace} "
f"DIST_DGL_TEST_WORKSPACE={shared_workspace} "
f"DIST_DGL_TEST_NUM_PART={num_part} "
f"DIST_DGL_TEST_NUM_SERVER={num_servers} "
f"DIST_DGL_TEST_NUM_CLIENT={num_clients} "
......@@ -117,3 +145,10 @@ def test_dist_objects(net_type, num_servers, num_clients, hetero, shared_mem):
for p in procs:
p.join()
assert p.exitcode == 0
def teardown():
    """Delete the ``dist_graph`` and ``hetero_dist_graph`` directories under ``shared_workspace``.

    Directories that do not exist are skipped; each removal is announced on
    stdout before the tree is deleted.
    """
    for name in ['dist_graph', 'hetero_dist_graph']:
        path = os.path.join(shared_workspace, name)
        if not os.path.exists(path):
            continue
        print(f"Removing {path}...")
        shutil.rmtree(path)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment