Unverified Commit 7b766393 authored by kylasa, committed by GitHub

[DistDGL][UserEx] Sync parmetis_wrapper with changes in metadata.json (#5385)

* Sync parmetis_wrapper with changes in metadata.json

1. In preprocess.py, make sure `num_partitions` is defined as an input argument, and align `input_dir` with the input dataset: both the schema file and the graph stats file are assumed to be located inside `input_dir`.

2. Use the DGL_HOME environment variable so that the parmetis_wrapper command can be run from anywhere.

* Fix CI test failure cases.

* Address CI review comments.

* Address CI test failures.

* Apply lintrunner patch.
parent 894ad1e3
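For illustration only, here is a minimal sketch of how the synced wrapper would now be invoked, mirroring the flags added in this commit. Every path and the partition count below are placeholders, not values taken from the change:

```python
import os

# Placeholder locations; point these at a real DGL checkout and chunked dataset.
os.environ["DGL_HOME"] = "/path/to/dgl"
input_dir = "/path/to/chunked-data"  # holds metadata.json and the graph stats file
num_parts = 2

os.system(
    f"python3 $DGL_HOME/tools/distpartitioning/parmetis_wrapper.py "
    f"--schema_file metadata.json "
    f"--preproc_input_dir {input_dir} "
    f"--preproc_output_dir {input_dir}/preproc_output_dir "
    f"--hostfile /path/to/hostfile.txt "
    f"--num_parts {num_parts} "
    f"--parmetis_output_file /path/to/parmetis_output.txt "
    f"--partitions_dir /path/to/partitions_dir"
)
```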
@@ -26,11 +26,14 @@ def test_parmetis_preprocessing():
     g = create_chunked_dataset(root_dir, num_chunks)

     # Trigger ParMETIS pre-processing here.
-    schema_path = os.path.join(root_dir, "chunked-data/metadata.json")
+    input_dir = os.path.join(root_dir, "chunked-data")
     results_dir = os.path.join(root_dir, "parmetis-data")
     os.system(
-        f"mpirun -np 2 python3 tools/distpartitioning/parmetis_preprocess.py "
-        f"--schema {schema_path} --output {results_dir}"
+        f"mpirun -np {num_chunks} python3 tools/distpartitioning/parmetis_preprocess.py "
+        f"--schema_file metadata.json "
+        f"--input_dir {input_dir} "
+        f"--output_dir {results_dir} "
+        f"--num_parts {num_chunks}"
     )

     # Now add all the tests and check whether the test has passed or failed.
@@ -124,6 +127,8 @@ def test_parmetis_postprocessing():
     # Generate random parmetis partition ids for the nodes in the graph.
     # Replace this code with actual ParMETIS executable when it is ready
     output_dir = os.path.join(root_dir, "chunked-data")
+    assert os.path.isdir(output_dir)
+
     parmetis_file = os.path.join(output_dir, "parmetis_output.txt")
     node_ids = np.arange(num_nodes)
     partition_ids = np.random.randint(0, 2, (num_nodes,))
@@ -132,6 +137,7 @@ def test_parmetis_postprocessing():
     # Create parmetis output, this is mimicking running actual parmetis.
     with open(parmetis_file, "w") as f:
         np.savetxt(f, parmetis_output)
+    assert os.path.isfile(parmetis_file)

     # Check the post processing script here.
     results_dir = os.path.join(output_dir, "partitions_dir")
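The fake ParMETIS output written above is just `node_id partition_id` rows. A self-contained sketch of that fixture follows; the `column_stack` construction is an assumption, since the line building `parmetis_output` sits outside this hunk:

```python
import os

import numpy as np

num_nodes = 10
node_ids = np.arange(num_nodes)
partition_ids = np.random.randint(0, 2, (num_nodes,))

# One "node_id partition_id" row per node, mimicking real ParMETIS output.
parmetis_output = np.column_stack([node_ids, partition_ids])
with open("parmetis_output.txt", "w") as f:
    np.savetxt(f, parmetis_output, fmt="%d")
assert os.path.isfile("parmetis_output.txt")
```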
@@ -141,7 +147,8 @@ def test_parmetis_postprocessing():
     print(parmetis_file)
     os.system(
         f"python3 tools/distpartitioning/parmetis_postprocess.py "
-        f"--schema_file {json_file} "
+        f"--postproc_input_dir {output_dir} "
+        f"--schema_file metadata.json "
         f"--parmetis_output_file {parmetis_file} "
         f"--partitions_dir {results_dir}"
     )
@@ -191,6 +198,7 @@ def test_parmetis_wrapper():
     # Trigger ParMETIS.
     schema_file = os.path.join(root_dir, "chunked-data/metadata.json")
+    preproc_input_dir = os.path.join(root_dir, "chunked-data")
     preproc_output_dir = os.path.join(
         root_dir, "chunked-data/preproc_output_dir"
     )
@@ -209,17 +217,17 @@ def test_parmetis_wrapper():
     with open(stats_file, "w") as f:
         f.write(f"{num_nodes} {num_edges} {num_constraints}")

-    parmetis_cmd = (
+    os.system(
         f"python3 tools/distpartitioning/parmetis_wrapper.py "
         f"--schema_file {schema_file} "
+        f"--preproc_input_dir {preproc_input_dir} "
         f"--preproc_output_dir {preproc_output_dir} "
         f"--hostfile {hostfile} "
+        f"--num_parts {num_chunks} "
         f"--parmetis_output_file {parmetis_output_file} "
         f"--partitions_dir {partitions_dir} "
     )
-    print(f"Executing the following cmd: {parmetis_cmd}")
-    print(parmetis_cmd)
-    os.system(parmetis_cmd)
+    print("Executing Done.")

     ntype_count = {
         "author": num_authors,
......
@@ -27,11 +27,16 @@ def post_process(params):
     """
     logging.info("Starting to process parmetis output.")
+    logging.info(params.postproc_input_dir)
     logging.info(params.schema_file)
     logging.info(params.parmetis_output_file)

-    assert os.path.isfile(params.schema_file)
+    assert os.path.isfile(
+        os.path.join(params.postproc_input_dir, params.schema_file)
+    )
     assert os.path.isfile(params.parmetis_output_file)

-    schema = read_json(params.schema_file)
+    schema = read_json(
+        os.path.join(params.postproc_input_dir, params.schema_file)
+    )
     metis_df = csv.read_csv(
         params.parmetis_output_file,
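The resolve-then-load pattern introduced above collapses into a small helper. A sketch with hypothetical names, using the standard json module in place of DGL's read_json:

```python
import json
import os

def load_schema(postproc_input_dir: str, schema_file: str) -> dict:
    """Resolve a bare schema file name (e.g. metadata.json) against the
    post-processing base directory, then load it as JSON."""
    schema_path = os.path.join(postproc_input_dir, schema_file)
    assert os.path.isfile(schema_path), f"schema file not found: {schema_path}"
    with open(schema_path) as f:
        return json.load(f)

# e.g. schema = load_schema("chunked-data", "metadata.json")
```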
@@ -106,6 +111,12 @@ if __name__ == "__main__":
         description="PostProcessing the ParMETIS\
                 output for partitioning pipeline"
     )
+    parser.add_argument(
+        "--postproc_input_dir",
+        required=True,
+        type=str,
+        help="Base directory for post processing step.",
+    )
     parser.add_argument(
         "--schema_file",
         required=True,
......
@@ -120,10 +120,13 @@ def gen_edge_files(schema_map, params):
         file_idxes = generate_read_list(len(edge_data_files), params.num_parts)
         for idx in file_idxes[rank]:
             reader_fmt_meta = {
-                "name": etype_info[constants.STR_FORMAT][constants.STR_NAME]
+                "name": etype_info[constants.STR_FORMAT][constants.STR_NAME],
+                "delimiter": etype_info[constants.STR_FORMAT][
+                    constants.STR_FORMAT_DELIMITER
+                ],
             }
             data_df = array_readwriter.get_array_parser(**reader_fmt_meta).read(
-                edge_data_files[idx]
+                os.path.join(params.input_dir, edge_data_files[idx])
             )
             out_file = process_and_write_back(data_df, idx)
             edge_files.append(out_file)
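The format metadata now carries a delimiter next to the format name, and edge file paths are resolved against input_dir. A rough stand-in for that parser call, assuming integer src/dst pairs in a delimited text file (this is not DGL's actual array_readwriter API):

```python
import os

import numpy as np

def read_edge_array(input_dir, rel_path, fmt_meta):
    """Read a delimited edge file (one src/dst id pair per line) using the
    delimiter supplied by the schema's format metadata."""
    assert fmt_meta["name"] == "csv", "only the csv format is sketched here"
    path = os.path.join(input_dir, rel_path)  # resolve relative to input_dir
    return np.loadtxt(path, delimiter=fmt_meta["delimiter"], dtype=np.int64)

# e.g. read_edge_array("chunked-data", "edges/chunk-0.csv",
#                      {"name": "csv", "delimiter": ","})
```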
@@ -280,14 +283,18 @@ def gen_parmetis_input_args(params, schema_map):
         constants.STR_GRAPH_NAME in schema_map
     ), "Graph name is not present in the json file"
     graph_name = schema_map[constants.STR_GRAPH_NAME]
-    if not os.path.isfile(f"{graph_name}_stats.txt"):
+    if not os.path.isfile(
+        os.path.join(params.input_dir, f"{graph_name}_stats.txt")
+    ):
         num_nodes = np.sum(schema_map[constants.STR_NUM_NODES_PER_TYPE])
         num_edges = np.sum(schema_map[constants.STR_NUM_EDGES_PER_TYPE])
         num_ntypes = len(schema_map[constants.STR_NODE_TYPE])
         num_constraints = num_ntypes
-        with open(f"{graph_name}_stats.txt", "w") as sf:
+        with open(
+            os.path.join(params.input_dir, f"{graph_name}_stats.txt"), "w"
+        ) as sf:
             sf.write(f"{num_nodes} {num_edges} {num_constraints}")

     node_files = []
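The stats file written above holds three whitespace-separated integers: total nodes, total edges, and one constraint per node type. A toy derivation, with plain dict keys standing in for the real constants.STR_* names:

```python
import numpy as np

# Schema-like dict; the real code reads these fields via constants.STR_* keys.
schema_map = {
    "num_nodes_per_type": [100, 50],
    "num_edges_per_type": [400],
    "node_type": ["author", "paper"],
}

num_nodes = int(np.sum(schema_map["num_nodes_per_type"]))
num_edges = int(np.sum(schema_map["num_edges_per_type"]))
num_constraints = len(schema_map["node_type"])  # one per node type

with open("toy_graph_stats.txt", "w") as sf:
    sf.write(f"{num_nodes} {num_edges} {num_constraints}")  # writes "150 400 2"
```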
@@ -353,7 +360,12 @@ def run_preprocess_data(params):
     """
     logging.info(f"Starting to generate ParMETIS files...")
     rank = get_proc_info()

-    schema_map = read_json(params.schema_file)
+    assert os.path.isdir(
+        params.input_dir
+    ), f"Please check `input_dir` argument."
+    schema_map = read_json(os.path.join(params.input_dir, params.schema_file))
+
     gen_node_weights_files(schema_map, params)
     logging.info(f"Done with node weights....")
......
@@ -24,6 +24,11 @@ def check_dependencies():
         mpi_install
     ), "Could not locate the following dependency: MPI. Please install it and try again."

+    dgl_path = os.environ.get("DGL_HOME", "")
+    assert os.path.isdir(
+        dgl_path
+    ), "Environment variable DGL_HOME not found. Please define the DGL installation path"
+

 def run_parmetis_wrapper(params):
     """Function to execute all the steps needed to run ParMETIS
@@ -33,12 +38,11 @@ def run_parmetis_wrapper(params):
     params : argparser object
         an instance of argparser class to capture command-line arguments
     """
-    assert os.path.isfile(params.schema_file)
-    assert os.path.isfile(params.hostfile)
-
-    schema = read_json(params.schema_file)
+    schema = read_json(
+        os.path.join(params.preproc_input_dir, params.schema_file)
+    )
     graph_name = schema[constants.STR_GRAPH_NAME]
-    num_partitions = len(schema[constants.STR_NUM_NODES_PER_CHUNK][0])
+    num_partitions = params.num_parts

     # Check if parmetis_preprocess.py exists.
     assert os.path.isfile(
@@ -50,10 +54,11 @@ def run_parmetis_wrapper(params):
     # Trigger pre-processing step to generate input files for ParMETIS.
     preproc_cmd = (
         f"mpirun -np {num_partitions} -hostfile {params.hostfile} "
-        f"python3 tools/distpartitioning/parmetis_preprocess.py "
+        f"python3 $DGL_HOME/tools/distpartitioning/parmetis_preprocess.py "
         f"--schema_file {params.schema_file} "
-        f"--input_dir {params.preproc_input_dir}"
-        f"--output_dir {params.preproc_output_dir}"
+        f"--input_dir {params.preproc_input_dir} "
+        f"--output_dir {params.preproc_output_dir} "
+        f"--num_parts {num_partitions}"
     )
     logging.info(f"Executing Preprocessing Step: {preproc_cmd}")
     os.system(preproc_cmd)
@@ -86,7 +91,8 @@ def run_parmetis_wrapper(params):
         os.getcwd(), f"{graph_name}_part.{num_partitions}"
     )
     postproc_cmd = (
-        f"python3 tools/distpartitioning/parmetis_postprocess.py "
+        f"python3 $DGL_HOME/tools/distpartitioning/parmetis_postprocess.py "
+        f"--postproc_input_dir {params.preproc_input_dir} "
         f"--schema_file {params.schema_file} "
         f"--parmetis_output_file {parmetis_output_file} "
         f"--partitions_dir {params.partitions_dir}"
@@ -126,6 +132,12 @@ if __name__ == "__main__":
         type=str,
         help="A text file with a list of ip addresses.",
     )
+    parser.add_argument(
+        "--num_parts",
+        required=True,
+        type=int,
+        help="integer representing no. of partitions.",
+    )

     # ParMETIS step.
     parser.add_argument(
......
@@ -210,8 +210,8 @@ def get_node_partids(partitions_dir, graph_schema):
     )
     node_partids = {}
     for ntype_id, ntype in enumerate(graph_schema[constants.STR_NODE_TYPE]):
-        node_partids[ntype] = read_csv_file(
-            os.path.join(partitions_dir, f"{ntype}.txt"), True
+        node_partids[ntype] = read_file(
+            os.path.join(partitions_dir, f"{ntype}.txt"), constants.STR_CSV
         )
         assert (
             len(node_partids[ntype])
......
@@ -24,10 +24,8 @@ from dgl.distributed.partition import (
 from utils import get_idranges, read_json
 from verification_utils import (
     get_node_partids,
-    read_csv_file,
-    read_npy_file,
+    read_file,
     read_orig_ids,
-    read_pq_file,
     verify_graph_feats,
     verify_metadata_counts,
     verify_node_partitionids,
......