Unverified Commit 331aed2c authored by anj-s's avatar anj-s Committed by GitHub
Browse files

[refactor] Add benchmark config object and validation function (#314)



* [refactor]Remove unused variables and refactor common configurations

* move helper function to call site

* fixed lint errors

* fix lint errors

* fix lint errors

* fix lint errors

* fix import order

* format files

* remove unused imports

* fix lint errors

* fix lint errors

* refactor common utilities

* address PR comments

* sorted imports

* add space

* modify comment

* added doc strings and addressed PR comments.

* addressed PR comments

* added another comment to clarify.

* fixing lint errors

* addressed PR comments

* addressed PR comments

* fixed typos

* initialize var

* rename seq_pred to lm

* fix lint errors

* move datasets and models into separate folders

* add the folders created

* fix lint errors

* create golden config to stats mapping

* add common batching for both synthetic and real data

* fixed lint errors

* enable real pipe benchmakrs with new golden data

* reduce seq len to avoid OOM

* updated golden data

* add logging

* add golden data

* add golden data

* fix lint errors

* add doc string

* remove unused class

* add seq len and batch size to the config

* remove commented out line

* address comments

* rename imports

* refactor common logic in dataloaders

* add golden configs

* lint changes

* merge latest changes

* lint errors

* address PR comments

* initial refactoring

* lint fixes

* fix lint errors

* update comment
Co-authored-by: default avatarAnjali Sridhar <anj@devfair0443.h2.fair>
parent 14491030
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
def get_golden_real_stats():
return {
"reference_speed": 1430,
"reference_memory": 1220,
"reference_loss": 0.006,
}
def get_golden_synthetic_stats():
# TODO(anj-s): Add support for synthetic regression benchmarks
raise NotImplementedError("Synthetic data benchmarks are not supported.")
...@@ -10,6 +10,7 @@ import tempfile ...@@ -10,6 +10,7 @@ import tempfile
import time import time
from typing import Any, List, Optional, cast from typing import Any, List, Optional, cast
from golden_configs import oss_mnist
import numpy as np import numpy as np
import torch import torch
import torch.autograd.profiler as profiler import torch.autograd.profiler as profiler
...@@ -77,6 +78,37 @@ class OptimType(str, Enum): ...@@ -77,6 +78,37 @@ class OptimType(str, Enum):
everyone = "everyone" everyone = "everyone"
def validate_benchmark(measurements, args, check_regression):
"""Validate the measurments against the golden benchmark config."""
golden_data = oss_mnist.get_golden_real_stats()
max_memory = -1.0
rank = dist.get_rank()
if not args.cpu:
# TODO(anj-s): Check if we need to synchronize before we caculate total training time.
torch.cuda.synchronize(rank)
max_memory = torch.cuda.max_memory_allocated(rank) / 2 ** 20
logging.info(f"[{rank}] : Peak memory {max_memory:.1f}MiB")
measurements.sort()
median = measurements[len(measurements) // 2]
# Compute the median and median of absolute differences img per second.
abs_diff = list(map(lambda x: abs(x - median), measurements))
abs_diff.sort()
mad = abs_diff[len(measurements) // 2] if args.epochs > 2 else -1
# TODO(anj-s): Add a debug flag to perform the above calculation only when required.
logging.info(f"[{rank}] : Median speed: {median:.2f} +/- {mad:.2f}")
if check_regression and rank == 0:
assert (median + 3.0 * mad) > golden_data["reference_speed"], "Speed regression detected"
assert max_memory < 1.05 * golden_data["reference_memory"], "Memory use regression detected"
assert abs(cast(float, final_loss) - golden_data["reference_loss"]) < 1e-3, "Loss regression detected"
logging.info("[Regression Test] VALID")
def train( def train(
rank: int, rank: int,
args: argparse.Namespace, args: argparse.Namespace,
...@@ -142,7 +174,7 @@ def train( ...@@ -142,7 +174,7 @@ def train(
for batch in dataloader: for batch in dataloader:
if not args.cpu: if not args.cpu:
torch.cuda.synchronize(rank) torch.cuda.synchronize(rank)
batch__start = time.monotonic() batch_start = time.monotonic()
def closure(data=batch, grad_scaler=None): def closure(data=batch, grad_scaler=None):
model.zero_grad() model.zero_grad()
...@@ -173,27 +205,26 @@ def train( ...@@ -173,27 +205,26 @@ def train(
) )
return loss return loss
def run_closure(closure, scaler, optimizer):
if scaler is not None:
final_loss = closure(grad_scaler=scaler) # AMP scaler.step does not support closures
scaler.step(optimizer)
scaler.update()
return final_loss
else:
return optimizer.step(closure)
if need_profiling and not args.cpu: if need_profiling and not args.cpu:
logging.info("Profiling the run") logging.info("Profiling the run")
with profiler.profile(use_cuda=True, record_shapes=True, profile_memory=True) as prof: # type: ignore with profiler.profile(use_cuda=True, record_shapes=True, profile_memory=True) as prof: # type: ignore
with profiler.record_function("batch"): with profiler.record_function("batch"):
if scaler is not None: final_loss = run_closure(closure, scaler, optimizer)
final_loss = closure(grad_scaler=scaler) # AMP scaler.step does not support closures
scaler.step(optimizer)
scaler.update()
else:
final_loss = optimizer.step(closure)
prof.export_chrome_trace(f"{optim_type}_trace_rank_{rank}.json") prof.export_chrome_trace(f"{optim_type}_trace_rank_{rank}.json")
need_profiling = False # only profile once need_profiling = False # only profile once
else: else:
if scaler is not None: final_loss = run_closure(closure, scaler, optimizer)
final_loss = closure(grad_scaler=scaler) # AMP scaler.step does not support closures
scaler.step(optimizer)
scaler.update()
else:
final_loss = optimizer.step(closure)
if args.debug and rank == 0: if args.debug and rank == 0:
logging.debug("buffer: {}".format(next(model.buffers()).norm().item())) logging.debug("buffer: {}".format(next(model.buffers()).norm().item()))
...@@ -210,7 +241,7 @@ def train( ...@@ -210,7 +241,7 @@ def train(
torch.cuda.synchronize(rank) torch.cuda.synchronize(rank)
batch_end = time.monotonic() batch_end = time.monotonic()
epoch_runtime += batch_end - batch__start epoch_runtime += batch_end - batch_start
if optim_type == OptimType.oss_ddp or optim_type == OptimType.oss_sharded_ddp: if optim_type == OptimType.oss_ddp or optim_type == OptimType.oss_sharded_ddp:
# Check the checkpointing in the case of the OSS optimizer # Check the checkpointing in the case of the OSS optimizer
...@@ -225,32 +256,11 @@ def train( ...@@ -225,32 +256,11 @@ def train(
if dist.get_rank() == 0: if dist.get_rank() == 0:
logging.info(f"Epoch {epoch} - processed {measurements[-1]:.2f} img per sec. Loss {final_loss:.3f}") logging.info(f"Epoch {epoch} - processed {measurements[-1]:.2f} img per sec. Loss {final_loss:.3f}")
max_memory = -1.0
if not args.cpu:
torch.cuda.synchronize(rank)
max_memory = torch.cuda.max_memory_allocated(rank) / 2 ** 20
logging.info(f"[{dist.get_rank()}] : Peak memory {max_memory:.1f}MiB")
training_stop = time.monotonic() training_stop = time.monotonic()
img_per_sec = n_items / (training_stop - training_start) * args.epochs img_per_sec = n_items / (training_stop - training_start) * args.epochs
logging.info(f"[{dist.get_rank()}] : Training done. {img_per_sec:.2f} img per sec inc. checkpoint") logging.info(f"[{dist.get_rank()}] : Training done. {img_per_sec:.2f} img per sec inc. checkpoint")
# Compute the median and median of absolute differences img per second validate_benchmark(measurements, args, check_regression)
measurements.sort()
median = measurements[len(measurements) // 2]
abs_diff = list(map(lambda x: abs(x - median), measurements))
abs_diff.sort()
mad = abs_diff[len(measurements) // 2] if args.epochs > 2 else -1
logging.info(f"[{dist.get_rank()}] : Median speed: {median:.2f} +/- {mad:.2f}")
if check_regression and dist.get_rank() == 0:
assert (median + 3.0 * mad) > args.reference_speed, "Speed regression detected"
assert max_memory < 1.05 * args.reference_memory, "Memory use regression detected"
assert abs(cast(float, final_loss) - args.reference_loss) < 1e-3, "Loss regression detected"
logging.info("[Regression Test] VALID")
dist.destroy_process_group() # type: ignore dist.destroy_process_group() # type: ignore
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment