Commit f0f55cdc authored by Sudarshan Raghunathan, committed by Facebook GitHub Bot

Add reply files to d2go training processes

Summary:
This diff contains a minimal set of changes to support returning reply files to MAST.

There are three parts:
1. A try..except around the main function catches all "catchable" Python exceptions. Exceptions raised from C++ code and segfaults are not handled here.
2. Each caught exception is written to a per-process JSON reply file (a sketch of how such a file is produced follows this list).
3. At the end, all per-process reply files are stat()-ed and the one with the earliest modification time is copied to the location specified by MAST.
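
For context, a minimal sketch of how a per-process reply file gets produced through torchelastic's error handler. The `/tmp/torchx_demo` path is hypothetical, and the JSON payload shown in the comment is an assumption based on recent PyTorch versions, not something guaranteed by this diff:

```python
import os

from torch.distributed.elastic.multiprocessing.errors import get_error_handler

# Hypothetical location for illustration; real runs use per-process files
# under /tmp/torchx_*/ as globbed in this diff.
os.makedirs("/tmp/torchx_demo/0", exist_ok=True)
os.environ["TORCHELASTIC_ERROR_FILE"] = "/tmp/torchx_demo/0/error.json"

handler = get_error_handler()
handler.initialize()
try:
    raise RuntimeError("example failure")
except Exception as e:
    # Writes a JSON reply file to TORCHELASTIC_ERROR_FILE. In recent
    # PyTorch versions the payload is shaped roughly like (assumption,
    # check your version):
    # {"message": {"message": "RuntimeError: example failure",
    #              "extraInfo": {"py_callstack": "...", "timestamp": "..."}}}
    handler.record_exception(e)
```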

# Limitations
1. This only works when local processes are launched using multiprocessing (which is the default).
2. If an error occurs in C++ code, it will likely not be caught in Python, and the reply file may not contain the correct logs (see the @record sketch after this list for capturing Python-level tracebacks).
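
Related: the new `except ChildFailedError` branch in this diff advises decorating the entrypoint with torchelastic's `@record` when a child fails without an error file. A minimal sketch of that decoration, where `worker_main` is a hypothetical entrypoint and not part of this diff:

```python
from torch.distributed.elastic.multiprocessing.errors import record


@record
def worker_main() -> None:
    # Any exception that escapes this function is serialized by torchelastic
    # into the error file named by the TORCHELASTIC_ERROR_FILE env var,
    # which is the file that ChildFailedError.get_first_failure() reports.
    raise RuntimeError("example failure")


if __name__ == "__main__":
    worker_main()
```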

Differential Revision: D43097683

fbshipit-source-id: 0eaf4f19f6199a9c77f2ce4c7d2bbc2a2078be99
parent b21607b1
@@ -31,6 +31,11 @@ from d2go.utils.misc import (
 )
 from detectron2.engine.defaults import create_ddp_model
+from torch.distributed.elastic.multiprocessing.errors import (
+    _NOT_AVAILABLE,
+    ChildFailedError,
+    get_error_handler,
+)

 logger = logging.getLogger("d2go.tools.train_net")
@@ -42,58 +47,90 @@ def main(
     eval_only: bool = False,
     resume: bool = True,  # NOTE: always enable resume when running on cluster
 ) -> Union[TrainNetOutput, TestNetOutput]:
-    runner = setup_after_launch(cfg, output_dir, runner_class)
-    model = runner.build_model(cfg)
-    logger.info("Model:\n{}".format(model))
-
-    if eval_only:
-        checkpointer = runner.build_checkpointer(cfg, model, save_dir=output_dir)
-        # checkpointer.resume_or_load() will skip all additional checkpointable
-        # which may not be desired like ema states
-        if resume and checkpointer.has_checkpoint():
-            checkpoint = checkpointer.resume_or_load(cfg.MODEL.WEIGHTS, resume=resume)
-        else:
-            checkpoint = checkpointer.load(cfg.MODEL.WEIGHTS)
-        train_iter = checkpoint.get("iteration", None)
-        model.eval()
-        metrics = runner.do_test(cfg, model, train_iter=train_iter)
-        print_metrics_table(metrics)
-        return TestNetOutput(
-            accuracy=metrics,
-            metrics=metrics,
-        )
-
-    # Use DDP if FSDP is not enabled
-    # TODO (T142223289): rewrite ddp wrapping as modeling hook
-    if not is_fsdp_enabled(cfg):
-        model = create_ddp_model(
-            model,
-            fp16_compression=cfg.MODEL.DDP_FP16_GRAD_COMPRESS,
-            device_ids=None if cfg.MODEL.DEVICE == "cpu" else [comm.get_local_rank()],
-            broadcast_buffers=False,
-            find_unused_parameters=cfg.MODEL.DDP_FIND_UNUSED_PARAMETERS,
-        )
-
-    trained_cfgs = runner.do_train(cfg, model, resume=resume)
-
-    final_eval = cfg.TEST.FINAL_EVAL
-    if final_eval:
-        # run evaluation after training in the same processes
-        metrics = runner.do_test(cfg, model)
-        print_metrics_table(metrics)
-    else:
-        metrics = {}
-
-    # dump config files for trained models
-    trained_model_configs = dump_trained_model_configs(cfg.OUTPUT_DIR, trained_cfgs)
-    return TrainNetOutput(
-        # for e2e_workflow
-        accuracy=metrics,
-        # for unit_workflow
-        model_configs=trained_model_configs,
-        metrics=metrics,
-    )
+    logger.info("Starting main")
+    error_handler = get_error_handler()
+    logger.debug(f">>>>>>> Error handler is: {type(error_handler)=}, {error_handler=}")
+    error_handler.initialize()
+    logger.debug("Error handler has been initialized")
+
+    try:  # Main error handler starts here...
+        logger.debug(f"Entered main for d2go, {runner_class=}")
+        runner = setup_after_launch(cfg, output_dir, runner_class)
+        model = runner.build_model(cfg)
+        logger.info("Model:\n{}".format(model))
+
+        if eval_only:
+            checkpointer = runner.build_checkpointer(cfg, model, save_dir=output_dir)
+            # checkpointer.resume_or_load() will skip all additional checkpointable
+            # which may not be desired like ema states
+            if resume and checkpointer.has_checkpoint():
+                checkpoint = checkpointer.resume_or_load(
+                    cfg.MODEL.WEIGHTS, resume=resume
+                )
+            else:
+                checkpoint = checkpointer.load(cfg.MODEL.WEIGHTS)
+            train_iter = checkpoint.get("iteration", None)
+            model.eval()
+            metrics = runner.do_test(cfg, model, train_iter=train_iter)
+            print_metrics_table(metrics)
+            return TestNetOutput(
+                accuracy=metrics,
+                metrics=metrics,
+            )
+
+        # Use DDP if FSDP is not enabled
+        # TODO (T142223289): rewrite ddp wrapping as modeling hook
+        if not is_fsdp_enabled(cfg):
+            model = create_ddp_model(
+                model,
+                fp16_compression=cfg.MODEL.DDP_FP16_GRAD_COMPRESS,
+                device_ids=None
+                if cfg.MODEL.DEVICE == "cpu"
+                else [comm.get_local_rank()],
+                broadcast_buffers=False,
+                find_unused_parameters=cfg.MODEL.DDP_FIND_UNUSED_PARAMETERS,
+            )
+
+        logger.info("Starting train..")
+        trained_cfgs = runner.do_train(cfg, model, resume=resume)
+
+        final_eval = cfg.TEST.FINAL_EVAL
+        if final_eval:
+            # run evaluation after training in the same processes
+            metrics = runner.do_test(cfg, model)
+            print_metrics_table(metrics)
+        else:
+            metrics = {}
+
+        # dump config files for trained models
+        trained_model_configs = dump_trained_model_configs(cfg.OUTPUT_DIR, trained_cfgs)
+        return TrainNetOutput(
+            # for e2e_workflow
+            accuracy=metrics,
+            # for unit_workflow
+            model_configs=trained_model_configs,
+            metrics=metrics,
+        )
+    except ChildFailedError as e:
+        logger.info(f"Got a ChildFailedError: {e=}")
+        rank, failure = e.get_first_failure()
+        if failure.error_file != _NOT_AVAILABLE:
+            error_handler.dump_error_file(failure.error_file, failure.exitcode)
+        else:
+            logger.info(
+                (
+                    f"local_rank {rank} FAILED with no error file."
+                    f" Decorate your entrypoint fn with @record for traceback info."
+                    f" See: https://pytorch.org/docs/stable/elastic/errors.html"
+                )
+            )
+        raise
+    except Exception as e:
+        logger.info(f"Caught a generic exception: {e=}")
+        error_handler.record_exception(e)
+        raise
@@ -122,6 +159,7 @@ def run_with_cmdline_args(args):


 def cli(args=None):
+    logger.info(f"Inside CLI, {args=}")
     parser = basic_argument_parser(requires_output_dir=False)
     parser.add_argument(
         "--eval-only", action="store_true", help="perform evaluation only"
@@ -153,4 +191,29 @@ def build_cli_args(

 if __name__ == "__main__":
     setup_root_logger()
-    cli()
+    logger.info("Starting CLI application")
+    try:
+        cli()
+    finally:
+        logger.info("Entering final reply file generation step")
+        import glob
+        import os
+        import shutil
+
+        torchx_reply_files = glob.glob("/tmp/torchx_*/**/*.json", recursive=True)
+        logger.info(
+            f"Found the following reply files on this host: {torchx_reply_files}"
+        )
+        first_reply_file = None
+        first_reply_file_st = float("Inf")
+        for f in torchx_reply_files:
+            if (mtime := os.stat(f).st_mtime) < first_reply_file_st:
+                first_reply_file = f
+                first_reply_file_st = mtime
+        if first_reply_file and os.environ.get("MAST_HPC_TASK_FAILURE_REPLY_FILE"):
+            logger.info(
+                f'Copying {first_reply_file=} to {os.environ["MAST_HPC_TASK_FAILURE_REPLY_FILE"]}'
+            )
+            shutil.copyfile(
+                first_reply_file, os.environ["MAST_HPC_TASK_FAILURE_REPLY_FILE"]
+            )