Unverified Commit 77006db2 authored by Jinjing Zhou's avatar Jinjing Zhou Committed by GitHub
Browse files

[CI] Use spawn in unittest instead of fork (#1627)



* update

* update

* update

* update

* update

* update

* fix ci using spawn
Co-authored-by: default avataraksnzhy <mctt90@gmail.com>
parent 0c313e51
...@@ -53,7 +53,7 @@ def create_random_graph(n): ...@@ -53,7 +53,7 @@ def create_random_graph(n):
def run_server(graph_name, server_id, num_clients, barrier): def run_server(graph_name, server_id, num_clients, barrier):
g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, graph_name, g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, graph_name,
'/tmp/{}.json'.format(graph_name)) '/tmp/dist_graph/{}.json'.format(graph_name))
barrier.wait() barrier.wait()
print('start server', server_id) print('start server', server_id)
g.start() g.start()
...@@ -125,30 +125,35 @@ def test_server_client(): ...@@ -125,30 +125,35 @@ def test_server_client():
# Partition the graph # Partition the graph
num_parts = 1 num_parts = 1
graph_name = 'dist_graph_test' graph_name = 'dist_graph_test_2'
g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1) g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1) g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
partition_graph(g, graph_name, num_parts, '/tmp') partition_graph(g, graph_name, num_parts, '/tmp/dist_graph')
# let's just test on one partition for now. # let's just test on one partition for now.
# We cannot run multiple servers and clients on the same machine. # We cannot run multiple servers and clients on the same machine.
barrier = mp.Barrier(2) barrier = mp.Barrier(2)
serv_ps = [] serv_ps = []
ctx = mp.get_context('spawn')
for serv_id in range(1): for serv_id in range(1):
p = Process(target=run_server, args=(graph_name, serv_id, 1, barrier)) p = ctx.Process(target=run_server, args=(graph_name, serv_id, 1, barrier))
serv_ps.append(p) serv_ps.append(p)
p.start() p.start()
cli_ps = [] cli_ps = []
for cli_id in range(1): for cli_id in range(1):
print('start client', cli_id) print('start client', cli_id)
p = Process(target=run_client, args=(graph_name, barrier, g.number_of_nodes(), p = ctx.Process(target=run_client, args=(graph_name, barrier, g.number_of_nodes(),
g.number_of_edges())) g.number_of_edges()))
p.start() p.start()
cli_ps.append(p) cli_ps.append(p)
for p in cli_ps: for p in cli_ps:
p.join() p.join()
for p in serv_ps:
p.join()
print('clients have terminated') print('clients have terminated')
def test_split(): def test_split():
...@@ -156,14 +161,14 @@ def test_split(): ...@@ -156,14 +161,14 @@ def test_split():
g = create_random_graph(10000) g = create_random_graph(10000)
num_parts = 4 num_parts = 4
num_hops = 2 num_hops = 2
partition_graph(g, 'dist_graph_test', num_parts, '/tmp', num_hops=num_hops, part_method='metis') partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')
node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30 node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30 edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
selected_nodes = np.nonzero(node_mask)[0] selected_nodes = np.nonzero(node_mask)[0]
selected_edges = np.nonzero(edge_mask)[0] selected_edges = np.nonzero(edge_mask)[0]
for i in range(num_parts): for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/dist_graph_test.json', i) part_g, node_feats, edge_feats, meta = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
num_nodes, num_edges, node_map, edge_map, num_partitions = meta num_nodes, num_edges, node_map, edge_map, num_partitions = meta
gpb = GraphPartitionBook(part_id=i, gpb = GraphPartitionBook(part_id=i,
num_parts=num_partitions, num_parts=num_partitions,
...@@ -195,5 +200,6 @@ def prepare_dist(): ...@@ -195,5 +200,6 @@ def prepare_dist():
ip_config.close() ip_config.close()
if __name__ == '__main__': if __name__ == '__main__':
os.mkdir('/tmp/dist_graph')
test_split() test_split()
test_server_client() #test_server_client()
import dgl import dgl
import sys import sys
import os
import numpy as np import numpy as np
from scipy import sparse as spsp from scipy import sparse as spsp
from numpy.testing import assert_array_equal from numpy.testing import assert_array_equal
...@@ -22,10 +23,10 @@ def test_graph_partition_book(): ...@@ -22,10 +23,10 @@ def test_graph_partition_book():
num_parts = 4 num_parts = 4
num_hops = 2 num_hops = 2
partition_graph(g, 'gpb_test', num_parts, '/tmp', num_hops=num_hops, part_method='metis') partition_graph(g, 'gpb_test', num_parts, '/tmp/gpb', num_hops=num_hops, part_method='metis')
for i in range(num_parts): for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/gpb_test.json', i) part_g, node_feats, edge_feats, meta = load_partition('/tmp/gpb/gpb_test.json', i)
num_nodes, num_edges, node_map, edge_map, num_partitions = meta num_nodes, num_edges, node_map, edge_map, num_partitions = meta
gpb = GraphPartitionBook(part_id=i, gpb = GraphPartitionBook(part_id=i,
num_parts=num_partitions, num_parts=num_partitions,
...@@ -46,4 +47,5 @@ def test_graph_partition_book(): ...@@ -46,4 +47,5 @@ def test_graph_partition_book():
if __name__ == '__main__': if __name__ == '__main__':
os.mkdir('/tmp/gpb')
test_graph_partition_book() test_graph_partition_book()
...@@ -7,7 +7,7 @@ import dgl ...@@ -7,7 +7,7 @@ import dgl
import backend as F import backend as F
import unittest, pytest import unittest, pytest
from dgl.graph_index import create_graph_index from dgl.graph_index import create_graph_index
import multiprocessing as mp
from numpy.testing import assert_array_equal from numpy.testing import assert_array_equal
if os.name != 'nt': if os.name != 'nt':
...@@ -238,12 +238,14 @@ def test_kv_store(): ...@@ -238,12 +238,14 @@ def test_kv_store():
ip_addr = get_local_usable_addr() ip_addr = get_local_usable_addr()
ip_config.write('%s 1\n' % ip_addr) ip_config.write('%s 1\n' % ip_addr)
ip_config.close() ip_config.close()
pid = os.fork() ctx = mp.get_context('spawn')
if pid == 0: pserver = ctx.Process(target=start_server)
start_server() pclient = ctx.Process(target=start_client)
else: pserver.start()
time.sleep(1) time.sleep(1)
start_client() pclient.start()
pserver.join()
pclient.join()
if __name__ == '__main__': if __name__ == '__main__':
test_partition_policy() test_partition_policy()
......
import dgl import dgl
import sys import sys
import os
import numpy as np import numpy as np
from scipy import sparse as spsp from scipy import sparse as spsp
from numpy.testing import assert_array_equal from numpy.testing import assert_array_equal
...@@ -22,10 +23,9 @@ def test_partition(): ...@@ -22,10 +23,9 @@ def test_partition():
g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10)) g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10))
num_parts = 4 num_parts = 4
num_hops = 2 num_hops = 2
partition_graph(g, 'test', num_parts, '/tmp/partition', num_hops=num_hops, part_method='metis')
partition_graph(g, 'test', num_parts, '/tmp', num_hops=num_hops, part_method='metis')
for i in range(num_parts): for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/test.json', i) part_g, node_feats, edge_feats, meta = load_partition('/tmp/partition/test.json', i)
num_nodes, num_edges, node_map, edge_map, num_partitions = meta num_nodes, num_edges, node_map, edge_map, num_partitions = meta
# Check the metadata # Check the metadata
...@@ -55,4 +55,5 @@ def test_partition(): ...@@ -55,4 +55,5 @@ def test_partition():
if __name__ == '__main__': if __name__ == '__main__':
os.mkdir('/tmp/partition')
test_partition() test_partition()
...@@ -5,7 +5,7 @@ import socket ...@@ -5,7 +5,7 @@ import socket
import dgl import dgl
import backend as F import backend as F
import unittest, pytest import unittest, pytest
import multiprocessing as mp
from numpy.testing import assert_array_equal from numpy.testing import assert_array_equal
if os.name != 'nt': if os.name != 'nt':
...@@ -191,12 +191,14 @@ def test_rpc(): ...@@ -191,12 +191,14 @@ def test_rpc():
ip_addr = get_local_usable_addr() ip_addr = get_local_usable_addr()
ip_config.write('%s 1\n' % ip_addr) ip_config.write('%s 1\n' % ip_addr)
ip_config.close() ip_config.close()
pid = os.fork() ctx = mp.get_context('spawn')
if pid == 0: pserver = ctx.Process(target=start_server)
start_server() pclient = ctx.Process(target=start_client)
else: pserver.start()
time.sleep(1) time.sleep(1)
start_client() pclient.start()
pserver.join()
pclient.join()
if __name__ == '__main__': if __name__ == '__main__':
test_serialize() test_serialize()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment