"vscode:/vscode.git/clone" did not exist on "571530dd9a1c2323675cc6b93592e20148604c70"
# parmetis_wrapper.py
import argparse
import logging
import os
import platform
import sys
from pathlib import Path

import constants
from utils import read_json


def check_dependencies():
    """Check that the external tools needed by this script are installed.

    Every directory returned by ``os.get_exec_path()`` is probed for a
    regular file named ``mpirun``.

    Raises
    ------
    AssertionError
        If no ``mpirun`` file is found in any search-path directory. The
        exception is raised explicitly instead of through an ``assert``
        statement so that the check is not stripped when Python runs with
        ``-O``.
    """
    mpi_install = any(
        os.path.isfile(os.path.join(directory, "mpirun"))
        for directory in os.get_exec_path()
    )
    if not mpi_install:
        raise AssertionError(
            "Could not locate the following dependency: MPI. Please install it and try again."
        )


def _run_step(step_name, cmd):
    """Run shell command `cmd`; raise RuntimeError if its exit status is non-zero."""
    status = os.system(cmd)
    if status != 0:
        raise RuntimeError(
            f"{step_name} failed with exit status {status}: {cmd}"
        )


def run_parmetis_wrapper(params):
    """Function to execute all the steps needed to run ParMETIS

    Three shell commands are run in order: the pre-processing script (under
    ``mpirun``), the ParMETIS binary ``pm_dglpart3`` (under ``mpirun``) and
    the post-processing script. The number of MPI ranks / partitions is the
    length of the first entry of ``schema[constants.STR_NUM_NODES_PER_CHUNK]``.

    Parameters:
    -----------
    params : argparser object
        an instance of argparser class to capture command-line arguments.
        The attributes read here are ``schema_file``, ``hostfile``,
        ``preproc_input_dir``, ``preproc_output_dir``,
        ``parmetis_install_path`` and ``partitions_dir``.

    Raises:
    -------
    AssertionError
        If the schema file, the hostfile or ``parmetis_preprocess.py`` (next
        to this file) does not exist.
    RuntimeError
        If any of the three shell commands exits with a non-zero status.
    """
    # Explicit checks (rather than ``assert``) so they survive ``python -O``.
    for required_file in (params.schema_file, params.hostfile):
        if not os.path.isfile(required_file):
            raise AssertionError(f"File does not exist: {required_file}")

    schema = read_json(params.schema_file)
    graph_name = schema[constants.STR_GRAPH_NAME]
    num_partitions = len(schema[constants.STR_NUM_NODES_PER_CHUNK][0])

    # Check if parmetis_preprocess.py exists.
    preprocess_script = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "parmetis_preprocess.py"
    )
    if not os.path.isfile(preprocess_script):
        raise AssertionError(
            "Please check DGL Installation, parmetis_preprocess.py file does not exist."
        )

    # Trigger pre-processing step to generate input files for ParMETIS.
    # NOTE: the script paths in the commands are relative, so they resolve
    # against the current working directory, not against this file.
    preproc_cmd = (
        f"mpirun -np {num_partitions} -hostfile {params.hostfile} "
        f"python3 tools/distpartitioning/parmetis_preprocess.py "
        f"--schema_file {params.schema_file} "
        # The trailing space keeps --input_dir's value separate from --output_dir.
        f"--input_dir {params.preproc_input_dir} "
        f"--output_dir {params.preproc_output_dir}"
    )
    logging.info(f"Executing Preprocessing Step: {preproc_cmd}")
    _run_step("Preprocessing step", preproc_cmd)
    logging.info("Done Preprocessing Step")

    # Trigger ParMETIS for creating metis partitions for the input graph.
    # Without an explicit install path the binary name is left bare in the
    # command line.
    parmetis_install_path = "pm_dglpart3"
    if params.parmetis_install_path is not None:
        parmetis_install_path = os.path.join(
            params.parmetis_install_path, parmetis_install_path
        )
    parmetis_nfiles = os.path.join(
        params.preproc_output_dir, "parmetis_nfiles.txt"
    )
    parmetis_efiles = os.path.join(
        params.preproc_output_dir, "parmetis_efiles.txt"
    )
    parmetis_cmd = (
        f"mpirun -np {num_partitions} -hostfile {params.hostfile} "
        f"{parmetis_install_path} {graph_name} {num_partitions} "
        f"{parmetis_nfiles} {parmetis_efiles}"
    )
    logging.info(f"Executing ParMETIS: {parmetis_cmd}")
    _run_step("ParMETIS step", parmetis_cmd)
    logging.info("Done ParMETIS execution step")

    # Trigger post-processing step to convert parmetis output to the form
    # acceptable by dist. graph partitioning pipeline.
    # NOTE(review): presumably pm_dglpart3 writes its result to
    # <cwd>/<graph_name>_part.<num_partitions> -- confirm against the binary.
    parmetis_output_file = os.path.join(
        os.getcwd(), f"{graph_name}_part.{num_partitions}"
    )
    postproc_cmd = (
        f"python3 tools/distpartitioning/parmetis_postprocess.py "
        f"--schema_file {params.schema_file} "
        f"--parmetis_output_file {parmetis_output_file} "
        f"--partitions_dir {params.partitions_dir}"
    )
    logging.info(f"Executing PostProcessing: {postproc_cmd}")
    _run_step("Postprocessing step", postproc_cmd)
    logging.info("Done Executing ParMETIS...")


if __name__ == "__main__":
    # Script entry point: parse the command-line arguments, configure
    # logging, verify that MPI is available and run the ParMETIS pipeline.
    parser = argparse.ArgumentParser(
        description="Run ParMETIS as part of the graph partitioning pipeline"
    )
    # Preprocessing step.
    parser.add_argument(
        "--schema_file",
        required=True,
        type=str,
        help="The schema of the input graph",
    )
    # NOTE(review): this option is not marked required, yet its value is
    # always interpolated into the preprocessing command line (as the text
    # "None" when omitted) -- confirm whether it should be required.
    parser.add_argument(
        "--preproc_input_dir",
        type=str,
        help="The input directory for preprocess where the dataset is located",
    )
    parser.add_argument(
        "--preproc_output_dir",
        required=True,
        type=str,
        help="The output directory for the node weights files and auxiliary\
              files for ParMETIS.",
    )
    parser.add_argument(
        "--hostfile",
        required=True,
        type=str,
        help="A text file with a list of ip addresses.",
    )

    # ParMETIS step.
    parser.add_argument(
        "--parmetis_install_path",
        required=False,
        type=str,
        help="The directory where ParMETIS is installed",
    )

    # Postprocessing step.
    # NOTE(review): --parmetis_output_file is required here, but
    # run_parmetis_wrapper never reads it; the wrapper builds the output
    # file path from the current working directory instead.
    parser.add_argument(
        "--parmetis_output_file",
        required=True,
        type=str,
        help="ParMETIS output file (global_node_id to partition_id mappings)",
    )
    parser.add_argument(
        "--partitions_dir",
        required=True,
        type=str,
        help="The directory where the files (with metis partition ids) grouped \
              by node_types",
    )
    params = parser.parse_args()

    # Configure logging. The format is assembled from adjacent literals so
    # that no source indentation leaks into the prefix of every log record.
    logging.basicConfig(
        level="INFO",
        format=(
            f"[{platform.node()} "
            "%(levelname)s %(asctime)s PID:%(process)d] %(message)s"
        ),
    )

    check_dependencies()
    run_parmetis_wrapper(params)