Unverified Commit 099b173f authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[DistPart] expose timeout config for process group (#4532)



* [DistPart] expose timeout config for process group

* refine code

* Update tools/distpartitioning/data_proc_pipeline.py
Co-authored-by: Minjie Wang <wmjlyjemaine@gmail.com>
parent cf035927
......@@ -171,9 +171,13 @@ def test_part_pipeline():
f.write('127.0.0.1\n')
f.write('127.0.0.2\n')
os.system('python tools/dispatch_data.py '\
'--in-dir {} --partitions-dir {} --out-dir {} --ip-config {}'.format(
in_dir, partition_dir, out_dir, ip_config))
cmd = 'python tools/dispatch_data.py'
cmd += f' --in-dir {in_dir}'
cmd += f' --partitions-dir {partition_dir}'
cmd += f' --out-dir {out_dir}'
cmd += f' --ip-config {ip_config}'
cmd += ' --process-group-timeout 60'
os.system(cmd)
# check metadata.json
meta_fname = os.path.join(out_dir, 'metadata.json')
......
......@@ -49,6 +49,7 @@ def submit_jobs(args) -> str:
argslist += "--schema {} ".format(schema_path)
argslist += "--num-parts {} ".format(num_parts)
argslist += "--output {} ".format(os.path.abspath(args.out_dir))
argslist += "--process-group-timeout {} ".format(args.process_group_timeout)
# (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
......@@ -69,6 +70,8 @@ def main():
parser.add_argument('--ip-config', type=str, help='File location of IP configuration for server processes')
parser.add_argument('--master-port', type=int, default=12345, help='port used by gloo group to create randezvous point')
parser.add_argument('--python-path', type=str, default=sys.executable, help='Path to the Python executable on all workers')
parser.add_argument('--process-group-timeout', type=int, default=1800,
help='timeout[seconds] for operations executed against the process group')
args, udf_command = parser.parse_known_args()
......
......@@ -19,8 +19,6 @@ OWNER_PROCESS = "owner_proc_id"
PART_LOCAL_NID = "part_local_nid"
GLOO_MESSAGING_TIMEOUT = 60 #seconds
STR_NODE_TYPE = "node_type"
STR_NUM_NODES_PER_CHUNK = "num_nodes_per_chunk"
STR_EDGE_TYPE = "edge_type"
......
import argparse
import numpy as np
import torch.multiprocessing as mp
import logging
import platform
import os
from data_shuffle import single_machine_run, multi_machine_run
def log_params(params):
......@@ -45,6 +42,9 @@ if __name__ == "__main__":
#arguments needed for the distributed implementation
parser.add_argument('--world-size', help='no. of processes to spawn',
default=1, type=int, required=True)
parser.add_argument('--process-group-timeout', required=True, type=int,
help='timeout[seconds] for operations executed against the process group '
'(see torch.distributed.init_process_group)')
params = parser.parse_args()
#invoke the pipeline function
......
......@@ -683,7 +683,7 @@ def multi_machine_run(params):
backend="gloo",
rank=rank,
world_size=params.world_size,
timeout=timedelta(seconds=constants.GLOO_MESSAGING_TIMEOUT))
timeout=timedelta(seconds=params.process_group_timeout))
logging.info(f'[Rank: {rank}] Done with process group initialization...')
#invoke the main function here.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment