"docs/basic_usage/sampling_params.md" did not exist on "9935f97b3e594e246776466d04134decff1b59ae"
dispatch_data.py 6.05 KB
Newer Older
1
2
3
"""Launching distributed graph partitioning pipeline """
import argparse
import json
4
5
6
7
import logging
import os
import sys

8
from partition_algo.base import load_partition_meta
9

10
INSTALL_DIR = os.path.abspath(os.path.join(__file__, ".."))

# Helper scripts, resolved relative to this file's directory.
LAUNCH_SCRIPT = "distgraphlaunch.py"
PIPELINE_SCRIPT = "distpartitioning/data_proc_pipeline.py"

# Flag names forwarded to the data-processing pipeline UDF command line.
UDF_WORLD_SIZE = "world-size"
UDF_PART_DIR = "partitions-dir"
UDF_INPUT_DIR = "input-dir"
UDF_GRAPH_NAME = "graph-name"
UDF_SCHEMA = "schema"
UDF_NUM_PARTS = "num-parts"
UDF_OUT_DIR = "output"

# Flag names understood by the launcher script.
LARG_PROCS_MACHINE = "num_proc_per_machine"
LARG_IPCONF = "ip_config"
LARG_MASTER_PORT = "master_port"
LARG_SSH_PORT = "ssh_port"


def get_launch_cmd(args) -> str:
    """Build the base launcher command line.

    The result is ``<python> <launch-script> <launcher flags>`` with a
    trailing space, ready for the quoted UDF command to be appended.

    Parameters
    ----------
    args : argparse.Namespace
        Must provide ``ssh_port``, ``ip_config`` and ``master_port``.

    Returns
    -------
    str
        The launcher command prefix.
    """
    launcher = os.path.join(INSTALL_DIR, LAUNCH_SCRIPT)
    pieces = [
        f"{sys.executable} {launcher}",
        f"--{LARG_SSH_PORT} {args.ssh_port} ",
        f"--{LARG_PROCS_MACHINE} 1 ",
        f"--{LARG_IPCONF} {args.ip_config} ",
        f"--{LARG_MASTER_PORT} {args.master_port} ",
    ]
    return " ".join(pieces)


def submit_jobs(args) -> None:
    """Read the dataset metadata and partition info, then launch the
    distributed graph-partitioning pipeline on all workers.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line arguments (see ``main``).

    Raises
    ------
    AssertionError
        If no partitions are found or the partition count is not a
        multiple of the number of machines in the ip-config file.
    """
    # Read the json file and get the remaining arguments here.
    schema_path = args.metadata_filename
    with open(os.path.join(args.in_dir, schema_path)) as schema:
        schema_map = json.load(schema)

    graph_name = schema_map["graph_name"]

    # Retrieve num_parts from the partition metadata, when present.
    num_parts = 0
    partition_path = os.path.join(args.partitions_dir, "partition_meta.json")
    if os.path.isfile(partition_path):
        part_meta = load_partition_meta(partition_path)
        num_parts = part_meta.num_parts

    assert (
        num_parts != 0
    ), "Invalid value for no. of partitions. Please check partition_meta.json file."

    # Verify ip_config: partitions must distribute evenly over the
    # machines listed in the file (one IP address per line).
    with open(args.ip_config, "r") as f:
        num_ips = len(f.readlines())
        # BUG FIX: the original message interpolated the nonexistent
        # ``args.num_parts`` (raising AttributeError while formatting the
        # failure message) and printed the file path where the line count
        # was intended.
        assert num_parts % num_ips == 0, (
            f"The num_parts[{num_parts}] should be a multiple of the number "
            f"of lines (ip addresses)[{num_ips}] in {args.ip_config}."
        )

    # Build the argument list for the pipeline UDF, reusing the shared
    # UDF_* flag-name constants for consistency with the pipeline script.
    argslist = ""
    argslist += f"--{UDF_WORLD_SIZE} {num_ips} "
    argslist += f"--{UDF_PART_DIR} {os.path.abspath(args.partitions_dir)} "
    argslist += f"--{UDF_INPUT_DIR} {os.path.abspath(args.in_dir)} "
    argslist += f"--{UDF_GRAPH_NAME} {graph_name} "
    argslist += f"--{UDF_SCHEMA} {schema_path} "
    argslist += f"--{UDF_NUM_PARTS} {num_parts} "
    argslist += f"--{UDF_OUT_DIR} {os.path.abspath(args.out_dir)} "
    argslist += f"--process-group-timeout {args.process_group_timeout} "
    argslist += f"--log-level {args.log_level} "
    argslist += "--save-orig-nids " if args.save_orig_nids else ""
    argslist += "--save-orig-eids " if args.save_orig_eids else ""
    argslist += (
        f"--graph-formats {args.graph_formats} " if args.graph_formats else ""
    )

    # (BarclayII) Is it safe to assume all the workers have the Python
    # executable at the same path?
    pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
    udf_cmd = f"{args.python_path} {pipeline_cmd} {argslist}"

    # Wrap the UDF command in quotes so the launcher receives it as a
    # single argument.
    launch_cmd = get_launch_cmd(args)
    launch_cmd += '"' + udf_cmd + '"'

    print(launch_cmd)
    os.system(launch_cmd)

92

93
def main():
    """Parse command-line arguments, validate them, and dispatch the
    partitioning jobs to the cluster via ``submit_jobs``."""
    parser = argparse.ArgumentParser(
        description="Dispatch edge index and data to partitions",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    parser.add_argument(
        "--in-dir",
        type=str,
        help="Location of the input directory where the dataset is located",
    )
    parser.add_argument(
        "--metadata-filename",
        type=str,
        default="metadata.json",
        help="Filename for the metadata JSON file that describes the dataset to be dispatched.",
    )
    parser.add_argument(
        "--partitions-dir",
        type=str,
        help="Location of the partition-id mapping files which define node-ids and their respective partition-ids, relative to the input directory",
    )
    parser.add_argument(
        "--out-dir",
        type=str,
        help="Location of the output directory where the graph partitions will be created by this pipeline",
    )
    parser.add_argument(
        "--ip-config",
        type=str,
        help="File location of IP configuration for server processes",
    )
    parser.add_argument(
        "--master-port",
        type=int,
        default=12345,
        # Typo fix: "randezvous" -> "rendezvous".
        help="port used by gloo group to create rendezvous point",
    )
    parser.add_argument(
        "--log-level",
        required=False,
        type=str,
        help="Log level to use for execution.",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
    )
    parser.add_argument(
        "--python-path",
        type=str,
        default=sys.executable,
        help="Path to the Python executable on all workers",
    )
    parser.add_argument("--ssh-port", type=int, default=22, help="SSH Port.")
    parser.add_argument(
        "--process-group-timeout",
        type=int,
        default=1800,
        help="timeout[seconds] for operations executed against the process group",
    )
    parser.add_argument(
        "--save-orig-nids",
        action="store_true",
        help="Save original node IDs into files",
    )
    parser.add_argument(
        "--save-orig-eids",
        action="store_true",
        help="Save original edge IDs into files",
    )
    parser.add_argument(
        "--graph-formats",
        type=str,
        default=None,
        help="Save partitions in specified formats. It could be any combination(joined with ``,``) "
        "of ``coo``, ``csc`` and ``csr``. If not specified, save one format only according to "
        "what format is available. If multiple formats are available, selection priority "
        "from high to low is ``coo``, ``csc``, ``csr``.",
    )

    # parse_known_args tolerates extra flags passed through by wrappers.
    args, _ = parser.parse_known_args()

    fmt = "%(asctime)s %(levelname)s %(message)s"
    logging.basicConfig(
        format=fmt,
        level=getattr(logging, args.log_level, None),
    )

    # Validate the inputs early, with diagnostic messages so a failure
    # points at the offending path instead of a bare AssertionError.
    assert os.path.isdir(args.in_dir), f"Not a directory: {args.in_dir}"
    assert os.path.isdir(
        args.partitions_dir
    ), f"Not a directory: {args.partitions_dir}"
    assert os.path.isfile(args.ip_config), f"Not a file: {args.ip_config}"
    assert isinstance(args.master_port, int), "master_port must be an int"

    submit_jobs(args)

187
188

# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()