# parmetis_wrapper.py
import argparse
import logging
import os
import platform
import sys
from pathlib import Path

import constants
from utils import read_json


def check_dependencies():
    """Check if all the dependencies needed for the execution of this file
    are installed.

    Two things are verified: an ``mpirun`` executable can be found through
    ``PATH``, and the ``DGL_HOME`` environment variable names an existing
    directory.

    Raises:
    -------
    AssertionError
        if either dependency is missing. The exception is raised explicitly
        (not through an ``assert`` statement) so the checks still run when
        Python is started with ``-O``.
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import shutil

    # shutil.which() searches PATH and only accepts an executable file, so a
    # stray non-executable file named "mpirun" is not mistaken for MPI.
    if shutil.which("mpirun") is None:
        raise AssertionError(
            "Could not locate the following dependency: MPI. Please install it and try again."
        )

    # An unset variable becomes "", which os.path.isdir() rejects as well.
    dgl_path = os.environ.get("DGL_HOME", "")
    if not os.path.isdir(dgl_path):
        raise AssertionError(
            "Environment variable DGL_HOME not found. Please define the DGL installation path"
        )

def _run_step(start_msg, done_msg, argv):
    """Log, run and wait for one external command of the pipeline.

    Raises ``subprocess.CalledProcessError`` when the command exits with a
    non-zero status, so that later steps never run on missing input.
    """
    # Imported locally so this helper does not depend on the module's
    # import block.
    import shlex
    import subprocess

    argv = [str(arg) for arg in argv]
    # Quote only for display; the command itself is run without a shell.
    logging.info("%s: %s", start_msg, " ".join(shlex.quote(arg) for arg in argv))
    subprocess.run(argv, check=True)
    logging.info(done_msg)


def run_parmetis_wrapper(params):
    """Function to execute all the steps needed to run ParMETIS

    Three external commands are launched in order: the pre-processing
    script, the ParMETIS binary (``pm_dglpart3``) and the post-processing
    script. Each command is passed as an argument list (no shell), so values
    containing spaces or shell metacharacters are forwarded unchanged. The
    function stops at the first command that fails.

    Parameters:
    -----------
    params : argparser object
        an instance of argparser class to capture command-line arguments.
        The attributes read here are ``preproc_input_dir``, ``schema_file``,
        ``num_parts``, ``hostfile``, ``preproc_output_dir``,
        ``parmetis_install_path`` and ``partitions_dir``.

    Raises:
    -------
    AssertionError
        if parmetis_preprocess.py or parmetis_postprocess.py is not found
        under ``$DGL_HOME/tools/distpartitioning``. Raised explicitly so the
        check is not stripped by ``python -O``.
    subprocess.CalledProcessError
        if any of the three commands exits with a non-zero status.
    """
    schema = read_json(
        os.path.join(params.preproc_input_dir, params.schema_file)
    )
    graph_name = schema[constants.STR_GRAPH_NAME]
    num_partitions = params.num_parts

    # Validate the very scripts that are executed below (the ones under
    # DGL_HOME), and do it before any long-running step is started.
    tools_dir = os.path.join(
        os.environ.get("DGL_HOME", ""), "tools", "distpartitioning"
    )
    preproc_script = os.path.join(tools_dir, "parmetis_preprocess.py")
    postproc_script = os.path.join(tools_dir, "parmetis_postprocess.py")
    for script in (preproc_script, postproc_script):
        if not os.path.isfile(script):
            raise AssertionError(
                "Please check DGL Installation, "
                f"{os.path.basename(script)} file does not exist."
            )

    mpirun_prefix = [
        "mpirun",
        "-np",
        num_partitions,
        "-hostfile",
        params.hostfile,
    ]

    # Trigger pre-processing step to generate input files for ParMETIS.
    _run_step(
        "Executing Preprocessing Step",
        "Done Preprocessing Step",
        mpirun_prefix
        + [
            "python3",
            preproc_script,
            "--schema_file",
            params.schema_file,
            "--input_dir",
            params.preproc_input_dir,
            "--output_dir",
            params.preproc_output_dir,
            "--num_parts",
            num_partitions,
        ],
    )

    # Trigger ParMETIS for creating metis partitions for the input graph.
    # Without --parmetis_install_path the binary is looked up through PATH.
    parmetis_binary = "pm_dglpart3"
    if params.parmetis_install_path is not None:
        parmetis_binary = os.path.join(
            params.parmetis_install_path, parmetis_binary
        )
    parmetis_nfiles = os.path.join(
        params.preproc_output_dir, "parmetis_nfiles.txt"
    )
    parmetis_efiles = os.path.join(
        params.preproc_output_dir, "parmetis_efiles.txt"
    )
    _run_step(
        "Executing ParMETIS",
        "Done ParMETIS execution step",
        mpirun_prefix
        + [
            parmetis_binary,
            graph_name,
            num_partitions,
            parmetis_nfiles,
            parmetis_efiles,
        ],
    )

    # Trigger post-processing step to convert parmetis output to the form
    # acceptable by dist. graph partitioning pipeline.
    # NOTE(review): the ParMETIS result is looked up in the current working
    # directory under "<graph_name>_part.<num_parts>"; presumably that is
    # where pm_dglpart3 writes it - confirm against the ParMETIS tool.
    parmetis_output_file = os.path.join(
        os.getcwd(), f"{graph_name}_part.{num_partitions}"
    )
    _run_step(
        "Executing PostProcessing",
        "Done Executing ParMETIS...",
        [
            "python3",
            postproc_script,
            "--postproc_input_dir",
            params.preproc_input_dir,
            "--schema_file",
            params.schema_file,
            "--parmetis_output_file",
            parmetis_output_file,
            "--partitions_dir",
            params.partitions_dir,
        ],
    )


if __name__ == "__main__":
    # Script entry point: parse the command line, configure logging, verify
    # the external dependencies and then run the ParMETIS steps.
    parser = argparse.ArgumentParser(
        description="Run ParMETIS as part of the graph partitioning pipeline"
    )
    # Preprocessing step.
    parser.add_argument(
        "--schema_file",
        required=True,
        type=str,
        help="The schema of the input graph",
    )
    parser.add_argument(
        "--preproc_input_dir",
        # The value is joined with --schema_file unconditionally, so leaving
        # it out used to end in a TypeError instead of a usage message.
        required=True,
        type=str,
        help="The input directory for preprocess where the dataset is located",
    )
    parser.add_argument(
        "--preproc_output_dir",
        required=True,
        type=str,
        help="The output directory for the node weights files and auxiliary "
        "files for ParMETIS.",
    )
    parser.add_argument(
        "--hostfile",
        required=True,
        type=str,
        help="A text file with a list of ip addresses.",
    )
    parser.add_argument(
        "--num_parts",
        required=True,
        type=int,
        help="integer representing no. of partitions.",
    )

    # ParMETIS step.
    parser.add_argument(
        "--parmetis_install_path",
        required=False,
        type=str,
        help="The directory where ParMETIS is installed",
    )

    # Postprocessing step.
    # NOTE(review): --parmetis_output_file is required here, but
    # run_parmetis_wrapper never reads it; it derives the file name from the
    # graph name and --num_parts instead. Confirm whether the option should
    # be honoured or made optional.
    parser.add_argument(
        "--parmetis_output_file",
        required=True,
        type=str,
        help="ParMETIS output file (global_node_id to partition_id mappings)",
    )
    parser.add_argument(
        "--partitions_dir",
        required=True,
        type=str,
        help="The directory where the files (with metis partition ids) grouped "
        "by node_types",
    )
    params = parser.parse_args()

    # Configure logging.
    # The format is built from adjacent literals; a backslash continuation
    # inside the string would embed the source indentation in every record.
    logging.basicConfig(
        level="INFO",
        format=f"[{platform.node()} "
        "%(levelname)s %(asctime)s PID:%(process)d] %(message)s",
    )

    check_dependencies()
    run_parmetis_wrapper(params)