Unverified Commit 47d4a79d authored by one, committed by GitHub

Benchmark: Model benchmark - deterministic training support (#731) (#2)



Adds an opt-in deterministic training mode to SuperBench's PyTorch model
benchmarks. When enabled via --enable_determinism, PyTorch deterministic
algorithms are enforced, and per-step numerical fingerprints (loss,
activation means) are recorded as metrics. These can be compared across
runs using the existing sb result diagnosis pipeline to verify bit-exact
reproducibility, which is useful for hardware validation and platform
comparison.
 
Flags added -

--enable_determinism: Enables deterministic training mode
--check_frequency: Interval, in steps, at which deterministic metrics
are recorded
--deterministic_seed: Seed used for deterministic training

Changes -

Updated pytorch_base.py to handle deterministic settings and logging.
Added a new example script: pytorch_deterministic_example.py.
Added a test file: test_pytorch_determinism_all.py to verify everything
works as expected.

Usage -

Step 1: Run with --enable_determinism; the relevant metrics are
recorded in the results-summary.jsonl file.
Step 2: Generate the baseline file from the Run 1 results using sb
result generate-baseline.
Step 3: Run again with --enable_determinism on a different machine (or
the same machine); the metrics are recorded in its
results-summary.jsonl file.
Step 4: Run diagnosis on the results from the two runs using the sb
result diagnosis command.

Note -
1. Make sure all the parameters are identical between the two runs.
2. Running the diagnosis command requires a rules.yaml file.

---------
Co-authored-by: Aishwarya Tonpe <aishwarya.tonpe25@gmail.com>
Co-authored-by: Ubuntu <rdadmin@HPCPLTNODE0.n3kgq4m0lhoednrx3hxtad2nha.cdmx.internal.cloudapp.net>
parent 8c28b69a
......@@ -34,6 +34,104 @@ For inference, supported percentiles include
**New: Support fp8_hybrid and fp8_e4m3 precision for BERT models.**
**New: Deterministic Training Support**
SuperBench now supports deterministic training to ensure reproducibility across runs. This includes fixed seeds and deterministic algorithms. To enable deterministic training, use the following flags:
- **Flags:**
- `--enable_determinism`: Enables deterministic computation for reproducible results.
- `--deterministic_seed <seed>`: Sets the seed for reproducibility (default: 42).
- `--check_frequency <steps>`: How often to record deterministic metrics (default: 100).
- **Environment Variables (set automatically by SuperBench when `--enable_determinism` is used):**
- `CUBLAS_WORKSPACE_CONFIG=:4096:8`: Ensures deterministic behavior in cuBLAS. This can be overridden by setting it manually before running SuperBench.
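If a different cuBLAS workspace configuration is needed, set the variable manually before the first CUDA operation; since SuperBench uses `os.environ.setdefault`, a pre-existing value is respected. A minimal sketch (the `:16:8` value is just an example of another valid setting):
```python
import os

# Set before any cuBLAS work; SuperBench calls os.environ.setdefault(),
# so this manually chosen value takes precedence over the default ':4096:8'.
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':16:8'  # example alternative value
```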
**Comparing Deterministic Results**
To compare deterministic results between runs, use the standard result analysis workflow:
1. Run the benchmark with the `--enable_determinism` flag
2. Generate a baseline: `sb result generate-baseline --data-file results.jsonl --summary-rule-file rules.yaml`
3. Compare future runs: `sb result diagnosis --data-file new-results.jsonl --rule-file diagnosis-rule.yaml --baseline-file baseline.json`
Tolerance for floating-point differences is configurable via the YAML rules.
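For intuition, here is a minimal sketch of how a rule's criteria could be applied, assuming the `variance` function computes the relative difference from the baseline and a metric is flagged when the `criteria` lambda returns True (the actual evaluation lives in SuperBench's data diagnosis module):
```python
# Minimal sketch (not SuperBench's implementation) of criteria evaluation.
def violates_rule(value, baseline, criteria='lambda x: x != 0'):
    variance = (value - baseline) / baseline  # relative difference from baseline
    return eval(criteria)(variance)  # True means the metric is flagged

# Bit-exact runs yield zero variance, so "x != 0" only fires on a mismatch;
# a tolerant rule could use "lambda x: abs(x) > 1e-6" instead.
assert violates_rule(0.6934, 0.6931) is True
assert violates_rule(0.6931, 0.6931) is False
```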
**Configuration Parameter Validation**
When determinism is enabled, benchmark configuration parameters (batch_size, num_steps, deterministic_seed, etc.) are automatically recorded in the results file as `deterministic_config_*` metrics. The diagnosis rules enforce exact matching of these parameters between runs: if any configuration parameter differs, the diagnosis flags it as a failure, so you only ever compare runs with identical configurations.
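For illustration, the flattened entries in results-summary.jsonl look like the following (the benchmark prefix and values here are hypothetical):
```python
# Hypothetical deterministic_config_* entries from a flattened
# results-summary.jsonl record; the prefix and values are examples only.
summary_entry = {
    'model-benchmarks:default/pytorch-resnet101/deterministic_config_batch_size': 1,
    'model-benchmarks:default/pytorch-resnet101/deterministic_config_num_steps': 120,
    'model-benchmarks:default/pytorch-resnet101/deterministic_config_deterministic_seed': 42,
}
```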
**Summary Rule Snippet for Determinism**
Include the following rule in your summary rule file (used with `sb result summary` or `sb result generate-baseline --summary-rule-file`) to surface deterministic metrics in the results summary:
```yaml
superbench:
  rules:
    model-benchmarks-deterministic:
      statistics:
        - mean
      categories: Deterministic
      metrics:
        - model-benchmarks:.*/deterministic_loss.*
        - model-benchmarks:.*/deterministic_act_mean.*
        - model-benchmarks:.*/deterministic_check_count.*
        - model-benchmarks:.*/deterministic_step.*
        - model-benchmarks:.*/deterministic_config_.*
        - model-benchmarks:.*/return_code.*
```
This groups all deterministic outputs — loss fingerprints, activation means, check counts, step numbers, configuration parameters, and return codes — under the **Deterministic** category.
**Diagnosis Rule Snippet for Determinism**
Include the following rules in your diagnosis rule file (used with `sb result diagnosis` or `sb result generate-baseline --diagnosis-rule-file`) to detect Silent Data Corruption (SDC) and validate configuration consistency:
```yaml
superbench:
  rules:
    deterministic_rule:
      function: variance
      criteria: "lambda x: x != 0"
      categories: SDC-Fingerprint
      metrics:
        - model-benchmarks:.*/deterministic_loss.*
        - model-benchmarks:.*/deterministic_act_mean.*
        - model-benchmarks:.*/deterministic_check_count.*
    deterministic_config_rule:
      function: variance
      criteria: "lambda x: x != 0"
      categories: SDC-Config
      metrics:
        - model-benchmarks:.*/deterministic_config_batch_size.*
        - model-benchmarks:.*/deterministic_config_num_steps.*
        - model-benchmarks:.*/deterministic_config_num_warmup.*
        - model-benchmarks:.*/deterministic_config_deterministic_seed.*
        - model-benchmarks:.*/deterministic_config_check_frequency.*
        - model-benchmarks:.*/deterministic_config_seq_len.*
        - model-benchmarks:.*/deterministic_config_hidden_size.*
        - model-benchmarks:.*/deterministic_config_num_classes.*
        - model-benchmarks:.*/deterministic_config_input_size.*
        - model-benchmarks:.*/deterministic_config_num_layers.*
        - model-benchmarks:.*/deterministic_config_num_hidden_layers.*
        - model-benchmarks:.*/deterministic_config_num_attention_heads.*
        - model-benchmarks:.*/deterministic_config_intermediate_size.*
    deterministic_failure_rule:
      function: failure_check
      criteria: "lambda x: x != 0"
      categories: SDC-Failed
      metrics:
        - model-benchmarks:.*/return_code
```
- **SDC-Fingerprint** (`deterministic_rule`): Flags any node where loss, activation mean, or check count has *any* variance from baseline (`x != 0`), indicating a potential SDC issue.
- **SDC-Config** (`deterministic_config_rule`): Ensures all determinism configuration parameters (seed, batch size, sequence length, hidden size, etc.) are identical across nodes — any mismatch means the comparison is invalid.
- **SDC-Failed** (`deterministic_failure_rule`): Uses `failure_check` to catch nodes where the determinism benchmark failed to run or returned a non-zero exit code.
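As a quick sanity check outside the `sb` pipeline, two summary files can also be compared directly; a minimal sketch, assuming single-record JSONL files (the paths are examples):
```python
import json

def load_summary(path):
    """Load a single-record results-summary.jsonl file."""
    with open(path) as f:
        return json.loads(f.readline())

run1 = load_summary('run1/results-summary.jsonl')  # example paths
run2 = load_summary('run2/results-summary.jsonl')
keys = [k for k in run1 if 'deterministic_' in k]
mismatches = {k: (run1[k], run2.get(k)) for k in keys if run1[k] != run2.get(k)}
print('bit-exact' if not mismatches else f'mismatches: {mismatches}')
```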
For complete rule files covering all benchmark categories (micro-benchmarks, NCCL, GPU copy bandwidth, NVBandwidth, etc.), refer to the rule file documentation in [Result Summary](../result-summary.md) and [Data Diagnosis](../data-diagnosis.md).
#### Metrics
| Name | Unit | Description |
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""Unified PyTorch deterministic training example for all supported models.
Deterministic metrics (loss, activation mean) are automatically stored in results
when --enable_determinism flag is enabled.
To compare deterministic results between runs, use the `sb result diagnosis` command
with a baseline file and comparison rules. See the SuperBench documentation for details.
Example workflow:
1. Run first benchmark (creates outputs/<timestamp>/results-summary.jsonl):
python3 examples/benchmarks/pytorch_deterministic_example.py \
--model resnet101 --enable_determinism --deterministic_seed 42
2. Generate baseline from results:
sb result generate-baseline --data-file outputs/<timestamp>/results-summary.jsonl \
--summary-rule-file summary-rules.yaml --output-dir outputs/<timestamp>
3. Run second benchmark:
python3 examples/benchmarks/pytorch_deterministic_example.py \
--model resnet101 --enable_determinism --deterministic_seed 42
4. Compare runs with diagnosis:
sb result diagnosis --data-file outputs/<run2-timestamp>/results-summary.jsonl \
--rule-file rules.yaml --baseline-file outputs/<run1-timestamp>/baseline.json
Note: CUBLAS_WORKSPACE_CONFIG is now automatically set by the code when determinism is enabled.
"""
import argparse
import json
import socket
from datetime import datetime
from pathlib import Path
from superbench.benchmarks import BenchmarkRegistry, Framework
from superbench.common.utils import logger
MODEL_CHOICES = [
'bert-large',
'gpt2-small',
'llama2-7b',
'mixtral-8x7b',
'resnet101',
'lstm',
]
DEFAULT_PARAMS = {
'bert-large':
'--batch_size 1 --seq_len 64 --num_warmup 1 --num_steps 200 --precision float32 '
'--model_action train --check_frequency 20',
'gpt2-small':
'--batch_size 1 --num_steps 300 --num_warmup 1 --seq_len 128 --precision float32 '
'--model_action train --check_frequency 20',
'llama2-7b':
'--batch_size 1 --num_steps 300 --num_warmup 1 --seq_len 512 --precision float32 --model_action train '
'--check_frequency 20',
'mixtral-8x7b':
'--hidden_size 4096 --num_hidden_layers 32 --num_attention_heads 32 --intermediate_size 14336 '
'--num_key_value_heads 8 --max_position_embeddings 32768 --router_aux_loss_coef 0.02 '
'--check_frequency 20',
'resnet101':
'--batch_size 1 --precision float32 --num_warmup 1 --num_steps 120 --sample_count 8192 '
'--pin_memory --model_action train --check_frequency 20',
'lstm':
'--batch_size 1 --num_steps 100 --num_warmup 2 --seq_len 64 --precision float32 '
'--model_action train --check_frequency 30',
}
def main():
"""Main function for determinism example file."""
parser = argparse.ArgumentParser(description='Unified PyTorch deterministic training example.')
parser.add_argument('--model', type=str, choices=MODEL_CHOICES, required=True, help='Model to run.')
parser.add_argument(
'--enable_determinism',
action='store_true',
help='Enable deterministic mode for reproducible results.',
)
parser.add_argument(
'--deterministic_seed',
type=int,
default=None,
help='Seed for deterministic training.',
)
args = parser.parse_args()
parameters = DEFAULT_PARAMS[args.model]
if args.enable_determinism:
parameters += ' --enable_determinism'
if args.deterministic_seed is not None:
parameters += f' --deterministic_seed {args.deterministic_seed}'
context = BenchmarkRegistry.create_benchmark_context(args.model, parameters=parameters, framework=Framework.PYTORCH)
benchmark = BenchmarkRegistry.launch_benchmark(context)
logger.info(f'Benchmark finished. Return code: {benchmark.return_code}')
# Create timestamped output directory
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
output_dir = Path('outputs') / timestamp
output_dir.mkdir(parents=True, exist_ok=True)
# Parse benchmark results
benchmark_results = json.loads(benchmark.serialized_result)
benchmark_name = benchmark_results.get('name', f'pytorch-{args.model}')
# Convert to results-summary.jsonl format (flattened keys)
# Use format compatible with sb result commands: model-benchmarks:<category>/<benchmark>/<metric>
summary = {}
prefix = f'model-benchmarks:example:determinism/{benchmark_name}'
if 'result' in benchmark_results:
for metric, values in benchmark_results['result'].items():
# Use first value if it's a list
val = values[0] if isinstance(values, list) else values
# Add _rank0 suffix to deterministic metrics for compatibility with rules
if metric.startswith('deterministic_'):
metric_key = f'{prefix}/{metric}_rank0'
else:
metric_key = f'{prefix}/{metric}'
summary[metric_key] = val
# Add node identifier
summary['node'] = socket.gethostname()
# Write results-summary.jsonl
summary_file = output_dir / 'results-summary.jsonl'
with open(summary_file, 'w') as f:
f.write(json.dumps(summary))
logger.info(f'Results saved to {summary_file}')
# Also save full results for reference
full_results_file = output_dir / 'results-full.json'
with open(full_results_file, 'w') as f:
json.dump(benchmark_results, f, indent=2)
if 'raw_data' in benchmark_results and 'deterministic_loss' in benchmark_results['raw_data']:
num_checkpoints = len(benchmark_results['raw_data']['deterministic_loss'][0])
logger.info(f'Periodic fingerprints collected at {num_checkpoints} checkpoints')
logger.info(
f'To generate baseline: sb result generate-baseline '
f'--data-file {summary_file} --summary-rule-file summary-rules.yaml '
f'--output-dir {output_dir}'
)
logger.info('To compare results between runs, use `sb result diagnosis` command.')
if __name__ == '__main__':
main()
......@@ -150,6 +150,33 @@ def generate_baseline(self, algo, aggregated_df, diagnosis_rule_file, baseline):
aggregated_df[metrics[index]] = out[1]
return baseline
def _format_metric_value(self, metric, val, digit):
"""Format a single baseline metric value based on its type.
Args:
metric (str): the metric name.
val: the metric value.
digit (int): the number of digits after the decimal point.
Returns:
The formatted metric value.
"""
if metric not in self._raw_data_df:
return val
sample = self._raw_data_df[metric].iloc[0]
if isinstance(sample, float):
# Keep full precision for deterministic metrics to avoid false positives in diagnosis
if 'deterministic' in metric:
return float(val)
return f'%.{digit}g' % val if abs(val) < 1 else f'%.{digit}f' % val
if isinstance(sample, int):
return int(val)
try:
return float(val)
except Exception as e:
logger.error('Analyzer: {} baseline is not numeric, msg: {}'.format(metric, str(e)))
return val
def run(
self, raw_data_file, summary_rule_file, diagnosis_rule_file, pre_baseline_file, algorithm, output_dir, digit=2
):
......@@ -174,19 +201,9 @@ def run(
# generate baseline according to rules in diagnosis and fix threshold outlier detection method
baseline = self.generate_baseline(algorithm, self._raw_data_df, diagnosis_rule_file, baseline)
for metric in baseline:
val = baseline[metric]
if metric in self._raw_data_df:
if isinstance(self._raw_data_df[metric].iloc[0], float):
baseline[metric] = f'%.{digit}g' % val if abs(val) < 1 else f'%.{digit}f' % val
elif isinstance(self._raw_data_df[metric].iloc[0], int):
baseline[metric] = int(val)
else:
try:
baseline[metric] = float(val)
except Exception as e:
logger.error('Analyzer: {} baseline is not numeric, msg: {}'.format(metric, str(e)))
baseline[metric] = self._format_metric_value(metric, baseline[metric], digit)
baseline = json.dumps(baseline, indent=2, sort_keys=True)
baseline = re.sub(r': \"(\d+.?\d*)\"', r': \1', baseline)
baseline = re.sub(r': \"(-?\d+\.?\d*)\"', r': \1', baseline)
with (Path(output_dir) / 'baseline.json').open('w') as f:
f.write(baseline)
......
......@@ -238,7 +238,10 @@ def output_all_nodes_results(self, raw_data_df, data_not_accept_df):
'Category','Defective Details']
"""
append_columns = ['Accept', 'Number Of Issues', 'Category', 'Defective Details']
all_data_df = (raw_data_df).astype('float64')
# Preserve all columns, but only convert numeric columns to float64
all_data_df = raw_data_df.copy()
numeric_cols = all_data_df.select_dtypes(include=['number']).columns
all_data_df[numeric_cols] = all_data_df[numeric_cols].astype('float64')
if data_not_accept_df.shape[0] == 0:
all_data_df['Accept'] = [True for i in range(len(all_data_df))]
......
......@@ -110,14 +110,25 @@ def parse_args(self, ignore_invalid=False):
logger.error('Invalid argument - benchmark: {}, message: {}.'.format(self._name, str(e)))
return False, None, []
ret = True
ret = self._check_unknown_args(unknown)
return ret, args, unknown
def _check_unknown_args(self, unknown):
"""Check for unknown arguments and log an error if any are found.
Args:
unknown (list): List of unknown arguments.
Returns:
bool: False if unknown arguments are found, True otherwise.
"""
if len(unknown) > 0:
logger.error(
'Unknown arguments - benchmark: {}, unknown arguments: {}'.format(self._name, ' '.join(unknown))
)
ret = False
return ret, args, unknown
return False
return True
def _preprocess(self):
"""Preprocess/preparation operations before the benchmarking.
......
......@@ -186,6 +186,17 @@ def _generate_dataset(self):
"""
pass
def set_deterministic_seed(self):
"""Hook to set deterministic RNG state before dataset generation.
Framework-specific subclasses may
override this to apply deterministic RNG settings (for example,
PyTorch benchmarks implement this to call their deterministic setup
when requested). This is called from _preprocess() before
_generate_dataset().
"""
return None
@abstractmethod
def _init_dataloader(self):
"""Initialize the dataloader.
......@@ -221,6 +232,12 @@ def _preprocess(self):
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
return False
# Invoke model-specific deterministic seeding hook before dataset generation
try:
self.set_deterministic_seed()
except Exception:
logger.info('set_deterministic_seed() hook failed or not implemented for model: %s', self._name)
# Set sample_count aligned with batch_size.
self._args.sample_count = math.ceil(self._args.sample_count / self._args.batch_size) * self._args.batch_size
......
......@@ -4,8 +4,9 @@
"""Module of the Pytorch model-benchmark base class."""
import os
from datetime import timedelta
import statistics
import time
from datetime import timedelta
import torch
import transformers
......@@ -13,11 +14,17 @@
import transformer_engine.pytorch as te
except ImportError:
te = None
from torch.utils.data import DataLoader
from torch.distributed import TCPStore, PrefixStore
from torch.utils.data import DataLoader
from superbench.common.utils import logger
from superbench.benchmarks import Framework, ReturnCode, DistributedBackend, DistributedImpl
from superbench.common import model_log_utils
from superbench.benchmarks import (
Framework,
ReturnCode,
DistributedBackend,
DistributedImpl,
)
from superbench.benchmarks.model_benchmarks.model_base import Optimizer, ModelBenchmark
......@@ -30,15 +37,248 @@ def __init__(self, name, parameters=''):
name (str): benchmark name.
parameters (str): benchmark parameters.
"""
# Set CUBLAS_WORKSPACE_CONFIG early, before parent init which might parse args
# This ensures it's set before any CUDA operations if determinism is enabled
if 'enable_determinism' in parameters:
os.environ.setdefault('CUBLAS_WORKSPACE_CONFIG', ':4096:8')
super().__init__(name, parameters)
self._framework = Framework.PYTORCH
torch.backends.cudnn.benchmark = True
self._model_run_losses = []
self._model_run_periodic = {}
def _judge_gpu_availability(self):
"""Judge GPUs' availability according to arguments and running environment."""
self._gpu_available = not self._args.no_gpu and torch.cuda.is_available()
def _enable_deterministic_training(self):
"""Enable deterministic training settings for reproducible results."""
# Set CUBLAS_WORKSPACE_CONFIG (should already be set in __init__, but ensure it's set as backup)
os.environ.setdefault('CUBLAS_WORKSPACE_CONFIG', ':4096:8')
if hasattr(self._args, 'deterministic_seed'):
import random
torch.manual_seed(self._args.deterministic_seed)
random.seed(self._args.deterministic_seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(self._args.deterministic_seed)
torch.use_deterministic_algorithms(True, warn_only=False)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
# Disable TF32 to remove potential numerical variability
try:
torch.backends.cuda.matmul.allow_tf32 = False
except Exception:
logger.warning('Failed to disable TF32 in cuda matmul')
try:
torch.backends.cudnn.allow_tf32 = False
except Exception:
logger.warning('Failed to disable TF32 in cuDNN')
# Force Scaled Dot-Product Attention to use deterministic math kernel
try:
torch.backends.cuda.enable_flash_sdp(False)
torch.backends.cuda.enable_mem_efficient_sdp(False)
except Exception:
logger.warning('SDP kernel backend configuration not available')
# Older PyTorch versions may not expose these APIs; ignore in that case
def record_determinism_fingerprint(self, curr_step, loss, logits, periodic, check_frequency):
"""Centralized logic for recording per-step loss and periodic fingerprints for deterministic runs.
Args:
curr_step (int): Current training step.
loss (torch.Tensor or float): Loss value for this step.
logits (torch.Tensor or float): Logits output for this step (sample 0).
periodic (dict): Dictionary to store periodic fingerprints ('loss', 'act_mean', 'step').
check_frequency (int): Frequency for fingerprint logging.
"""
enable_determinism = getattr(self._args, 'enable_determinism', False)
# If determinism is not enabled, skip determinism-specific logging to avoid unnecessary GPU syncs.
if not enable_determinism:
return
# Record per-step loss for determinism checks
loss_value = model_log_utils.record_step_loss(loss, curr_step, self._model_run_losses, logger)
# Record periodic fingerprint (loss and activation mean)
model_log_utils.record_periodic_fingerprint(
curr_step,
loss_value,
logits,
periodic,
check_frequency,
enable_determinism,
logger,
)
def _finalize_periodic_logging(self, periodic, info_key='loss'):
"""Finalize periodic logging and return info dict for training step."""
info = {info_key: periodic.get(info_key, [])}
if self._model_run_periodic and getattr(self._args, 'enable_determinism', False):
logger.warning(
'Deterministic periodic data is being overwritten by a subsequent precision/action run. '
"Only the last run's deterministic metrics will be reported. "
'Consider using a single precision when enable_determinism is set.'
)
self._model_run_periodic = dict(periodic)
return info
def add_parser_arguments(self):
"""Add PyTorch model benchmark-specific arguments to the argument parser."""
super().add_parser_arguments()
self._parser.add_argument(
'--deterministic_seed',
type=int,
default=42,
required=False,
help='Random seed for deterministic training.',
)
self._parser.add_argument(
'--enable_determinism',
action='store_true',
default=False,
help='Enable deterministic training for reproducible results.',
)
self._parser.add_argument(
'--check_frequency',
type=int,
default=100,
required=False,
help='How often (in steps) to run lightweight periodic checks/logs and evaluate early-stop conditions.',
)
def _post_run_model_log(self):
"""Add deterministic metrics to results.
Deterministic metrics (loss, activation mean) are stored in the results file alongside
other benchmark metrics. These can later be compared using `sb result diagnosis`.
"""
# Add deterministic metrics to result system (all ranks add their own metrics)
if getattr(self._args, 'enable_determinism', False):
self._add_deterministic_metrics_to_result()
def _add_deterministic_metrics_to_result(self):
"""Add deterministic fingerprints and losses to the benchmark result system.
This makes deterministic metrics visible in results-summary.json alongside
other benchmark metrics. In distributed training, metrics include rank information.
"""
# Add periodic fingerprints (loss, activation mean) to results
if self._model_run_periodic:
for key, values in self._model_run_periodic.items():
if isinstance(values, list) and values:
# Include rank in metric name for distributed training
if self._global_rank is not None:
metric_name = f'deterministic_{key}_rank{self._global_rank}'
else:
metric_name = f'deterministic_{key}'
# Add summarized result (mean of checkpointed values)
filtered_values = [v for v in values if v is not None]
if filtered_values:
self._result.add_result(metric_name, statistics.mean(filtered_values))
else:
# No valid (non-None) values recorded; record NaN to avoid StatisticsError
self._result.add_result(metric_name, float('nan'))
# Add count of deterministic checks performed
if self._model_run_periodic.get('step'):
if self._global_rank is not None:
metric_name = f'deterministic_check_count_rank{self._global_rank}'
else:
metric_name = 'deterministic_check_count'
self._result.add_result(metric_name, len(self._model_run_periodic['step']))
# Add configuration parameters for validation
self._add_determinism_config_to_result()
def _add_determinism_config_to_result(self):
"""Add benchmark configuration parameters as metrics for determinism validation.
These parameters are included in the results file so they can be compared
between runs using diagnosis rules. This ensures runs being compared used
identical configurations.
"""
# Configuration parameters to include in results for validation
config_params = {
'batch_size': getattr(self._args, 'batch_size', None),
'num_steps': getattr(self._args, 'num_steps', None),
'num_warmup': getattr(self._args, 'num_warmup', None),
'deterministic_seed': getattr(self._args, 'deterministic_seed', None),
'check_frequency': getattr(self._args, 'check_frequency', None),
'seq_len': getattr(self._args, 'seq_len', None),
'hidden_size': getattr(self._args, 'hidden_size', None),
'num_classes': getattr(self._args, 'num_classes', None),
'input_size': getattr(self._args, 'input_size', None),
'num_layers': getattr(self._args, 'num_layers', None),
'num_hidden_layers': getattr(self._args, 'num_hidden_layers', None),
'num_attention_heads': getattr(self._args, 'num_attention_heads', None),
'intermediate_size': getattr(self._args, 'intermediate_size', None),
}
for param_name, value in config_params.items():
if value is not None:
metric_name = f'deterministic_config_{param_name}'
self._result.add_result(metric_name, value)
def _create_target(self, num_classes):
"""Create target tensor for training, using a deterministic generator when determinism is enabled.
Args:
num_classes (int): Number of classes for random target generation.
Return:
torch.LongTensor: Target tensor of shape (batch_size,).
"""
generator = None
if getattr(self._args, 'enable_determinism', False) and hasattr(self._args, 'deterministic_seed'):
generator = torch.Generator()
generator.manual_seed(self._args.deterministic_seed + 1)
if generator is not None:
target = torch.LongTensor(self._args.batch_size).random_(num_classes, generator=generator)
else:
target = torch.LongTensor(self._args.batch_size).random_(num_classes)
if self._gpu_available:
target = target.cuda()
return target
def _preprocess(self):
"""Preprocess and apply PyTorch-specific defaults."""
preprocess_ok = super()._preprocess()
if not preprocess_ok:
return False
return True
def set_deterministic_seed(self):
"""Set deterministic RNGs centrally for PyTorch benchmarks.
This will set the seeds and deterministic flags prior to dataset generation
so per-model dataset generation is reproducible without each model needing
to call torch.manual_seed().
"""
if getattr(self._args, 'enable_determinism', False):
# Validate check_frequency before any deterministic operations
check_freq = getattr(self._args, 'check_frequency', 100)
if not isinstance(check_freq, int) or check_freq <= 0:
logger.error(
f'Invalid check_frequency={check_freq}. Must be a positive integer >= 1. '
'Defaulting to 100.'
)
self._args.check_frequency = 100
try:
self._enable_deterministic_training()
except Exception:
logger.error(
'Failed to enable deterministic training. '
'Disabling enable_determinism to avoid silently non-deterministic results.'
)
self._args.enable_determinism = False
def _set_force_fp32(self):
"""Set the config that controls whether full float32 precision will be used.
......@@ -150,6 +390,7 @@ def _init_dataloader(self):
if self._args.distributed_impl:
if self._args.distributed_impl == DistributedImpl.HOROVOD:
import horovod.torch as hvd
train_sampler = \
torch.utils.data.distributed.DistributedSampler(
self._dataset,
......@@ -347,18 +588,23 @@ def _timer(self):
def _benchmark(self):
"""Wrap super._benchmark with profiler context if enabled by environment variable.
Run the benchmark then handle post-run model log save/compare.
Set SB_ENABLE_PYTORCH_PROFILER='1' to enable profiling.
"""
# Check if this is a Nvidia GPU
if not (torch.cuda.is_available() and torch.version.cuda is not None):
return super()._benchmark()
ok = super()._benchmark()
self._post_run_model_log()
return ok
# Check if profiling is enabled via environment variable
enable_profiler = os.environ.get('SB_ENABLE_PYTORCH_PROFILER', '0') == '1'
if not enable_profiler:
# Run without profiling
return super()._benchmark()
ok = super()._benchmark()
self._post_run_model_log()
return ok
# Run with profiling enabled
logger.info('PyTorch profiler enabled for model: {}'.format(self._name))
......@@ -397,4 +643,6 @@ def _benchmark(self):
with open(diag_agent_dump_file_path, 'w') as f:
json.dump(diag_agent_events, f, sort_keys=True)
# Handle post-run model log save/compare regardless of profiling
self._post_run_model_log()
return ret
......@@ -151,9 +151,7 @@ def _create_model(self, precision):
)
return False
self._target = torch.LongTensor(self._args.batch_size).random_(self._args.num_classes)
if self._gpu_available:
self._target = self._target.cuda()
self._target = self._create_target(self._args.num_classes)
return True
......@@ -164,11 +162,11 @@ def _train_step(self, precision):
precision (Precision): precision of model and input data, such as float32, float16.
Return:
The step-time list of every training step.
A tuple of (step_times_ms, info) of every training step.
"""
duration = []
periodic = {'loss': [], 'act_mean': [], 'step': []}
curr_step = 0
check_frequency = 100
while True:
for idx, sample in enumerate(self._dataloader):
start = self._timer()
......@@ -182,17 +180,22 @@ def _train_step(self, precision):
output = self._model(sample)
else:
output = self._model(sample)
loss = self._loss_fn(output, self._target)
logits = output
# Use FP32 logits for loss only when determinism is enabled; otherwise
# keep logits in their native precision to preserve benchmark semantics.
enable_determinism = getattr(self._args, 'enable_determinism', False)
logits_for_loss = logits.float() if enable_determinism else logits
loss = self._loss_fn(logits_for_loss, self._target)
loss.backward()
self._optimizer.step()
end = self._timer()
curr_step += 1
if curr_step > self._args.num_warmup:
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self.record_determinism_fingerprint(curr_step, loss, logits, periodic, self._args.check_frequency)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end, check_frequency):
return duration
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration, self._finalize_periodic_logging(periodic)
def _inference_step(self, precision):
"""Define the inference process.
......@@ -226,7 +229,7 @@ def _inference_step(self, precision):
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end):
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration
......
......@@ -80,9 +80,7 @@ def _create_model(self, precision):
)
return False
self._target = torch.LongTensor(self._args.batch_size).random_(self._args.num_classes)
if self._gpu_available:
self._target = self._target.cuda()
self._target = self._create_target(self._args.num_classes)
return True
......@@ -93,11 +91,11 @@ def _train_step(self, precision):
precision (Precision): precision of model and input data, such as float32, float16.
Return:
The step-time list of every training step.
A tuple of (step_times_ms, info) of every training step.
"""
duration = []
periodic = {'loss': [], 'act_mean': [], 'step': []}
curr_step = 0
check_frequency = 100
while True:
for idx, sample in enumerate(self._dataloader):
sample = sample.to(dtype=getattr(torch, precision.value))
......@@ -108,7 +106,9 @@ def _train_step(self, precision):
start = self._timer()
self._optimizer.zero_grad()
output = self._model(sample)
loss = self._loss_fn(output, self._target)
enable_determinism = getattr(self._args, 'enable_determinism', False)
logits_for_loss = output.float() if enable_determinism else output
loss = self._loss_fn(logits_for_loss, self._target)
loss.backward()
self._optimizer.step()
end = self._timer()
......@@ -116,9 +116,10 @@ def _train_step(self, precision):
if curr_step > self._args.num_warmup:
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self.record_determinism_fingerprint(curr_step, loss, output, periodic, self._args.check_frequency)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end, check_frequency):
return duration
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration, self._finalize_periodic_logging(periodic)
def _inference_step(self, precision):
"""Define the inference process.
......@@ -149,7 +150,7 @@ def _inference_step(self, precision):
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end):
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration
......
......@@ -36,7 +36,7 @@ def forward(self, input):
Args:
input (torch.LongTensor): Indices of input sequence tokens in the vocabulary,
shape (batch_size, sequence_length).
shape (batch_size, sequence_length).
Return:
result (torch.FloatTensor): Last layer hidden-state of the first token of the sequence
......@@ -145,9 +145,7 @@ def _create_model(self, precision):
)
return False
self._target = torch.LongTensor(self._args.batch_size).random_(self._args.num_classes)
if self._gpu_available:
self._target = self._target.cuda()
self._target = self._create_target(self._args.num_classes)
return True
......@@ -158,11 +156,11 @@ def _train_step(self, precision):
precision (Precision): precision of model and input data, such as float32, float16.
Return:
The step-time list of every training step.
A tuple of (step_times_ms, info) of every training step.
"""
duration = []
periodic = {'loss': [], 'act_mean': [], 'step': []}
curr_step = 0
check_frequency = 100
while True:
for idx, sample in enumerate(self._dataloader):
start = self._timer()
......@@ -176,7 +174,12 @@ def _train_step(self, precision):
output = self._model(sample)
else:
output = self._model(sample)
loss = self._loss_fn(output[range(self._args.batch_size), -1], self._target)
logits = output[range(self._args.batch_size), -1]
# Use FP32 logits for loss only when determinism is enabled; otherwise
# keep logits in their native precision to preserve benchmark semantics.
enable_determinism = getattr(self._args, 'enable_determinism', False)
logits_for_loss = logits.float() if enable_determinism else logits
loss = self._loss_fn(logits_for_loss, self._target)
loss.backward()
self._optimizer.step()
end = self._timer()
......@@ -184,9 +187,10 @@ def _train_step(self, precision):
if curr_step > self._args.num_warmup:
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self.record_determinism_fingerprint(curr_step, loss, logits, periodic, self._args.check_frequency)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end, check_frequency):
return duration
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration, self._finalize_periodic_logging(periodic)
def _inference_step(self, precision):
"""Define the inference process.
......@@ -220,7 +224,7 @@ def _inference_step(self, precision):
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end):
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration
......
......@@ -165,9 +165,7 @@ def _create_model(self, precision):
)
return False
self._target = torch.LongTensor(self._args.batch_size).random_(self._args.num_classes)
if self._gpu_available:
self._target = self._target.cuda()
self._target = self._create_target(self._args.num_classes)
return True
......@@ -178,11 +176,11 @@ def _train_step(self, precision):
precision (Precision): precision of model and input data, such as float32, float16.
Return:
The step-time list of every training step.
A tuple of (step_times_ms, info) of every training step.
"""
duration = []
periodic = {'loss': [], 'act_mean': [], 'step': []}
curr_step = 0
check_frequency = 100
while True:
for idx, sample in enumerate(self._dataloader):
start = self._timer()
......@@ -196,17 +194,22 @@ def _train_step(self, precision):
output = self._model(sample)
else:
output = self._model(sample)
loss = self._loss_fn(output[range(self._args.batch_size), -1], self._target)
logits = output[range(self._args.batch_size), -1]
# Use FP32 logits for loss only when determinism is enabled; otherwise
# keep logits in their native precision to preserve benchmark semantics.
enable_determinism = getattr(self._args, 'enable_determinism', False)
logits_for_loss = logits.float() if enable_determinism else logits
loss = self._loss_fn(logits_for_loss, self._target)
loss.backward()
self._optimizer.step()
end = self._timer()
curr_step += 1
if curr_step > self._args.num_warmup:
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self.record_determinism_fingerprint(curr_step, loss, logits, periodic, self._args.check_frequency)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end, check_frequency):
return duration
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration, self._finalize_periodic_logging(periodic)
def _inference_step(self, precision):
"""Define the inference process.
......@@ -237,10 +240,9 @@ def _inference_step(self, precision):
end = self._timer()
curr_step += 1
if curr_step > self._args.num_warmup:
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end):
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration
......
......@@ -120,9 +120,7 @@ def _create_model(self, precision):
)
return False
self._target = torch.LongTensor(self._args.batch_size).random_(self._args.num_classes)
if self._gpu_available:
self._target = self._target.cuda()
self._target = self._create_target(self._args.num_classes)
return True
......@@ -133,11 +131,11 @@ def _train_step(self, precision):
precision (Precision): precision of model and input data, such as float32, float16.
Return:
The step-time list of every training step.
A tuple of (step_times_ms, info) of every training step.
"""
duration = []
periodic = {'loss': [], 'act_mean': [], 'step': []}
curr_step = 0
check_frequency = 100
while True:
for idx, sample in enumerate(self._dataloader):
sample = sample.to(dtype=getattr(torch, precision.value))
......@@ -148,17 +146,19 @@ def _train_step(self, precision):
start = self._timer()
self._optimizer.zero_grad()
output = self._model(sample)
loss = self._loss_fn(output, self._target)
enable_determinism = getattr(self._args, 'enable_determinism', False)
logits_for_loss = output.float() if enable_determinism else output
loss = self._loss_fn(logits_for_loss, self._target)
loss.backward()
self._optimizer.step()
end = self._timer()
curr_step += 1
if curr_step > self._args.num_warmup:
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self.record_determinism_fingerprint(curr_step, loss, output, periodic, self._args.check_frequency)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end, check_frequency):
return duration
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration, self._finalize_periodic_logging(periodic)
def _inference_step(self, precision):
"""Define the inference process.
......@@ -189,7 +189,7 @@ def _inference_step(self, precision):
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end):
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration
......
......@@ -134,7 +134,26 @@ def _create_model(self, precision):
Args:
precision (Precision): precision of model and input data, such as float32, float16.
"""
self._config = MixtralConfig(
self._config = self._build_config()
if not self._check_fp8_support(precision):
return False
try:
self._model = self._instantiate_model()
self._postprocess_model(precision)
except Exception as e:
logger.error(
'Create model with specified precision failed - model: {}, precision: {}, message: {}.'.format(
self._name, precision, str(e)
)
)
return False
self._setup_target()
return True
def _build_config(self):
return MixtralConfig(
hidden_size=self._args.hidden_size,
num_hidden_layers=self._args.num_hidden_layers,
num_attention_heads=self._args.num_attention_heads,
......@@ -144,46 +163,42 @@ def _create_model(self, precision):
router_aux_loss_coef=self._args.router_aux_loss_coef,
)
def _check_fp8_support(self, precision):
enable_fp8 = precision.name.startswith('FP8_')
if enable_fp8 and te is None:
logger.error(
f'Create model with fp8 failed - model: {self._name}, precision: {precision},'
' message: Cannot find transformer_engine.'
f'Create model with fp8 failed - model: {self._name}, precision: {precision}, '
'message: Cannot find transformer_engine.'
)
return False
if enable_fp8 and not self._gpu_available:
logger.error(
f'Create model with fp8 failed - model: {self._name}, precision: {precision},'
' message: FP8 is only supported on GPU.'
f'Create model with fp8 failed - model: {self._name}, precision: {precision}, '
'message: FP8 is only supported on GPU.'
)
return False
return True
try:
self._model = MixtralBenchmarkModel(self._config, self._args.num_classes)
if enable_fp8:
self._fp8_recipe = DelayedScaling(
fp8_format=Format[precision.name.strip('FP8_')],
amax_history_len=16,
amax_compute_algo='max',
)
self._to_te_model(self._model.to(dtype=torch.float16))
else:
self._model = self._model.to(dtype=getattr(torch, precision.value))
if self._gpu_available:
self._model = self._model.cuda()
except Exception as e:
logger.error(
'Create model with specified precision failed - model: {}, precision: {}, message: {}.'.format(
self._name, precision, str(e)
)
)
return False
def _instantiate_model(self):
return MixtralBenchmarkModel(self._config, self._args.num_classes)
self._target = torch.LongTensor(self._args.batch_size).random_(self._args.num_classes)
def _postprocess_model(self, precision):
enable_fp8 = precision.name.startswith('FP8_')
if enable_fp8:
self._fp8_recipe = DelayedScaling(
fp8_format=Format[precision.name.strip('FP8_')],
amax_history_len=16,
amax_compute_algo='max',
)
self._to_te_model(self._model.to(dtype=torch.float16))
else:
self._model = self._model.to(dtype=getattr(torch, precision.value))
if self._gpu_available:
self._target = self._target.cuda()
self._model = self._model.cuda()
return True
def _setup_target(self):
"""Set up target tensor using the shared deterministic-aware helper."""
self._target = self._create_target(self._args.num_classes)
def _train_step(self, precision):
"""Define the training process.
......@@ -192,11 +207,11 @@ def _train_step(self, precision):
precision (Precision): precision of model and input data, such as float32, float16.
Return:
The step-time list of every training step.
A tuple of (step_times_ms, info) of every training step.
"""
duration = []
periodic = {'loss': [], 'act_mean': [], 'step': []}
curr_step = 0
check_frequency = 100
while True:
for idx, sample in enumerate(self._dataloader):
start = self._timer()
......@@ -210,17 +225,22 @@ def _train_step(self, precision):
output = self._model(sample)
else:
output = self._model(sample)
loss = self._loss_fn(output[range(self._args.batch_size), -1], self._target)
logits = output[range(self._args.batch_size), -1]
# Use FP32 logits for loss only when determinism is enabled; otherwise
# keep logits in their native precision to preserve benchmark semantics.
enable_determinism = getattr(self._args, 'enable_determinism', False)
logits_for_loss = logits.float() if enable_determinism else logits
loss = self._loss_fn(logits_for_loss, self._target)
loss.backward()
self._optimizer.step()
end = self._timer()
curr_step += 1
if curr_step > self._args.num_warmup:
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self.record_determinism_fingerprint(curr_step, loss, logits, periodic, self._args.check_frequency)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end, check_frequency):
return duration
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration, self._finalize_periodic_logging(periodic)
def _inference_step(self, precision):
"""Define the inference process.
......@@ -254,5 +274,5 @@ def _inference_step(self, precision):
# Save the step time of every training/inference step, unit is millisecond.
duration.append((end - start) * 1000)
self._log_step_time(curr_step, precision, duration)
if self._is_finished(curr_step, end):
if self._is_finished(curr_step, end, self._args.check_frequency):
return duration
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Utility functions for deterministic model training and validation."""
def record_step_loss(loss, curr_step, losses_list, logger=None):
"""Record per-step loss value for determinism tracking.
Args:
loss: Loss tensor or float value.
curr_step (int): Current training step.
losses_list (list): List to append loss values to.
logger: Optional logger for warnings.
Returns:
float: Converted loss value, or None if conversion failed.
"""
try:
v = float(loss.detach().item()) if hasattr(loss, 'detach') else float(loss)
losses_list.append(v)
return v
except Exception:
if logger:
logger.info(f'Unable to convert loss to float at step {curr_step}')
losses_list.append(None)
return None
def _record_loss_fingerprint(curr_step, loss_value, periodic_dict, logger):
"""Record loss fingerprint at current step."""
try:
if 'loss' in periodic_dict and isinstance(periodic_dict['loss'], list):
periodic_dict['loss'].append(loss_value)
else:
periodic_dict['loss'] = [loss_value]
if logger:
logger.info(f'Loss at step {curr_step}: {loss_value}')
periodic_dict.setdefault('step', []).append(curr_step)
except Exception:
if logger:
logger.warning(f'Unable to log loss at curr_step {curr_step}')
def _record_activation_fingerprint(curr_step, logits, periodic_dict, logger):
"""Record activation mean fingerprint at current step."""
try:
if logits is not None:
act_mean = (
float(logits[0].detach().float().mean().item()) if hasattr(logits[0], 'detach') else float(logits[0])
)
if logger:
logger.info(f'ActMean at step {curr_step}: {act_mean}')
periodic_dict.setdefault('act_mean', []).append(act_mean)
else:
periodic_dict.setdefault('act_mean', []).append(None)
except Exception:
if logger:
logger.warning(f'Unable to log act_mean at curr_step {curr_step}')
periodic_dict.setdefault('act_mean', []).append(None)
def record_periodic_fingerprint(
curr_step, loss_value, logits, periodic_dict, check_frequency, enable_determinism, logger=None
):
"""Record periodic fingerprints (loss and activation mean) for deterministic runs.
Args:
curr_step (int): Current training step.
loss_value: Pre-converted loss float value (or None).
logits: Logits tensor for activation fingerprint.
periodic_dict (dict): Dictionary to store periodic data ('loss', 'act_mean', 'step').
check_frequency (int): Frequency for fingerprint logging.
enable_determinism (bool): Whether determinism is enabled.
logger: Optional logger for info/warnings.
"""
# Defensively handle invalid check_frequency values to avoid ZeroDivisionError and
# undefined behavior for non-positive frequencies.
if check_frequency is None or check_frequency <= 0:
if logger:
logger.warning(
f'Invalid check_frequency={check_frequency} at step {curr_step}; '
'skipping periodic fingerprint recording.'
)
return
if not enable_determinism or (curr_step % check_frequency != 0):
return
_record_loss_fingerprint(curr_step, loss_value, periodic_dict, logger)
_record_activation_fingerprint(curr_step, logits, periodic_dict, logger)
......@@ -421,6 +421,7 @@ def __create_single_node_summary(self, node_path): # pragma: no cover # noqa:
results_summary = self.__merge_benchmark_metrics(results_summary, reduce_ops)
monitor_summary = self.__merge_monitor_metrics(node_path)
results_summary = {**results_summary, **monitor_summary}
with (node_path / 'results-summary.json').open(mode='w') as f:
json.dump(results_summary, f, indent=2)
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Unified test for deterministic fingerprinting across all major PyTorch model benchmarks."""
import os

import pytest

from tests.helper import decorator
from superbench.benchmarks import BenchmarkRegistry, Platform, Framework, ReturnCode
# Set CUBLAS_WORKSPACE_CONFIG early to ensure deterministic cuBLAS behavior
os.environ.setdefault('CUBLAS_WORKSPACE_CONFIG', ':4096:8')
# Set PYTORCH_CUDA_ALLOC_CONF to avoid memory fragmentation
os.environ.setdefault('PYTORCH_CUDA_ALLOC_CONF', 'expandable_segments:True')
def run_deterministic_benchmark(model_name, params):
"""Helper to launch a deterministic benchmark and return the result."""
parameters = params + ' --enable_determinism --deterministic_seed 42 --check_frequency 10'
context = BenchmarkRegistry.create_benchmark_context(
model_name,
platform=Platform.CUDA,
parameters=parameters,
framework=Framework.PYTORCH,
)
benchmark = BenchmarkRegistry.launch_benchmark(context)
return benchmark
MODELS = [
(
'resnet18',
'--batch_size 2 --image_size 32 --num_classes 2 --num_warmup 1 --num_steps 20 '
'--model_action train --precision float32',
),
(
'lstm',
'--batch_size 1 --num_classes 2 --seq_len 4 --num_warmup 1 --num_steps 20 '
'--model_action train '
'--precision float32',
),
(
'gpt2-small',
'--batch_size 1 --num_classes 2 --seq_len 4 --num_warmup 1 --num_steps 20 '
'--model_action train --precision float32',
),
pytest.param(
'llama2-7b',
'--batch_size 1 --seq_len 1 --num_warmup 1 --num_steps 20 --precision float32 --model_action train',
marks=pytest.mark.skip(
reason='Requires >26GB GPU memory for 7B model, and float16 is incompatible with deterministic mode'
),
),
(
'mixtral-8x7b',
'--batch_size 1 --seq_len 4 --num_warmup 1 --num_steps 20 --precision float32 '
'--hidden_size 128 --max_position_embeddings 32 '
'--intermediate_size 256 --model_action train',
),
(
'bert-base',
'--batch_size 1 --num_classes 2 --seq_len 4 --num_warmup 1 --num_steps 20 '
'--model_action train --precision float32',
),
]
@decorator.cuda_test
@decorator.pytorch_test
@pytest.mark.parametrize('model_name, params', MODELS)
def test_pytorch_model_determinism(model_name, params):
"""Parameterised Test for PyTorch model determinism.
Tests that deterministic metrics (loss, activation mean) are correctly recorded
when --enable_determinism is enabled. Comparison against baseline should be done
offline using `sb result diagnosis`.
"""
benchmark = run_deterministic_benchmark(model_name, params)
assert benchmark and benchmark.return_code == ReturnCode.SUCCESS
# Check args
assert benchmark._args.enable_determinism is True
assert benchmark._args.deterministic_seed == 42
assert benchmark._args.check_frequency == 10
# Check that detailed per-step fingerprints are captured in _model_run_periodic
periodic = benchmark._model_run_periodic
assert isinstance(periodic, dict), '_model_run_periodic should be a dict'
for key in ('loss', 'act_mean', 'step'):
assert key in periodic, f"Key '{key}' missing in _model_run_periodic, got keys: {list(periodic.keys())}"
assert isinstance(periodic[key], list) and len(periodic[key]) > 0, \
f"Expected non-empty list for periodic['{key}']"
# Verify loss values are reasonable (not None or inf)
import math
for loss_val in periodic['loss']:
assert loss_val is not None, 'Loss value should not be None'
assert isinstance(loss_val, (int, float)), f'Loss should be numeric, got {type(loss_val)}'
if not math.isnan(loss_val):
assert loss_val < 1e6, f'Loss seems unreasonably large: {loss_val}'
# Verify deterministic metrics are in result (summarized form)
result = benchmark._result.result
metric_keys = [k for k in result.keys() if 'deterministic_' in k]
assert len(metric_keys) > 0, f'Expected deterministic metrics in result, got keys: {list(result.keys())}'
# Verify configuration parameters are in results for validation
config_keys = [k for k in result.keys() if 'deterministic_config_' in k]
assert len(config_keys) > 0, 'Expected deterministic_config metrics in result'
# Verify specific config values match the arguments
# Result values are stored as lists, so compare against list-wrapped values
assert result.get('deterministic_config_deterministic_seed') == [42], \
'deterministic_seed config should match args'
assert result.get('deterministic_config_check_frequency') == [10], \
'check_frequency config should match args'
assert 'deterministic_config_batch_size' in result, \
'batch_size should be in config metrics'
@decorator.cuda_test
@decorator.pytorch_test
@pytest.mark.parametrize('model_name, params', MODELS)
def test_pytorch_model_nondeterministic_default(model_name, params):
"""Parameterised Test for PyTorch model to verify non-determinism is default."""
context = BenchmarkRegistry.create_benchmark_context(
model_name,
platform=Platform.CUDA,
parameters=params,
framework=Framework.PYTORCH,
)
benchmark = BenchmarkRegistry.launch_benchmark(context)
assert (benchmark and benchmark.return_code == ReturnCode.SUCCESS), 'Benchmark did not run successfully.'
args = benchmark._args
assert getattr(args, 'enable_determinism', False) is False, 'Expected enable_determinism to be False by default.'
assert (getattr(args, 'check_frequency', None) == 100), 'Expected check_frequency to be 100 by default.'
# Periodic fingerprints exist but are empty when not deterministic
assert hasattr(benchmark, '_model_run_periodic'), 'Benchmark missing _model_run_periodic attribute.'
periodic = benchmark._model_run_periodic
assert isinstance(periodic, dict), '_model_run_periodic should be a dict.'
for key in ('loss', 'act_mean', 'step'):
assert key in periodic, f"Key '{key}' missing in _model_run_periodic."
assert (len(periodic[key]) == 0), f"Expected empty list for periodic['{key}'], got {periodic[key]}."
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for model_log_utils module."""
from unittest.mock import Mock
from superbench.common import model_log_utils
class TestRecordStepLoss:
"""Tests for record_step_loss function."""
def test_record_loss_conversion_failure(self):
"""Test exception handling when loss conversion fails."""
logger = Mock()
losses_list = []
# Create a mock object that raises exception on conversion
bad_loss = Mock()
bad_loss.detach.side_effect = RuntimeError('Conversion failed')
result = model_log_utils.record_step_loss(bad_loss, curr_step=5, losses_list=losses_list, logger=logger)
assert result is None
assert losses_list == [None]
logger.info.assert_called_once_with('Unable to convert loss to float at step 5')
def test_record_loss_success(self):
"""Test successful loss recording."""
logger = Mock()
losses_list = []
# Create a mock tensor with detach and item methods
loss = Mock()
loss.detach.return_value.item.return_value = 2.5
result = model_log_utils.record_step_loss(loss, curr_step=10, losses_list=losses_list, logger=logger)
assert result == 2.5
assert losses_list == [2.5]
def test_record_loss_from_float(self):
"""Test recording loss from plain float value."""
losses_list = []
result = model_log_utils.record_step_loss(1.234, curr_step=1, losses_list=losses_list, logger=None)
assert result == 1.234
assert losses_list == [1.234]
class TestRecordPeriodicFingerprint:
"""Tests for record_periodic_fingerprint function."""
def test_skips_when_determinism_disabled(self):
"""Test that fingerprint is not recorded when determinism is disabled."""
periodic_dict = {}
model_log_utils.record_periodic_fingerprint(
curr_step=100,
loss_value=1.0,
logits=None,
periodic_dict=periodic_dict,
check_frequency=10,
enable_determinism=False,
logger=None
)
assert periodic_dict == {}
def test_skips_when_not_at_frequency(self):
"""Test that fingerprint is not recorded when not at check frequency."""
periodic_dict = {}
model_log_utils.record_periodic_fingerprint(
curr_step=15,
loss_value=1.0,
logits=None,
periodic_dict=periodic_dict,
check_frequency=10,
enable_determinism=True,
logger=None
)
assert periodic_dict == {}
def test_records_at_frequency(self):
"""Test that fingerprint is recorded at check frequency."""
periodic_dict = {}
model_log_utils.record_periodic_fingerprint(
curr_step=20,
loss_value=1.5,
logits=None,
periodic_dict=periodic_dict,
check_frequency=10,
enable_determinism=True,
logger=None
)
assert 'loss' in periodic_dict
assert periodic_dict['loss'] == [1.5]
assert 'step' in periodic_dict
assert periodic_dict['step'] == [20]