Unverified commit f40db9b7, authored by Serge Panev, committed by GitHub
Browse files

[Dist][Test] Add exit_code asserts in some distributed tests (#5053)


Signed-off-by: Serge Panev <spanev@nvidia.com>
Signed-off-by: Serge Panev <spanev@nvidia.com>
parent 11e61905
......@@ -165,9 +165,11 @@ def check_server_client_empty(shared_mem, num_servers, num_clients):
for p in cli_ps:
p.join()
assert p.exitcode == 0
for p in serv_ps:
p.join()
assert p.exitcode == 0
print("clients have terminated")
......@@ -528,6 +530,7 @@ def check_dist_emb_server_client(
dgl.distributed.shutdown_servers("kv_ip_config.txt", num_servers)
for p in serv_ps:
p.join()
assert p.exitcode == 0
print("clients have terminated")
......@@ -585,6 +588,7 @@ def check_server_client(shared_mem, num_servers, num_clients, num_groups=1):
cli_ps.append(p)
for p in cli_ps:
p.join()
assert p.exitcode == 0
if keep_alive:
for p in serv_ps:
......@@ -593,6 +597,7 @@ def check_server_client(shared_mem, num_servers, num_clients, num_groups=1):
dgl.distributed.shutdown_servers("kv_ip_config.txt", num_servers)
for p in serv_ps:
p.join()
assert p.exitcode == 0
print("clients have terminated")
......@@ -659,9 +664,10 @@ def check_server_client_hierarchy(shared_mem, num_servers, num_clients):
for p in cli_ps:
p.join()
assert p.exitcode == 0
for p in serv_ps:
p.join()
assert p.exitcode == 0
nodes1 = []
edges1 = []
for n, e in return_dict.values():
......@@ -889,9 +895,11 @@ def check_server_client_hetero(shared_mem, num_servers, num_clients):
for p in cli_ps:
p.join()
assert p.exitcode == 0
for p in serv_ps:
p.join()
assert p.exitcode == 0
print("clients have terminated")
......@@ -1022,6 +1030,7 @@ def check_dist_optim_server_client(
for p in serv_ps:
p.join()
assert p.exitcode == 0
@unittest.skipIf(
......
......@@ -115,6 +115,7 @@ def check_rpc_sampling(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
src, dst = sampled_graph.edges()
assert sampled_graph.number_of_nodes() == g.number_of_nodes()
......@@ -242,6 +243,7 @@ def check_rpc_get_degree_shuffle(tmpdir, num_server):
print("Done get_degree")
for p in pserver_list:
p.join()
assert p.exitcode == 0
print('check results')
assert F.array_equal(g.in_degrees(orig_nid[nids]), in_degs)
......@@ -303,6 +305,7 @@ def check_rpc_sampling_shuffle(tmpdir, num_server, num_groups=1):
pclient_list.append(p)
for p in pclient_list:
p.join()
assert p.exitcode == 0
if keep_alive:
for p in pserver_list:
assert p.is_alive()
......@@ -310,6 +313,7 @@ def check_rpc_sampling_shuffle(tmpdir, num_server, num_groups=1):
dgl.distributed.shutdown_servers("rpc_ip_config.txt", 1)
for p in pserver_list:
p.join()
assert p.exitcode == 0
def start_hetero_sample_client(rank, tmpdir, disable_shared_mem, nodes):
gpb = None
......@@ -390,6 +394,7 @@ def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
for c_etype in block.canonical_etypes:
src_type, etype, dst_type = c_etype
......@@ -443,6 +448,7 @@ def check_rpc_hetero_sampling_empty_shuffle(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
assert block.number_of_edges() == 0
assert len(block.etypes) == len(g.etypes)
......@@ -476,6 +482,7 @@ def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, graph_formats=No
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
src, dst = block.edges(etype=('n1', 'r13', 'n3'))
assert len(src) == 18
......@@ -526,6 +533,7 @@ def check_rpc_hetero_etype_sampling_empty_shuffle(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
assert block.number_of_edges() == 0
assert len(block.etypes) == len(g.etypes)
......@@ -618,6 +626,7 @@ def check_rpc_bipartite_sampling_empty(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
assert block.number_of_edges() == 0
assert len(block.etypes) == len(g.etypes)
......@@ -650,6 +659,7 @@ def check_rpc_bipartite_sampling_shuffle(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
for c_etype in block.canonical_etypes:
src_type, etype, dst_type = c_etype
......@@ -701,6 +711,7 @@ def check_rpc_bipartite_etype_sampling_empty(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
assert block is not None
assert block.number_of_edges() == 0
......@@ -735,6 +746,7 @@ def check_rpc_bipartite_etype_sampling_shuffle(tmpdir, num_server):
print("Done sampling")
for p in pserver_list:
p.join()
assert p.exitcode == 0
for c_etype in block.canonical_etypes:
src_type, etype, dst_type = c_etype
......@@ -919,6 +931,7 @@ def check_rpc_in_subgraph_shuffle(tmpdir, num_server):
sampled_graph = start_in_subgraph_client(0, tmpdir, num_server > 1, nodes)
for p in pserver_list:
p.join()
assert p.exitcode == 0
src, dst = sampled_graph.edges()
src = orig_nid[src]
......
......@@ -308,8 +308,10 @@ def check_neg_dataloader(g, num_server, num_workers):
for p in pserver_list:
p.join()
assert p.exitcode == 0
for p in ptrainer_list:
p.join()
assert p.exitcode == 0
@pytest.mark.parametrize("num_server", [3])
......@@ -392,6 +394,7 @@ def test_dist_dataloader(
for p in ptrainer_list:
p.join()
assert p.exitcode == 0
if keep_alive:
for p in pserver_list:
assert p.is_alive()
......@@ -399,6 +402,7 @@ def test_dist_dataloader(
dgl.distributed.shutdown_servers("mp_ip_config.txt", 1)
for p in pserver_list:
p.join()
assert p.exitcode == 0
def start_node_dataloader(
......@@ -619,8 +623,10 @@ def check_dataloader(g, num_server, num_workers, dataloader_type):
ptrainer_list.append(p)
for p in pserver_list:
p.join()
assert p.exitcode == 0
for p in ptrainer_list:
p.join()
assert p.exitcode == 0
def create_random_hetero():
......@@ -757,7 +763,9 @@ def test_multiple_dist_dataloaders(
p_client.start()
p_client.join()
assert p_client.exitcode == 0
for p in p_servers:
p.join()
assert p.exitcode == 0
reset_envs()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment