Unverified Commit eae6ce2a authored by kylasa, committed by GitHub

ParMETIS wrapper script to enable ParMETIS to process chunked dataset format (#4605)

* Creating ParMETIS wrapper script to run parmetis using one script from user perspective

* Addressed all the CI comments from PR https://github.com/dmlc/dgl/pull/4529

* Addressing CI comments.

* Isort, and black changes.

* Replaced python with python3

* Replaced single quote with double quotes per suggestion.

* Removed print statement

* Addressing CI comments.

* Addressing CI review comments.

* Addressing CI comments as per chime discussion with Rui

* CI Comments, Black and isort changes

* Align with code refactoring, black, isort and code review comments.

* Addressing CI review comments, and fixing merge issues with the master branch

* Updated with proper unit test skip decorator
parent 1f471396
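A minimal sketch of the end-to-end invocation the wrapper enables, mirroring the unit test added below; the dataset paths, graph name and two-partition setup are illustrative placeholders, and the flags match the wrapper's argument parser:

import os

# Illustrative paths; a real run points these at a chunked dataset produced by chunk_graph().
schema_file = "chunked-data/metadata.json"
preproc_output_dir = "chunked-data/preproc_output_dir"
hostfile = "ip_config.txt"  # one ip address per line, one line per partition
partitions_dir = "chunked-data/partitions_dir"  # per-node-type partition-id files land here
graph_name = "mag240m"
num_partitions = 2
parmetis_output_file = os.path.join(os.getcwd(), f"{graph_name}_part.{num_partitions}")

wrapper_cmd = (
    "python3 tools/distpartitioning/parmetis_wrapper.py "
    f"--schema_file {schema_file} "
    f"--preproc_output_dir {preproc_output_dir} "
    f"--hostfile {hostfile} "
    f"--parmetis_output_file {parmetis_output_file} "
    f"--partitions_dir {partitions_dir}"
)
os.system(wrapper_cmd)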
@@ -13,6 +13,7 @@ from chunk_graph import chunk_graph
 from dgl.data.utils import load_graphs, load_tensors
 def create_chunked_dataset(
     root_dir, num_chunks, include_masks=False, include_edge_data=False
 ):
...
@@ -12,10 +12,11 @@ from dgl.data.utils import load_graphs, load_tensors
 from create_chunked_dataset import create_chunked_dataset
 @pytest.mark.parametrize("num_chunks", [1, 8])
 def test_chunk_graph(num_chunks):
     with tempfile.TemporaryDirectory() as root_dir:
         g = create_chunked_dataset(root_dir, num_chunks, include_edge_data=True)
         num_cite_edges = g.number_of_edges('cites')
@@ -61,7 +62,6 @@ def test_chunk_graph(num_chunks):
                 assert feat_array.shape[0] == num_papers // num_chunks
         # check edge_data
-        edge_data_gold = {}
         num_edges = {
             'paper:cites:paper': num_cite_edges,
             'author:writes:paper': num_write_edges,
@@ -74,15 +74,12 @@ def test_chunk_graph(num_chunks):
             ['paper:rev_writes:author', 'year'],
         ]:
             output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
-            features = []
             for i in range(num_chunks):
                 chunk_f_name = '{}-{}.npy'.format(feat, i)
                 chunk_f_name = os.path.join(output_edge_sub_dir, chunk_f_name)
                 assert os.path.isfile(chunk_f_name)
                 feat_array = np.load(chunk_f_name)
                 assert feat_array.shape[0] == num_edges[etype] // num_chunks
-                features.append(feat_array)
-            edge_data_gold[etype + '/' + feat] = np.concatenate(features)
...
import argparse
import json
import os
import sys
import tempfile
import unittest
import dgl
import numpy as np
import torch
from chunk_graph import chunk_graph
from dgl.data.utils import load_graphs, load_tensors
from create_chunked_dataset import create_chunked_dataset
"""
TODO: skipping this test case since the dependency, mpirun, is
not yet configured in the CI framework.
"""
@unittest.skipIf(True, reason="mpi is not available in CI test framework.")
def test_parmetis_preprocessing():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
g = create_chunked_dataset(root_dir, num_chunks, include_masks=True)
# Trigger ParMETIS pre-processing here.
schema_path = os.path.join(root_dir, 'chunked-data/metadata.json')
results_dir = os.path.join(root_dir, 'parmetis-data')
os.system(
f'mpirun -np 2 python3 tools/distpartitioning/parmetis_preprocess.py '
f'--schema {schema_path} --output {results_dir}'
)
        # Now run the checks below to verify whether the preprocessing step succeeded.
# Read parmetis_nfiles and ensure all files are present.
parmetis_data_dir = os.path.join(root_dir, 'parmetis-data')
assert os.path.isdir(parmetis_data_dir)
parmetis_nodes_file = os.path.join(
parmetis_data_dir, 'parmetis_nfiles.txt'
)
assert os.path.isfile(parmetis_nodes_file)
# `parmetis_nfiles.txt` should have each line in the following format.
# <filename> <global_id_start> <global_id_end>
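        # A line might look like the following (hypothetical paths and ranges):
        #   /tmp/parmetis-data/node_weights_paper_0.txt 0 1000
        #   /tmp/parmetis-data/node_weights_paper_1.txt 1000 2000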
with open(parmetis_nodes_file, 'r') as nodes_metafile:
lines = nodes_metafile.readlines()
total_node_count = 0
for line in lines:
tokens = line.split(" ")
assert len(tokens) == 3
assert os.path.isfile(tokens[0])
assert int(tokens[1]) == total_node_count
# check contents of each of the nodes files here
with open(tokens[0], 'r') as nodes_file:
node_lines = nodes_file.readlines()
for line in node_lines:
val = line.split(" ")
# <ntype_id> <weight_list> <mask_list> <type_node_id>
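                    # Hypothetical example with 3 node types and 3 masks:
                    #   "1 0 1 0 1 0 0 42" -> ntype_id=1, one-hot weights (0, 1, 0),
                    #   train/val/test masks (1, 0, 0), type-local node id 42.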
assert len(val) == 8
node_count = len(node_lines)
total_node_count += node_count
assert int(tokens[2]) == total_node_count
        # Load the metadata object.
output_dir = os.path.join(root_dir, 'chunked-data')
json_file = os.path.join(output_dir, 'metadata.json')
assert os.path.isfile(json_file)
with open(json_file, 'rb') as f:
meta_data = json.load(f)
# Count the total no. of nodes.
true_node_count = 0
num_nodes_per_chunk = meta_data['num_nodes_per_chunk']
for i in range(len(num_nodes_per_chunk)):
node_per_part = num_nodes_per_chunk[i]
for j in range(len(node_per_part)):
true_node_count += node_per_part[j]
assert total_node_count == true_node_count
# Read parmetis_efiles and ensure all files are present.
# This file contains a list of filenames.
parmetis_edges_file = os.path.join(
parmetis_data_dir, 'parmetis_efiles.txt'
)
assert os.path.isfile(parmetis_edges_file)
with open(parmetis_edges_file, 'r') as edges_metafile:
lines = edges_metafile.readlines()
total_edge_count = 0
for line in lines:
edges_filename = line.strip()
assert os.path.isfile(edges_filename)
with open(edges_filename, 'r') as edges_file:
edge_lines = edges_file.readlines()
total_edge_count += len(edge_lines)
for line in edge_lines:
val = line.split(" ")
assert len(val) == 2
# Count the total no. of edges
true_edge_count = 0
num_edges_per_chunk = meta_data['num_edges_per_chunk']
for i in range(len(num_edges_per_chunk)):
edges_per_part = num_edges_per_chunk[i]
for j in range(len(edges_per_part)):
true_edge_count += edges_per_part[j]
assert true_edge_count == total_edge_count
def test_parmetis_postprocessing():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
g = create_chunked_dataset(root_dir, num_chunks, include_masks=True)
num_nodes = g.number_of_nodes()
num_institutions = g.number_of_nodes('institution')
num_authors = g.number_of_nodes('author')
num_papers = g.number_of_nodes('paper')
# Generate random parmetis partition ids for the nodes in the graph.
# Replace this code with actual ParMETIS executable when it is ready
output_dir = os.path.join(root_dir, 'chunked-data')
parmetis_file = os.path.join(output_dir, 'parmetis_output.txt')
node_ids = np.arange(num_nodes)
partition_ids = np.random.randint(0, 2, (num_nodes,))
parmetis_output = np.column_stack([node_ids, partition_ids])
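        # Each row follows the "<global-node-id> <partition-id>" format that the
        # post-processing script expects to read back.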
        # Create the ParMETIS output file; this mimics an actual ParMETIS run.
with open(parmetis_file, 'w') as f:
np.savetxt(f, parmetis_output)
# Check the post processing script here.
results_dir = os.path.join(output_dir, 'partitions_dir')
json_file = os.path.join(output_dir, 'metadata.json')
print(json_file)
print(results_dir)
print(parmetis_file)
os.system(
f'python3 tools/distpartitioning/parmetis_postprocess.py '
f'--schema_file {json_file} '
f'--parmetis_output_file {parmetis_file} '
f'--partitions_dir {results_dir}'
)
ntype_count = {
'author': num_authors,
'paper': num_papers,
'institution': num_institutions,
}
for ntype_name in ['author', 'paper', 'institution']:
fname = os.path.join(results_dir, f'{ntype_name}.txt')
print(fname)
assert os.path.isfile(fname)
# Load and check the partition ids in this file.
part_ids = np.loadtxt(fname)
assert part_ids.shape[0] == ntype_count[ntype_name]
assert np.min(part_ids) == 0
assert np.max(part_ids) == 1
"""
TODO: skipping this test case since it depends on the dependency, mpi,
which is not yet configured in the CI framework.
"""
@unittest.skipIf(True, reason="mpi is not available in CI test framework.")
def test_parmetis_wrapper():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
graph_name = "mag240m"
g = create_chunked_dataset(root_dir, num_chunks, include_masks=True)
all_ntypes = g.ntypes
all_etypes = g.etypes
num_constraints = len(all_ntypes) + 3
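        # NOTE: the "+ 3" is presumed here to account for the train/val/test mask
        # columns that the preprocessing step appends to the node weight files.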
num_institutions = g.number_of_nodes('institution')
num_authors = g.number_of_nodes('author')
num_papers = g.number_of_nodes('paper')
# Trigger ParMETIS.
schema_file = os.path.join(root_dir, 'chunked-data/metadata.json')
preproc_output_dir = os.path.join(
root_dir, 'chunked-data/preproc_output_dir'
)
parmetis_output_file = os.path.join(
os.getcwd(), f'{graph_name}_part.{num_chunks}'
)
partitions_dir = os.path.join(root_dir, 'chunked-data/partitions_dir')
hostfile = os.path.join(root_dir, 'ip_config.txt')
with open(hostfile, 'w') as f:
f.write('127.0.0.1\n')
f.write('127.0.0.1\n')
num_nodes = g.number_of_nodes()
num_edges = g.number_of_edges()
stats_file = f'{graph_name}_stats.txt'
with open(stats_file, 'w') as f:
f.write(f'{num_nodes} {num_edges} {num_constraints}')
parmetis_cmd = (
f'python3 tools/distpartitioning/parmetis_wrapper.py '
f'--schema_file {schema_file} '
f'--preproc_output_dir {preproc_output_dir} '
f'--hostfile {hostfile} '
f'--parmetis_output_file {parmetis_output_file} '
f'--partitions_dir {partitions_dir} '
)
print(f'Executing the following cmd: {parmetis_cmd}')
print(parmetis_cmd)
os.system(parmetis_cmd)
ntype_count = {
'author': num_authors,
'paper': num_papers,
'institution': num_institutions,
}
for ntype_name in ['author', 'paper', 'institution']:
fname = os.path.join(partitions_dir, f'{ntype_name}.txt')
print(fname)
assert os.path.isfile(fname)
# Load and check the partition ids in this file.
part_ids = np.loadtxt(fname)
assert part_ids.shape[0] == ntype_count[ntype_name]
assert np.min(part_ids) == 0
assert np.max(part_ids) == (num_chunks - 1)
@@ -33,5 +33,6 @@ STR_NUMPY = "numpy"
 STR_CSV = "csv"
 STR_NAME = "name"
+STR_GRAPH_NAME = "graph_name"
 STR_NODE_FEATURES = "node_features"
 STR_EDGE_FEATURES = "edge_features"
\ No newline at end of file
import argparse
import logging
import os
import platform
import sys
from pathlib import Path
import numpy as np
import pyarrow
import pyarrow.csv as csv
import constants
from utils import get_idranges, get_node_types, read_json
def post_process(params):
"""Auxiliary function to read the parmetis output file and generate
metis partition-id files, sorted, per node-type. These files are used
by the dist. graph partitioning pipeline for further processing.
Parameters:
-----------
params : argparser object
argparser object to capture command line options passed to the
executable
"""
logging.info("Starting to process parmetis output.")
logging.info(params.schema_file)
logging.info(params.parmetis_output_file)
assert os.path.isfile(params.schema_file)
assert os.path.isfile(params.parmetis_output_file)
schema = read_json(params.schema_file)
metis_df = csv.read_csv(
params.parmetis_output_file,
read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=" "),
)
global_nids = metis_df["f0"].to_numpy()
partition_ids = metis_df["f1"].to_numpy()
sort_idx = np.argsort(global_nids)
global_nids = global_nids[sort_idx]
partition_ids = partition_ids[sort_idx]
ntypes_ntypeid_map, ntypes, ntid_ntype_map = get_node_types(schema)
type_nid_dict, ntype_gnid_offset = get_idranges(
schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK],
)
outdir = Path(params.partitions_dir)
os.makedirs(outdir, exist_ok=True)
for ntype_id, ntype_name in ntid_ntype_map.items():
start = ntype_gnid_offset[ntype_name][0, 0]
end = ntype_gnid_offset[ntype_name][0, 1]
out_data = partition_ids[start:end]
out_file = os.path.join(outdir, f"{ntype_name}.txt")
options = csv.WriteOptions(include_header=False, delimiter=" ")
csv.write_csv(
pyarrow.Table.from_arrays([out_data], names=["partition-ids"]),
out_file,
options,
)
logging.info(f"Generated {out_file}")
logging.info("Done processing parmetis output")
if __name__ == "__main__":
"""Main function to convert the output of parmetis into metis partitions
which are accepted by graph partitioning pipeline.
ParMETIS currently generates one output file, which is in the following format:
<global-node-id> <partition-id>
Graph partitioing pipeline, per the new dataset file format rules expects the
metis partitions to be in the following format:
No. of files will be equal to the no. of node-types in the graph
Each file will have one-number/line which is <partition-id>.
Example usage:
--------------
python parmetis_postprocess.py
--input_file <metis-partitions-file>
--output-dir <directory where the output files are stored>
--schema <schema-file-path>
"""
    parser = argparse.ArgumentParser(
        description="Post-process the ParMETIS output for the partitioning pipeline"
    )
parser.add_argument(
"--schema_file",
required=True,
type=str,
help="The schema of the input graph",
)
parser.add_argument(
"--parmetis_output_file",
required=True,
type=str,
help="ParMETIS output file",
)
    parser.add_argument(
        "--partitions_dir",
        required=True,
        type=str,
        help="The output directory; it will contain files with metis partition "
        "ids, one file per node type in the input graph dataset.",
    )
params = parser.parse_args()
# Configure logging.
logging.basicConfig(
level="INFO",
format=f"[{platform.node()} \
%(levelname)s %(asctime)s PID:%(process)d] %(message)s",
)
# Invoke the function for post processing
post_process(params)
import argparse
import logging
import os
import sys
from pathlib import Path
import numpy as np
import pyarrow
import pyarrow.csv as csv
import torch
import torch.distributed as dist
import constants
from utils import get_idranges, get_node_types, read_json
def get_proc_info():
"""Helper function to get the rank from the
environment when `mpirun` is used to run this python program.
Please note that for mpi(openmpi) installation the rank is retrieved from the
environment using OMPI_COMM_WORLD_RANK and for mpi(standard installation) it is
retrieved from the environment using MPI_LOCALRANKID.
For retrieving world_size please use OMPI_COMM_WORLD_SIZE or
MPI_WORLDSIZE appropriately as described above to retrieve total no. of
processes, when needed.
Returns:
--------
integer :
Rank of the current process.
"""
    env_variables = dict(os.environ)
    if "OMPI_COMM_WORLD_RANK" in env_variables:
        local_rank = int(os.environ.get("OMPI_COMM_WORLD_RANK") or 0)
    elif "MPI_LOCALRANKID" in env_variables:
        local_rank = int(os.environ.get("MPI_LOCALRANKID") or 0)
    else:
        # Fall back to rank 0 when neither variable is set, e.g. when the
        # script is launched without mpirun.
        local_rank = 0
    return local_rank
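# Example for get_proc_info() (hypothetical): under `mpirun -np 4 ...` with OpenMPI,
# OMPI_COMM_WORLD_RANK is set to "0".."3", so each process sees its own rank.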
def gen_edge_files(schema_map, output):
"""Function to create edges files to be consumed by ParMETIS
for partitioning purposes.
This function creates the edge files and each of these will have the
following format (meaning each line of these file is of the following format)
<global_src_id> <global_dst_id>
Here ``global`` prefix means that globally unique identifier assigned each node
in the input graph. In this context globally unique means unique across all the
nodes in the input graph.
Parameters:
-----------
schema_map : json dictionary
Dictionary created by reading the metadata.json file for the input dataset.
output : string
Location of storing the node-weights and edge files for ParMETIS.
"""
rank = get_proc_info()
type_nid_dict, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
)
# Regenerate edge files here.
edge_data = schema_map[constants.STR_EDGES]
etype_names = schema_map[constants.STR_EDGE_TYPE]
etype_name_idmap = {e: idx for idx, e in enumerate(etype_names)}
edge_tids, _ = get_idranges(
schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK],
)
outdir = Path(output)
os.makedirs(outdir, exist_ok=True)
edge_files = []
num_parts = len(schema_map[constants.STR_NUM_EDGES_PER_CHUNK][0])
for etype_name, etype_info in edge_data.items():
edge_info = etype_info[constants.STR_DATA]
# ``edgetype`` strings are in canonical format, src_node_type:edge_type:dst_node_type
tokens = etype_name.split(":")
assert len(tokens) == 3
src_ntype_name = tokens[0]
rel_name = tokens[1]
dst_ntype_name = tokens[2]
data_df = csv.read_csv(
edge_info[rank],
read_options=pyarrow.csv.ReadOptions(
autogenerate_column_names=True
),
parse_options=pyarrow.csv.ParseOptions(delimiter=" "),
)
data_f0 = data_df["f0"].to_numpy()
data_f1 = data_df["f1"].to_numpy()
global_src_id = data_f0 + ntype_gnid_offset[src_ntype_name][0, 0]
global_dst_id = data_f1 + ntype_gnid_offset[dst_ntype_name][0, 0]
cols = [global_src_id, global_dst_id]
col_names = ["global_src_id", "global_dst_id"]
out_file = edge_info[rank].split("/")[-1]
out_file = os.path.join(outdir, "edges_{}".format(out_file))
options = csv.WriteOptions(include_header=False, delimiter=" ")
options.delimiter = " "
csv.write_csv(
pyarrow.Table.from_arrays(cols, names=col_names), out_file, options
)
edge_files.append(out_file)
return edge_files
def read_node_features(schema_map, tgt_ntype_name, feat_names):
"""Helper function to read the node features.
Only node features which are requested are read from the input dataset.
Parameters:
-----------
schema_map : json dictionary
Dictionary created by reading the metadata.json file for the input dataset.
tgt_ntype_name : string
node-type name, for which node features will be read from the input dataset.
feat_names : set
A set of strings, feature names, which will be read for a given node type.
Returns:
--------
dictionary :
A dictionary where key is the feature-name and value is the numpy array.
"""
rank = get_proc_info()
node_features = {}
if constants.STR_NODE_DATA in schema_map:
dataset_features = schema_map[constants.STR_NODE_DATA]
if dataset_features and (len(dataset_features) > 0):
for ntype_name, ntype_feature_data in dataset_features.items():
if ntype_name != tgt_ntype_name:
continue
# ntype_feature_data is a dictionary
# where key: feature_name, value: dictionary in which keys are "format", "data".
for feat_name, feat_data in ntype_feature_data.items():
if feat_name in feat_names:
feat_data_fname = feat_data[constants.STR_DATA][rank]
logging.info(f"Reading: {feat_data_fname}")
if os.path.isabs(feat_data_fname):
node_features[feat_name] = np.load(feat_data_fname)
                        else:
                            # Relative feature paths are resolved against the
                            # current working directory.
                            node_features[feat_name] = np.load(
                                os.path.join(os.getcwd(), feat_data_fname)
                            )
return node_features
def gen_node_weights_files(schema_map, output):
"""Function to create node weight files for ParMETIS along with the edge files.
This function generates node-data files, which will be read by the ParMETIS
executable for partitioning purposes. Each line in these files will be of the
following format:
<node_type_id> <node_weight_list> <type_wise_node_id>
node_type_id - is id assigned to the node-type to which a given particular
node belongs to
weight_list - this is a one-hot vector in which the number in the location of
the current nodes' node-type will be set to `1` and other will be `0`
type_node_id - this is the id assigned to the node (in the context of the current
nodes` node-type). Meaning this id is unique across all the nodes which belong to
the current nodes` node-type.
Parameters:
-----------
schema_map : json dictionary
Dictionary created by reading the metadata.json file for the input dataset.
output : string
Location of storing the node-weights and edge files for ParMETIS.
Returns:
--------
list :
List of filenames for nodes of the input graph.
list :
List o ffilenames for edges of the input graph.
"""
rank = get_proc_info()
ntypes_ntypeid_map, ntypes, ntid_ntype_map = get_node_types(schema_map)
type_nid_dict, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
)
node_files = []
outdir = Path(output)
os.makedirs(outdir, exist_ok=True)
for ntype_id, ntype_name in ntid_ntype_map.items():
type_start, type_end = (
type_nid_dict[ntype_name][rank][0],
type_nid_dict[ntype_name][rank][1],
)
count = type_end - type_start
sz = (count,)
cols = []
col_names = []
cols.append(
pyarrow.array(np.ones(sz, dtype=np.int64) * np.int64(ntype_id))
)
col_names.append("ntype")
for i in range(len(ntypes)):
if i == ntype_id:
cols.append(pyarrow.array(np.ones(sz, dtype=np.int64)))
else:
cols.append(pyarrow.array(np.zeros(sz, dtype=np.int64)))
col_names.append("w{}".format(i))
        # Add train/test/validation masks if present. The node degree will be added
        # when this file is read by ParMETIS, to mimic the existing single-process
        # pipeline present in dgl.
node_feats = read_node_features(
schema_map, ntype_name, set(["train_mask", "val_mask", "test_mask"])
)
for k, v in node_feats.items():
assert sz == v.shape
cols.append(pyarrow.array(v))
col_names.append(k)
# `type_nid` should be the very last column in the node weights files.
cols.append(
pyarrow.array(
np.arange(count, dtype=np.int64) + np.int64(type_start)
)
)
col_names.append("type_nid")
out_file = os.path.join(
outdir, "node_weights_{}_{}.txt".format(ntype_name, rank)
)
options = csv.WriteOptions(include_header=False, delimiter=" ")
options.delimiter = " "
csv.write_csv(
pyarrow.Table.from_arrays(cols, names=col_names), out_file, options
)
node_files.append(
(
ntype_gnid_offset[ntype_name][0, 0] + type_start,
ntype_gnid_offset[ntype_name][0, 0] + type_end,
out_file,
)
)
return node_files
def gen_parmetis_input_args(params, schema_map):
"""Function to create two input arguments which will be passed to the parmetis.
first argument is a text file which has a list of node-weights files,
namely parmetis-nfiles.txt, and second argument is a text file which has a
list of edge files, namely parmetis_efiles.txt.
ParMETIS uses these two files to read/load the graph and partition the graph
With regards to the file format, parmetis_nfiles.txt uses the following format
for each line in that file:
<filename> <global_node_id_start> <global_node_id_end>(exclusive)
While parmetis_efiles.txt just has <filename> in each line.
Parameters:
-----------
params : argparser instance
Instance of ArgParser class, which has all the input arguments passed to
run this program.
schema_map : json dictionary
Dictionary object created after reading the graph metadata.json file.
"""
num_nodes_per_chunk = schema_map[constants.STR_NUM_NODES_PER_CHUNK]
num_parts = len(num_nodes_per_chunk[0])
ntypes_ntypeid_map, ntypes, ntid_ntype_map = get_node_types(schema_map)
type_nid_dict, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
)
node_files = []
outdir = Path(params.output_dir)
os.makedirs(outdir, exist_ok=True)
for ntype_id, ntype_name in ntid_ntype_map.items():
global_nid_offset = ntype_gnid_offset[ntype_name][0, 0]
for r in range(num_parts):
type_start, type_end = (
type_nid_dict[ntype_name][r][0],
type_nid_dict[ntype_name][r][1],
)
out_file = os.path.join(
outdir, "node_weights_{}_{}.txt".format(ntype_name, r)
)
node_files.append(
(
out_file,
global_nid_offset + type_start,
global_nid_offset + type_end,
)
)
    with open(
        os.path.join(params.output_dir, "parmetis_nfiles.txt"), "w"
    ) as nfile:
        for f in node_files:
            # Format: <filename> <global_node_id_start> <global_node_id_end>(exclusive)
            nfile.write("{} {} {}\n".format(f[0], f[1], f[2]))
# Regenerate edge files here.
edge_data = schema_map[constants.STR_EDGES]
edge_files = []
for etype_name, etype_info in edge_data.items():
edge_info = etype_info[constants.STR_DATA]
for r in range(num_parts):
out_file = edge_info[r].split("/")[-1]
out_file = os.path.join(outdir, "edges_{}".format(out_file))
edge_files.append(out_file)
with open(
os.path.join(params.output_dir, "parmetis_efiles.txt"), "w"
) as efile:
for f in edge_files:
efile.write("{}\n".format(f))
def run_preprocess_data(params):
"""Main function which will help create graph files for ParMETIS processing
Parameters:
-----------
params : argparser object
An instance of argparser class which stores command line arguments.
"""
logging.info(f"Starting to generate ParMETIS files...")
rank = get_proc_info()
schema_map = read_json(params.schema_file)
num_nodes_per_chunk = schema_map[constants.STR_NUM_NODES_PER_CHUNK]
num_parts = len(num_nodes_per_chunk[0])
gen_node_weights_files(schema_map, params.output_dir)
logging.info(f"Done with node weights....")
gen_edge_files(schema_map, params.output_dir)
logging.info(f"Done with edge weights...")
if rank == 0:
gen_parmetis_input_args(params, schema_map)
logging.info(f"Done generating files for ParMETIS run ..")
if __name__ == "__main__":
"""Main function used to generate temporary files needed for ParMETIS execution.
This function generates node-weight files and edges files which are consumed by ParMETIS.
Example usage:
--------------
mpirun -np 4 python3 parmetis_preprocess.py --schema <file> --output <target-output-dir>
"""
parser = argparse.ArgumentParser(
description="Generate ParMETIS files for input dataset"
)
parser.add_argument(
"--schema_file",
required=True,
type=str,
help="The schema of the input graph",
)
parser.add_argument(
"--output_dir",
required=True,
type=str,
help="The output directory for the node weights files and auxiliary files for ParMETIS.",
)
params = parser.parse_args()
# Invoke the function to generate files for parmetis
run_preprocess_data(params)
import argparse
import logging
import os
import platform
import sys
from pathlib import Path
import constants
from utils import read_json
def check_dependencies():
"""Check if all the dependencies needed for the execution of this file
are installed.
"""
exec_path = os.get_exec_path()
mpi_install = False
for x in exec_path:
if os.path.isfile(os.path.join(x, "mpirun")):
mpi_install = True
break
assert (
mpi_install
), "Could not locate the following dependency: MPI. Please install it and try again."
def run_parmetis_wrapper(params):
"""Function to execute all the steps needed to run ParMETIS
Parameters:
-----------
params : argparser object
an instance of argparser class to capture command-line arguments
"""
assert os.path.isfile(params.schema_file)
assert os.path.isfile(params.hostfile)
schema = read_json(params.schema_file)
graph_name = schema[constants.STR_GRAPH_NAME]
num_partitions = len(schema[constants.STR_NUM_NODES_PER_CHUNK][0])
# Check if parmetis_preprocess.py exists.
assert os.path.isfile(
os.path.join(
os.path.dirname(os.path.abspath(__file__)), "parmetis_preprocess.py"
)
), "Please check DGL Installation, parmetis_preprocess.py file does not exist."
# Trigger pre-processing step to generate input files for ParMETIS.
preproc_cmd = (
f"mpirun -np {num_partitions} -hostfile {params.hostfile} "
f"python3 tools/distpartitioning/parmetis_preprocess.py "
f"--schema_file {params.schema_file} "
f"--output_dir {params.preproc_output_dir}"
)
logging.info(f"Executing Preprocessing Step: {preproc_cmd}")
os.system(preproc_cmd)
logging.info(f"Done Preprocessing Step")
# Trigger ParMETIS for creating metis partitions for the input graph.
parmetis_install_path = "pm_dglpart3"
if params.parmetis_install_path is not None:
parmetis_install_path = os.path.join(
params.parmetis_install_path, parmetis_install_path
)
parmetis_nfiles = os.path.join(
params.preproc_output_dir, "parmetis_nfiles.txt"
)
parmetis_efiles = os.path.join(
params.preproc_output_dir, "parmetis_efiles.txt"
)
parmetis_cmd = (
f"mpirun -np {num_partitions} -hostfile {params.hostfile} "
f"{parmetis_install_path} {graph_name} {num_partitions} "
f"{parmetis_nfiles} {parmetis_efiles}"
)
logging.info(f"Executing ParMETIS: {parmetis_cmd}")
os.system(parmetis_cmd)
logging.info(f"Done ParMETIS execution step")
# Trigger post-processing step to convert parmetis output to the form
# acceptable by dist. graph partitioning pipeline.
parmetis_output_file = os.path.join(
os.getcwd(), f"{graph_name}_part.{num_partitions}"
)
postproc_cmd = (
f"python3 tools/distpartitioning/parmetis_postprocess.py "
f"--schema_file {params.schema_file} "
f"--parmetis_output_file {parmetis_output_file} "
f"--partitions_dir {params.partitions_dir}"
)
logging.info(f"Executing PostProcessing: {postproc_cmd}")
os.system(postproc_cmd)
logging.info("Done Executing ParMETIS...")
if __name__ == "__main__":
"""Main function to invoke the parmetis wrapper function"""
parser = argparse.ArgumentParser(
description="Run ParMETIS as part of the graph partitioning pipeline"
)
# Preprocessing step.
parser.add_argument(
"--schema_file",
required=True,
type=str,
help="The schema of the input graph",
)
parser.add_argument(
"--preproc_output_dir",
required=True,
type=str,
help="The output directory for the node weights files and auxiliary\
files for ParMETIS.",
)
parser.add_argument(
"--hostfile",
required=True,
type=str,
help="A text file with a list of ip addresses.",
)
# ParMETIS step.
parser.add_argument(
"--parmetis_install_path",
required=False,
type=str,
help="The directory where ParMETIS is installed",
)
# Postprocessing step.
parser.add_argument(
"--parmetis_output_file",
required=True,
type=str,
help="ParMETIS output file (global_node_id to partition_id mappings)",
)
parser.add_argument(
"--partitions_dir",
required=True,
type=str,
help="The directory where the files (with metis partition ids) grouped \
by node_types",
)
params = parser.parse_args()
# Configure logging.
logging.basicConfig(
level="INFO",
format=f"[{platform.node()} \
%(levelname)s %(asctime)s PID:%(process)d] %(message)s",
)
check_dependencies()
run_parmetis_wrapper(params)