Unverified Commit eff16b61 authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[Dist] add input_dir for parmetis preprocess (#5232)

* [Dist] add input_dir for parmetis preprocess

* add support for parquet

* update parmetis_wrapper accordingly
parent cf678299
...@@ -13,6 +13,7 @@ import torch.distributed as dist ...@@ -13,6 +13,7 @@ import torch.distributed as dist
import constants import constants
from utils import get_idranges, get_node_types, read_json from utils import get_idranges, get_node_types, read_json
import array_readwriter
def get_proc_info(): def get_proc_info():
...@@ -130,7 +131,7 @@ def gen_edge_files(schema_map, output): ...@@ -130,7 +131,7 @@ def gen_edge_files(schema_map, output):
return edge_files return edge_files
def read_node_features(schema_map, tgt_ntype_name, feat_names): def read_node_features(schema_map, tgt_ntype_name, feat_names, input_dir):
"""Helper function to read the node features. """Helper function to read the node features.
Only node features which are requested are read from the input dataset. Only node features which are requested are read from the input dataset.
...@@ -142,6 +143,8 @@ def read_node_features(schema_map, tgt_ntype_name, feat_names): ...@@ -142,6 +143,8 @@ def read_node_features(schema_map, tgt_ntype_name, feat_names):
node-type name, for which node features will be read from the input dataset. node-type name, for which node features will be read from the input dataset.
feat_names : set feat_names : set
A set of strings, feature names, which will be read for a given node type. A set of strings, feature names, which will be read for a given node type.
input_dir : str
The input directory where the dataset is located.
Returns: Returns:
-------- --------
...@@ -161,17 +164,19 @@ def read_node_features(schema_map, tgt_ntype_name, feat_names): ...@@ -161,17 +164,19 @@ def read_node_features(schema_map, tgt_ntype_name, feat_names):
for feat_name, feat_data in ntype_feature_data.items(): for feat_name, feat_data in ntype_feature_data.items():
if feat_name in feat_names: if feat_name in feat_names:
feat_data_fname = feat_data[constants.STR_DATA][rank] feat_data_fname = feat_data[constants.STR_DATA][rank]
if not os.path.isabs(feat_data_fname):
feat_data_fname = os.path.join(input_dir, feat_data_fname)
logging.info(f"Reading: {feat_data_fname}") logging.info(f"Reading: {feat_data_fname}")
if os.path.isabs(feat_data_fname): file_suffix = Path(feat_data_fname).suffix
node_features[feat_name] = np.load(feat_data_fname) reader_fmt_meta = {
else: "name": file_suffix[1:]
node_features[feat_name] = np.load( }
os.path.join(input_dir, feat_data_fname) node_features[feat_name] = array_readwriter.get_array_parser(
) **reader_fmt_meta).read(feat_data_fname)
return node_features return node_features
def gen_node_weights_files(schema_map, output): def gen_node_weights_files(schema_map, input_dir, output):
"""Function to create node weight files for ParMETIS along with the edge files. """Function to create node weight files for ParMETIS along with the edge files.
This function generates node-data files, which will be read by the ParMETIS This function generates node-data files, which will be read by the ParMETIS
...@@ -190,6 +195,8 @@ def gen_node_weights_files(schema_map, output): ...@@ -190,6 +195,8 @@ def gen_node_weights_files(schema_map, output):
----------- -----------
schema_map : json dictionary schema_map : json dictionary
Dictionary created by reading the metadata.json file for the input dataset. Dictionary created by reading the metadata.json file for the input dataset.
input_dir : str
The input directory where the dataset is located.
output : string output : string
Location of storing the node-weights and edge files for ParMETIS. Location of storing the node-weights and edge files for ParMETIS.
...@@ -236,7 +243,8 @@ def gen_node_weights_files(schema_map, output): ...@@ -236,7 +243,8 @@ def gen_node_weights_files(schema_map, output):
# Add train/test/validation masks if present. node-degree will be added when this file # Add train/test/validation masks if present. node-degree will be added when this file
# is read by ParMETIS to mimic the existing single process pipeline present in dgl. # is read by ParMETIS to mimic the existing single process pipeline present in dgl.
node_feats = read_node_features( node_feats = read_node_features(
schema_map, ntype_name, set(["train_mask", "val_mask", "test_mask"]) schema_map, ntype_name, set(["train_mask", "val_mask", "test_mask"]),
input_dir
) )
for k, v in node_feats.items(): for k, v in node_feats.items():
assert sz == v.shape assert sz == v.shape
...@@ -388,7 +396,7 @@ def run_preprocess_data(params): ...@@ -388,7 +396,7 @@ def run_preprocess_data(params):
schema_map = read_json(params.schema_file) schema_map = read_json(params.schema_file)
num_nodes_per_chunk = schema_map[constants.STR_NUM_NODES_PER_CHUNK] num_nodes_per_chunk = schema_map[constants.STR_NUM_NODES_PER_CHUNK]
num_parts = len(num_nodes_per_chunk[0]) num_parts = len(num_nodes_per_chunk[0])
gen_node_weights_files(schema_map, params.output_dir) gen_node_weights_files(schema_map, params.input_dir, params.output_dir)
logging.info(f"Done with node weights....") logging.info(f"Done with node weights....")
gen_edge_files(schema_map, params.output_dir) gen_edge_files(schema_map, params.output_dir)
...@@ -416,6 +424,11 @@ if __name__ == "__main__": ...@@ -416,6 +424,11 @@ if __name__ == "__main__":
type=str, type=str,
help="The schema of the input graph", help="The schema of the input graph",
) )
parser.add_argument(
"--input_dir",
type=str,
help="The input directory where the dataset is located",
)
parser.add_argument( parser.add_argument(
"--output_dir", "--output_dir",
required=True, required=True,
......
...@@ -52,6 +52,7 @@ def run_parmetis_wrapper(params): ...@@ -52,6 +52,7 @@ def run_parmetis_wrapper(params):
f"mpirun -np {num_partitions} -hostfile {params.hostfile} " f"mpirun -np {num_partitions} -hostfile {params.hostfile} "
f"python3 tools/distpartitioning/parmetis_preprocess.py " f"python3 tools/distpartitioning/parmetis_preprocess.py "
f"--schema_file {params.schema_file} " f"--schema_file {params.schema_file} "
f"--input_dir {params.preproc_input_dir} "
f"--output_dir {params.preproc_output_dir}" f"--output_dir {params.preproc_output_dir}"
) )
logging.info(f"Executing Preprocessing Step: {preproc_cmd}") logging.info(f"Executing Preprocessing Step: {preproc_cmd}")
...@@ -107,6 +108,11 @@ if __name__ == "__main__": ...@@ -107,6 +108,11 @@ if __name__ == "__main__":
type=str, type=str,
help="The schema of the input graph", help="The schema of the input graph",
) )
parser.add_argument(
"--preproc_input_dir",
type=str,
help="The input directory for preprocess where the dataset is located",
)
parser.add_argument( parser.add_argument(
"--preproc_output_dir", "--preproc_output_dir",
required=True, required=True,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment