Commit e82635eb authored by Francisc Bungiu, committed by Facebook GitHub Bot

Add ODS logging to all runners

Summary:
X-link: https://github.com/facebookresearch/detectron2/pull/5050

Pull Request resolved: https://github.com/facebookresearch/d2go/pull/606

Allow attaching a monitoring service to the training loop.

Reviewed By: miqueljubert

Differential Revision: D47595332

fbshipit-source-id: 49d770207aeea56113c008fcd29ad7b545cec849
parent 94c7f647
@@ -6,7 +6,7 @@ import logging
 import os
 from collections import OrderedDict
 from functools import lru_cache
-from typing import List, Optional, Type, Union
+from typing import Any, List, Optional, Type, Union
 
 import detectron2.utils.comm as comm
 import torch
@@ -168,6 +168,11 @@ def prepare_fb_model(cfg: CfgNode, model: torch.nn.Module) -> torch.nn.Module:
     return model
 
 
+@fb_overwritable()
+def get_monitoring_service() -> Any:
+    pass
+
+
 class BaseRunner(object):
     def __init__(self):
         identifier = f"D2Go.Runner.{self.__class__.__name__}"
@@ -529,123 +534,124 @@ class Detectron2GoRunner(D2GoDataAPIMixIn, BaseRunner):
         ]
 
     def do_train(self, cfg, model, resume):
-        # Note that flops at the beginning of training is often inaccurate,
-        # if a model has input-dependent logic
-        attach_profilers(cfg, model)
-
-        if cfg.NUMA_BINDING is True:
-            import numa
-
-            num_gpus_per_node = comm.get_local_size()
-            num_sockets = numa.get_max_node() + 1
-            socket_id = torch.cuda.current_device() // (
-                max(num_gpus_per_node // num_sockets, 1)
-            )
-            node_mask = set([socket_id])
-            numa.bind(node_mask)
-
-        optimizer = self.build_optimizer(cfg, model)
-        scheduler = self.build_lr_scheduler(cfg, optimizer)
-
-        checkpointer = self.build_checkpointer(
-            cfg,
-            model,
-            save_dir=cfg.OUTPUT_DIR,
-            load_ckpt_to_gpu=cfg.LOAD_CKPT_TO_GPU,
-            optimizer=optimizer,
-            scheduler=scheduler,
-        )
-        checkpoint = checkpointer.resume_or_load(cfg.MODEL.WEIGHTS, resume=resume)
-        start_iter = (
-            checkpoint.get("iteration", -1)
-            if resume and checkpointer.has_checkpoint()
-            else -1
-        )
-        del checkpoint
-        # The checkpoint stores the training iteration that just finished, thus we start
-        # at the next iteration (or iter zero if there's no checkpoint).
-        start_iter += 1
-        max_iter = cfg.SOLVER.MAX_ITER
-        periodic_checkpointer = PeriodicCheckpointer(
-            checkpointer, cfg.SOLVER.CHECKPOINT_PERIOD, max_iter=max_iter
-        )
-
-        data_loader = self.build_detection_train_loader(cfg)
-
-        def _get_model_with_abnormal_checker(model):
-            if not cfg.ABNORMAL_CHECKER.ENABLED:
-                return model
-
-            tbx_writer = self.get_tbx_writer(cfg)
-            writers = get_writers(cfg, tbx_writer)
-            checker = AbnormalLossChecker(start_iter, writers)
-            ret = AbnormalLossCheckerWrapper(model, checker)
-            return ret
-
-        if cfg.SOLVER.AMP.ENABLED:
-            trainer = AMPTrainer(
-                _get_model_with_abnormal_checker(model),
-                data_loader,
-                optimizer,
-                gather_metric_period=cfg.GATHER_METRIC_PERIOD,
-                zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
-                grad_scaler=get_grad_scaler(cfg),
-                precision=parse_precision_from_string(
-                    cfg.SOLVER.AMP.PRECISION, lightning=False
-                ),
-                log_grad_scaler=cfg.SOLVER.AMP.LOG_GRAD_SCALER,
-                async_write_metrics=cfg.ASYNC_WRITE_METRICS,
-            )
-        else:
-            trainer = SimpleTrainer(
-                _get_model_with_abnormal_checker(model),
-                data_loader,
-                optimizer,
-                gather_metric_period=cfg.GATHER_METRIC_PERIOD,
-                zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
-                async_write_metrics=cfg.ASYNC_WRITE_METRICS,
-            )
-
-        if cfg.SOLVER.AMP.ENABLED and torch.cuda.is_available():
-            # Allow to use the TensorFloat32 (TF32) tensor cores, available on A100 GPUs.
-            # For more details https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere.
-            torch.backends.cuda.matmul.allow_tf32 = True
-            torch.backends.cudnn.allow_tf32 = True
-
-        trainer_hooks = self._get_trainer_hooks(
-            cfg, model, optimizer, scheduler, periodic_checkpointer, trainer
-        )
-
-        if comm.is_main_process():
-            assert (
-                cfg.GATHER_METRIC_PERIOD <= cfg.WRITER_PERIOD
-                and cfg.WRITER_PERIOD % cfg.GATHER_METRIC_PERIOD == 0
-            ), "WRITER_PERIOD needs to be divisible by GATHER_METRIC_PERIOD"
-            tbx_writer = self.get_tbx_writer(cfg)
-            writers = [
-                CommonMetricPrinter(max_iter, window_size=cfg.WRITER_PERIOD),
-                JSONWriter(
-                    os.path.join(cfg.OUTPUT_DIR, "metrics.json"),
-                    window_size=cfg.WRITER_PERIOD,
-                ),
-                tbx_writer,
-            ]
-            trainer_hooks.append(hooks.PeriodicWriter(writers, cfg.WRITER_PERIOD))
-        update_hooks_from_registry(trainer_hooks, cfg)
-        trainer.register_hooks(trainer_hooks)
-        trainer.train(start_iter, max_iter)
-
-        if hasattr(self, "original_cfg"):
-            table = get_cfg_diff_table(cfg, self.original_cfg)
-            logger.info(
-                "GeneralizeRCNN Runner ignoring training config change: \n" + table
-            )
-            trained_cfg = self.original_cfg.clone()
-        else:
-            trained_cfg = cfg.clone()
-        with temp_defrost(trained_cfg):
-            trained_cfg.MODEL.WEIGHTS = checkpointer.get_checkpoint_file()
-        return {"model_final": trained_cfg}
+        with get_monitoring_service():
+            # Note that flops at the beginning of training is often inaccurate,
+            # if a model has input-dependent logic
+            attach_profilers(cfg, model)
+
+            if cfg.NUMA_BINDING is True:
+                import numa
+
+                num_gpus_per_node = comm.get_local_size()
+                num_sockets = numa.get_max_node() + 1
+                socket_id = torch.cuda.current_device() // (
+                    max(num_gpus_per_node // num_sockets, 1)
+                )
+                node_mask = set([socket_id])
+                numa.bind(node_mask)
+
+            optimizer = self.build_optimizer(cfg, model)
+            scheduler = self.build_lr_scheduler(cfg, optimizer)
+
+            checkpointer = self.build_checkpointer(
+                cfg,
+                model,
+                save_dir=cfg.OUTPUT_DIR,
+                load_ckpt_to_gpu=cfg.LOAD_CKPT_TO_GPU,
+                optimizer=optimizer,
+                scheduler=scheduler,
+            )
+            checkpoint = checkpointer.resume_or_load(cfg.MODEL.WEIGHTS, resume=resume)
+            start_iter = (
+                checkpoint.get("iteration", -1)
+                if resume and checkpointer.has_checkpoint()
+                else -1
+            )
+            del checkpoint
+            # The checkpoint stores the training iteration that just finished, thus we start
+            # at the next iteration (or iter zero if there's no checkpoint).
+            start_iter += 1
+            max_iter = cfg.SOLVER.MAX_ITER
+            periodic_checkpointer = PeriodicCheckpointer(
+                checkpointer, cfg.SOLVER.CHECKPOINT_PERIOD, max_iter=max_iter
+            )
+
+            data_loader = self.build_detection_train_loader(cfg)
+
+            def _get_model_with_abnormal_checker(model):
+                if not cfg.ABNORMAL_CHECKER.ENABLED:
+                    return model
+
+                tbx_writer = self.get_tbx_writer(cfg)
+                writers = get_writers(cfg, tbx_writer)
+                checker = AbnormalLossChecker(start_iter, writers)
+                ret = AbnormalLossCheckerWrapper(model, checker)
+                return ret
+
+            if cfg.SOLVER.AMP.ENABLED:
+                trainer = AMPTrainer(
+                    _get_model_with_abnormal_checker(model),
+                    data_loader,
+                    optimizer,
+                    gather_metric_period=cfg.GATHER_METRIC_PERIOD,
+                    zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
+                    grad_scaler=get_grad_scaler(cfg),
+                    precision=parse_precision_from_string(
+                        cfg.SOLVER.AMP.PRECISION, lightning=False
+                    ),
+                    log_grad_scaler=cfg.SOLVER.AMP.LOG_GRAD_SCALER,
+                    async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+                )
+            else:
+                trainer = SimpleTrainer(
+                    _get_model_with_abnormal_checker(model),
+                    data_loader,
+                    optimizer,
+                    gather_metric_period=cfg.GATHER_METRIC_PERIOD,
+                    zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
+                    async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+                )
+
+            if cfg.SOLVER.AMP.ENABLED and torch.cuda.is_available():
+                # Allow to use the TensorFloat32 (TF32) tensor cores, available on A100 GPUs.
+                # For more details https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere.
+                torch.backends.cuda.matmul.allow_tf32 = True
+                torch.backends.cudnn.allow_tf32 = True
+
+            trainer_hooks = self._get_trainer_hooks(
+                cfg, model, optimizer, scheduler, periodic_checkpointer, trainer
+            )
+
+            if comm.is_main_process():
+                assert (
+                    cfg.GATHER_METRIC_PERIOD <= cfg.WRITER_PERIOD
+                    and cfg.WRITER_PERIOD % cfg.GATHER_METRIC_PERIOD == 0
+                ), "WRITER_PERIOD needs to be divisible by GATHER_METRIC_PERIOD"
+                tbx_writer = self.get_tbx_writer(cfg)
+                writers = [
+                    CommonMetricPrinter(max_iter, window_size=cfg.WRITER_PERIOD),
+                    JSONWriter(
+                        os.path.join(cfg.OUTPUT_DIR, "metrics.json"),
+                        window_size=cfg.WRITER_PERIOD,
+                    ),
+                    tbx_writer,
+                ]
+                trainer_hooks.append(hooks.PeriodicWriter(writers, cfg.WRITER_PERIOD))
+            update_hooks_from_registry(trainer_hooks, cfg)
+            trainer.register_hooks(trainer_hooks)
+            trainer.train(start_iter, max_iter)
+
+            if hasattr(self, "original_cfg"):
+                table = get_cfg_diff_table(cfg, self.original_cfg)
+                logger.info(
+                    "GeneralizeRCNN Runner ignoring training config change: \n" + table
+                )
+                trained_cfg = self.original_cfg.clone()
+            else:
+                trained_cfg = cfg.clone()
+            with temp_defrost(trained_cfg):
+                trained_cfg.MODEL.WEIGHTS = checkpointer.get_checkpoint_file()
+            return {"model_final": trained_cfg}
 
     @staticmethod
     def get_evaluator(cfg, dataset_name, output_folder):
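Note on the mechanism: the new `get_monitoring_service()` function is an `fb_overwritable()` stub, so the open-source default above simply returns `None`, while the commit summary indicates that internal builds substitute an implementation that reports to ODS. Because `do_train` now enters whatever the hook returns via `with get_monitoring_service():`, an override only needs to produce something usable as a context manager. The sketch below is a hypothetical illustration of such an override; the `training_monitor` helper and its log messages are invented for this example and are not part of d2go or of this commit.

# Hypothetical sketch -- not part of this commit. It only illustrates the kind
# of object an overridden get_monitoring_service() could return: anything that
# works as a context manager, so `with get_monitoring_service():` brackets the
# training loop. Real implementations would report to an actual backend.
import contextlib
import logging
import time
from typing import Any, Iterator

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def training_monitor(job_name: str = "d2go_train") -> Iterator[None]:
    """Start a (fake) monitoring session around the training loop."""
    start = time.time()
    logger.info("monitoring session started for %s", job_name)
    try:
        yield  # do_train's body runs here
    finally:
        # A real service would flush counters/durations to its backend here.
        logger.info(
            "monitoring session for %s finished after %.1fs",
            job_name,
            time.time() - start,
        )


def get_monitoring_service() -> Any:
    # A project-specific override returns its own monitor here;
    # the d2go open-source stub in this diff simply returns None.
    return training_monitor()

Since the runner only does `with get_monitoring_service():`, swapping in a different monitoring backend requires no further changes to the training code itself.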