Commit fd5309bc authored by yangzhong's avatar yangzhong
Browse files

bert-large infer

parents
Pipeline #3012 canceled with stages
import argparse
import subprocess
def main(config_dir, config_name, args):
subprocess.run(
["optimum-benchmark", "--config-dir", f"{config_dir}", "--config-name", f"{config_name}"]
+ ["hydra/job_logging=disabled", "hydra/hydra_logging=disabled"]
+ args
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config-dir", type=str, required=True, help="The path to the config directory.")
parser.add_argument("--config-name", type=str, required=True, help="The config name.")
args, unknown = parser.parse_known_args()
main(args.config_dir, args.config_name, unknown)
gpustat==1.1.1
psutil==6.0.0
psycopg2==2.9.9
torch>=2.4.0
hf_xet
pandas>=1.5.0
\ No newline at end of file
benchmark_results/
benchmark_results_profiles/
# Benchmarking v2
A comprehensive benchmarking framework for transformer models that supports multiple execution modes (eager, compiled, kernelized), detailed performance metrics collection, and structured output format.
## Quick Start
### Running All Benchmarks
```bash
# Run all benchmarks with default settings
python run_benchmarks.py
# Specify output directory
python run_benchmarks.py --output-dir my_results
# Run with custom parameters
python run_benchmarks.py \
--warmup-iterations 5 \
--measurement-iterations 10 \
--num-tokens-to-generate 200
```
### Uploading Results to HuggingFace Dataset
You can automatically upload benchmark results to a HuggingFace Dataset for tracking and analysis:
```bash
# Upload to a public dataset with auto-generated run ID
python run_benchmarks.py --upload-to-hub username/benchmark-results
# Upload with a custom run ID for easy identification
python run_benchmarks.py --upload-to-hub username/benchmark-results --run-id experiment_v1
# Upload with custom HuggingFace token (if not set in environment)
python run_benchmarks.py --upload-to-hub username/benchmark-results --token hf_your_token_here
```
**Dataset Directory Structure:**
```
dataset_name/
├── 2025-01-15/
│ ├── runs/ # Non-scheduled runs (manual, PR, etc.)
│ │ └── 123-1245151651/ # GitHub run number and ID
│ │ └── benchmark_results/
│ │ ├── benchmark_summary_20250115_143022.json
│ │ └── model-name/
│ │ └── model-name_benchmark_20250115_143022.json
│ └── benchmark_results_abc123de/ # Scheduled runs (daily CI)
│ ├── benchmark_summary_20250115_143022.json
│ └── model-name/
│ └── model-name_benchmark_20250115_143022.json
└── 2025-01-16/
└── ...
```
**Authentication for Uploads:**
For uploading results, you need a HuggingFace token with write permissions to the target dataset. You can provide the token in several ways (in order of precedence):
1. Command line: `--token hf_your_token_here`
3. Environment variable: `HF_TOKEN`
### Running Specific Benchmarks
```bash
# Include only specific benchmarks
python run_benchmarks.py --include llama
# Exclude specific benchmarks
python run_benchmarks.py --exclude old_benchmark
## Output Format
Results are saved as JSON files with the following structure:
```json
{
"model_name": "llama_2_7b",
"benchmark_scenarios": [
{
"scenario_name": "eager_variant",
"metadata": {
"timestamp": "2025-01-XX...",
"commit_id": "abc123...",
"hardware_info": {
"gpu_name": "NVIDIA A100",
"gpu_memory_total": 40960,
"cpu_count": 64
},
"config": {
"variant": "eager",
"warmup_iterations": 3,
"measurement_iterations": 5
}
},
"measurements": {
"latency": {
"mean": 2.45,
"median": 2.43,
"std": 0.12,
"min": 2.31,
"max": 2.67,
"p95": 2.61,
"p99": 2.65
},
"time_to_first_token": {
"mean": 0.15,
"std": 0.02
},
"tokens_per_second": {
"mean": 87.3,
"unit": "tokens/sec"
}
},
"gpu_metrics": {
"gpu_utilization_mean": 85.2,
"gpu_memory_used_mean": 12450
}
}
]
}
```
### Debug Mode
```bash
python run_benchmarks.py --log-level DEBUG
```
## Contributing
To add new benchmarks:
1. Create a new file in `benches/`
2. Implement the `ModelBenchmark` interface
3. Add a runner function (`run_<benchmark_name>` or `run_benchmark`)
4. run_benchmarks.py
\ No newline at end of file
import hashlib
import json
import logging
from typing import Any
KERNELIZATION_AVAILABLE = False
try:
from kernels import Mode, kernelize # noqa: F401
KERNELIZATION_AVAILABLE = True
except ImportError:
pass
logger = logging.getLogger(__name__)
class BenchmarkConfig:
"""Configuration for a single benchmark scenario."""
def __init__(
self,
warmup_iterations: int = 5,
measurement_iterations: int = 20,
gpu_monitoring: bool = True, # NOTE: you may want to disable this at times as we have obsvered it could heavily slow down benchmarks on AMD
batch_size: int = 1,
sequence_length: int = 128,
num_tokens_to_generate: int = 128,
attn_implementation: str = "eager",
sdpa_backend: str | None = None,
compile_mode: str | None = None,
compile_options: dict[str, Any] | None = None,
kernelize: bool = False,
name: str | None = None,
skip_validity_check: bool = False,
) -> None:
# Benchmark parameters
self.warmup_iterations = warmup_iterations
self.measurement_iterations = measurement_iterations
self.gpu_monitoring = gpu_monitoring
# Input parameters
self.batch_size = batch_size
self.sequence_length = sequence_length
self.num_tokens_to_generate = num_tokens_to_generate
# Generation parameters
self.attn_implementation = attn_implementation
self.sdpa_backend = sdpa_backend
# Optimization parameters
self.compile_mode = compile_mode
self.compile_options = compile_options if compile_options is not None else {}
self.kernelize = kernelize
# Constant parameters
self.dtype = "torch.bfloat16"
self.device = "cuda"
self.check_validity(skip_validity_check)
self.name = name if name is not None else self.infer_name()
def check_validity(self, skip_validity_check: bool = False) -> None:
if skip_validity_check:
return
# Flash attention does not support compile mode, so we turn it off # FIXME: it would be better to support it
is_fa = self.attn_implementation == "flash_attention_2"
is_fa |= self.attn_implementation == "sdpa" and self.sdpa_backend == "flash_attention"
if is_fa:
logger.warning("Flash attention does not support compile mode. Turning off compile mode.")
self.compile_mode = None
@property
def hash(self) -> str:
return hashlib.sha256(json.dumps(self.to_dict()).encode()).hexdigest()
def infer_name(self, compact: bool = True) -> str:
"""Infer a human-readable name for the benchmark config, either compact or verbose."""
if compact:
iter_str = f"w{self.warmup_iterations}_i{self.measurement_iterations}"
gpu_monitor_str = "monitored" if self.gpu_monitoring else "unmonitored"
dimensions_str = f"b{self.batch_size}_s{self.sequence_length}_n{self.num_tokens_to_generate}"
attn_code = self.attn_implementation
attn_code += f"_{self.sdpa_backend}" if self.attn_implementation == "sdpa" else ""
compile_str = f"compiled_{self.compile_mode}" if self.compile_mode is not None else "uncompiled"
kernelize_str = "kernelized" if self.kernelize else "unkernelized"
sep = "-"
else:
iter_str = f"{self.warmup_iterations} warmup, {self.measurement_iterations} iterations"
gpu_monitor_str = ("with" if self.gpu_monitoring else "no") + " GPU monitoring"
dimensions_str = f"batch size {self.batch_size}, sequence length {self.sequence_length}, {self.num_tokens_to_generate} generated tokens"
attn_code = f"{self.attn_implementation} attention"
attn_code += f" with {self.sdpa_backend} backend" if self.attn_implementation == "sdpa" else ""
compile_str = "compiled" if self.compile_mode is not None else "not compiled"
kernelize_str = "kernelized" if self.kernelize else "not kernelized"
sep = ", "
return sep.join([iter_str, gpu_monitor_str, dimensions_str, attn_code, compile_str, kernelize_str])
def to_dict(self) -> dict[str, Any]:
return {
"name": self.name,
"warmup_iterations": self.warmup_iterations,
"measurement_iterations": self.measurement_iterations,
"gpu_monitoring": self.gpu_monitoring,
"batch_size": self.batch_size,
"sequence_length": self.sequence_length,
"num_tokens_to_generate": self.num_tokens_to_generate,
"attn_implementation": self.attn_implementation,
"sdpa_backend": self.sdpa_backend,
"compile_mode": self.compile_mode,
"compile_options": self.compile_options | {}, # to avoid inplace modification of the original dict
"kernelize": self.kernelize,
}
@classmethod
def from_dict(cls, data: dict[str, Any], skip_validity_check: bool = False) -> "BenchmarkConfig":
return cls(
warmup_iterations=data.get("warmup_iterations", 5),
measurement_iterations=data.get("measurement_iterations", 20),
gpu_monitoring=data.get("gpu_monitoring", False),
batch_size=data.get("batch_size", 1),
sequence_length=data.get("sequence_length", 128),
num_tokens_to_generate=data.get("num_tokens_to_generate", 128),
attn_implementation=data.get("attn_implementation", "eager"),
sdpa_backend=data.get("sdpa_backend"),
compile_mode=data.get("compile_mode"),
compile_options=data.get("compile_options"),
kernelize=data.get("kernelize", False),
name=data.get("name"),
skip_validity_check=skip_validity_check,
)
def cross_generate_configs(
attn_impl_and_sdpa_backend: list[tuple[str, str | None]],
compiled_mode: list[str | None],
kernelized: list[bool],
warmup_iterations: int = 5,
measurement_iterations: int = 20,
batch_size: int = 1,
sequence_length: int = 128,
num_tokens_to_generate: int = 128,
gpu_monitoring: bool = True,
) -> list[BenchmarkConfig]:
# Create kwargs common to all configs
kwargs = {
"warmup_iterations": warmup_iterations,
"measurement_iterations": measurement_iterations,
"batch_size": batch_size,
"sequence_length": sequence_length,
"num_tokens_to_generate": num_tokens_to_generate,
"gpu_monitoring": gpu_monitoring,
}
# Cross-generate all combinations of attn_implementation, compiled_mode, and kernelized
configs = []
for attn_implementation, sdpa_backend in list(dict.fromkeys(attn_impl_and_sdpa_backend)):
for cm in list(dict.fromkeys(compiled_mode)):
for kernelize_on in list(dict.fromkeys(kernelized)):
config = BenchmarkConfig(
attn_implementation=attn_implementation,
sdpa_backend=sdpa_backend,
compile_mode=cm,
kernelize=kernelize_on,
**kwargs,
)
configs.append(config)
return configs
def generate_all_configs(
warmup_iterations: int = 5,
measurement_iterations: int = 20,
batch_size: int = 1,
sequence_length: int = 128,
num_tokens_to_generate: int = 128,
gpu_monitoring: bool = True,
) -> list[BenchmarkConfig]:
all_attn_implementations = [
("flash_attention_2", None),
("eager", None),
("sdpa", "math"),
("sdpa", "flash_attention"),
("flex_attention", None),
]
return cross_generate_configs(
attn_impl_and_sdpa_backend=all_attn_implementations,
compiled_mode=[None, "default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"],
kernelized=[False, KERNELIZATION_AVAILABLE],
warmup_iterations=warmup_iterations,
measurement_iterations=measurement_iterations,
batch_size=batch_size,
sequence_length=sequence_length,
num_tokens_to_generate=num_tokens_to_generate,
gpu_monitoring=gpu_monitoring,
)
def generate_main_configs(
warmup_iterations: int = 5,
measurement_iterations: int = 20,
batch_size: int = 1,
sequence_length: int = 128,
num_tokens_to_generate: int = 128,
) -> list[BenchmarkConfig]:
# Create kwargs common to all configs
kwargs = {
"warmup_iterations": warmup_iterations,
"measurement_iterations": measurement_iterations,
"batch_size": batch_size,
"sequence_length": sequence_length,
"num_tokens_to_generate": num_tokens_to_generate,
}
return [ # TODO: test max-autotune instead of default
BenchmarkConfig(attn_implementation="flex_attention", compile_mode="default", gpu_monitoring=False, **kwargs),
BenchmarkConfig(attn_implementation="flex_attention", compile_mode="default", gpu_monitoring=True, **kwargs),
BenchmarkConfig(attn_implementation="eager", compile_mode="default", gpu_monitoring=True, **kwargs),
BenchmarkConfig(attn_implementation="flash_attention_2", gpu_monitoring=True, **kwargs),
]
import gc
import json
import logging
import os
import pathlib
import re
import tempfile
import time
from contextlib import nullcontext
from datetime import datetime
from queue import Queue
from typing import Any
import torch
from datasets import Dataset
from huggingface_hub import HfApi
from tqdm import trange
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
CompileConfig,
GenerationConfig,
GenerationMixin,
)
from transformers.generation.streamers import BaseStreamer
from .benchmark_config import BenchmarkConfig
from .data_classes import BenchmarkMetadata, BenchmarkResult, GPURawMetrics, pretty_print_dict
from .hardware_metrics import GPUMonitor
try:
from kernels import Mode, kernelize # noqa: F401
except ImportError:
kernelize = None
Mode = None
DEFAULT_PROMPT = "\n".join([
"The French Revolution was a period of political and societal change in France that began with the Estates General of 1789 and ended with the Coup of 18 Brumaire on 9 November 1799.",
"Many of the revolution's ideas are considered fundamental principles of liberal democracy, and its values remain central to modern French political discourse.",
"It was caused by a combination of social, political, and economic factors which the existing regime proved unable to manage.",
"Financial crisis and widespread social distress led to the convocation of the Estates General in May 1789, its first meeting since 1614.",
"The representatives of the Third Estate broke away and re-constituted themselves as a National Assembly in June.",
"The Storming of the Bastille in Paris on 14 July led to a series of radical measures by the Assembly, including the abolition of feudalism, state control over the Catholic Church in France, and issuing the Declaration of the Rights of Man and of the Citizen.",
"The next three years were dominated by a struggle for political control.",
"King Louis XVI's attempted flight to Varennes in June 1791 further discredited the monarchy, and military defeats after the outbreak of the French Revolutionary Wars in April 1792 led to the insurrection of 10 August 1792.",
"As a result, the monarchy was replaced by the French First Republic in September, followed by the execution of Louis XVI himself in January 1793.",
"After another revolt in June 1793, the constitution was suspended, and political power passed from the National Convention to the Committee of Public Safety, dominated by radical Jacobins led by Maximilien Robespierre.",
"About 16,000 people were sentenced by the Revolutionary Tribunal and executed in the Reign of Terror, which ended in July 1794 with the Thermidorian Reaction.",
"Weakened by external threats and internal opposition, the Committee of Public Safety was replaced in November 1795 by the Directory.",
"Its instability ended in the coup of 18 Brumaire and the establishment of the Consulate, with Napoleon Bonaparte as First Consul.",
]) # fmt: skip
PUSH_TO_HUB_TOKEN = os.getenv("PUSH_TO_HUB_TOKEN", None)
def compact_json_numeric_arrays(data: dict):
# Match arrays that contain only numbers (ints/floats), whitespace, commas, and newlines
pattern = r"\[\s*\n\s*((?:\d+(?:\.\d+)?\s*,\s*)*\d+(?:\.\d+)?)\s*\n\s*\]"
def replace_numeric_array(match):
# Get the array content
content = match.group(1)
# Remove extra whitespace but keep commas
compact_content = re.sub(r"\s+", " ", content).strip()
return f"[{compact_content}]"
return re.sub(pattern, replace_numeric_array, json.dumps(data, indent=4, default=str), flags=re.DOTALL)
def get_git_revision() -> str:
base_path = pathlib.Path(__file__).parent.parent.parent
git_dir = base_path / ".git"
with (git_dir / "HEAD").open("r") as head:
ref = head.readline().split(" ")[-1].strip()
with (git_dir / ref).open("r") as git_hash:
return git_hash.readline().strip()
def get_sdpa_backend(backend_name: str | None) -> torch.nn.attention.SDPBackend | None:
"""Get the SDPA backend enum from string name."""
if backend_name is None:
return None
try:
backend_map = {
"math": torch.nn.attention.SDPBackend.MATH,
"flash_attention": torch.nn.attention.SDPBackend.FLASH_ATTENTION,
"efficient_attention": torch.nn.attention.SDPBackend.EFFICIENT_ATTENTION,
"cudnn_attention": torch.nn.attention.SDPBackend.CUDNN_ATTENTION,
}
return backend_map.get(backend_name.lower())
except AttributeError:
# torch.nn.attention.SDPBackend not available in older torch versions
return None
def flush_memory():
"""Flush GPU memory and run garbage collection."""
gc.collect()
# Dynamo resets
torch._dynamo.reset()
torch._dynamo.reset_code_caches()
if hasattr(torch._inductor, "codecache"):
# Clear FX graph cache
if hasattr(torch._inductor.codecache, "FxGraphCache"):
torch._inductor.codecache.FxGraphCache.clear()
# Clear PyCodeCache
if hasattr(torch._inductor.codecache, "PyCodeCache"):
torch._inductor.codecache.PyCodeCache.cache_clear()
# Clear TritonFuture cache (for async compilation)
if hasattr(torch._inductor.codecache, "TritonFuture"):
if hasattr(torch._inductor.codecache.TritonFuture, "_compile_cache"):
torch._inductor.codecache.TritonFuture._compile_cache.clear()
# Clear CUDA cache
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.reset_max_memory_allocated()
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
gc.collect()
class BenchmarkStreamer(BaseStreamer):
def __init__(self, **kwargs) -> None:
self.timeout = kwargs.pop("timeout", 10)
self.timestamps = []
self.text_queue = Queue()
self.stop_signal = None
def put(self, value):
"""Receives tokens and logs the timestamp of the generation."""
self.timestamps.append(time.perf_counter())
self.text_queue.put(value)
def end(self):
self.timestamps.append(time.perf_counter())
self.text_queue.put(self.stop_signal)
def __iter__(self):
return self
def __next__(self):
value = self.text_queue.get(timeout=self.timeout)
if value == self.stop_signal:
raise StopIteration()
else:
return value
class BenchmarkRunner:
"""Main benchmark runner that coordinates benchmark execution."""
def __init__(
self,
logger: logging.Logger,
output_dir: str | None = None,
branch_name: str | None = None,
commit_id: str | None = None,
commit_message: str | None = None,
) -> None:
# Those stay constant for the whole run
self.logger = logger
if output_dir is None:
output_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "benchmark_results")
self.output_dir = output_dir
self.branch_name = branch_name
self.commit_id = get_git_revision() if commit_id is None else commit_id
self.commit_message = commit_message
os.makedirs(self.output_dir, exist_ok=True)
self.profile_dir = None
# Attributes that are reset for each model
self._setup_for = ""
# Attributes that are reset for each run
self.model: GenerationMixin | None = None
def cleanup(self) -> None:
del self.model
self.model = None
flush_memory()
def setup_benchmark(self, model_id: str, config: BenchmarkConfig) -> None:
# Some attributes only need to be set once per model
if self._setup_for != model_id:
self.tokenizer = AutoTokenizer.from_pretrained(model_id)
# We set the EOS token to the padding token for open-ended generation
self.tokenizer.eos_token = self.tokenizer.pad_token
self._setup_for = model_id
# Prepare inputs
self.inputs = self.tokenizer(
[DEFAULT_PROMPT for _ in range(config.batch_size)],
return_tensors="pt",
max_length=config.sequence_length,
truncation=True,
return_attention_mask=True,
).to(config.device)
self.inputs["use_cache"] = True
# Prepare generation config
gen_config = GenerationConfig(
do_sample=False, top_p=1.0, temperature=1.0, max_new_tokens=config.num_tokens_to_generate
)
# Prepare compile config
if config.compile_mode is not None:
gen_config.compile_config = CompileConfig(mode=config.compile_mode, options=config.compile_options)
gen_config.cache_implementation = "static"
# Load model
self.logger.debug(f"Loading model {model_id} on device {config.device}...")
dtype = getattr(torch, config.dtype.removeprefix("torch."))
self.model = AutoModelForCausalLM.from_pretrained(
model_id, dtype=dtype, attn_implementation=config.attn_implementation, generation_config=gen_config
)
self.model = self.model.eval().to(config.device)
# Kernelize the model if needed
if config.kernelize and kernelize is not None and Mode is not None:
self.model = kernelize(self.model, mode=Mode.INFERENCE)
def run_benchmark(
self, model_id: str, config: BenchmarkConfig, num_tokens_to_profile: int = 0
) -> dict[str, Any] | None:
"""Run a single benchmark with the given model ID and config."""
sdpa_ctx = nullcontext()
if config.attn_implementation == "sdpa":
sdpa_backend = get_sdpa_backend(config.sdpa_backend)
sdpa_ctx = torch.nn.attention.sdpa_kernel(sdpa_backend)
with sdpa_ctx, torch.no_grad():
self.logger.info(f"Running benchmark scenario: {config.name}")
# Quick validation: try one measurement first to see if this scenario works
flush_memory()
e2e_latency, token_generation_times, shape_and_decoded_output, gpu_metrics = self.time_generate(
max_new_tokens=1, gpu_monitor=None
)
if e2e_latency < 0:
self.logger.warning(f"Skipping config {config.name}: {e2e_latency = } (no GPU monitoring)")
return None
# Warmup runs
self.logger.info(f"Warming up with {config.warmup_iterations} iterations...")
for _ in trange(config.warmup_iterations):
_ = self.time_generate(max_new_tokens=config.num_tokens_to_generate)
self.logger.info("Warmup over.")
# Measurement runs
result = BenchmarkResult()
self.logger.info(f"Benchmarking with {config.measurement_iterations} iterations.")
for _ in trange(config.measurement_iterations):
e2e_latency, token_generation_times, shape_and_decoded_output, gpu_metrics = self.time_generate(
max_new_tokens=config.num_tokens_to_generate,
gpu_monitor=(GPUMonitor(logger=self.logger) if config.gpu_monitoring else None),
)
result.accumulate(e2e_latency, token_generation_times, shape_and_decoded_output, gpu_metrics)
self.logger.info("Benchmarking done. Cleaning up.")
# Profile if needed
if num_tokens_to_profile > 0:
self.profile_generate(num_tokens_to_profile, config.name)
return {
"metadata": BenchmarkMetadata(
model_id=model_id,
branch_name=self.branch_name,
commit_id=self.commit_id,
commit_message=self.commit_message,
),
"measurements": result,
"config": config,
}
def time_generate(
self,
max_new_tokens: int,
gpu_monitor: GPUMonitor | None = None,
) -> tuple[float, list[float], str, GPURawMetrics | None]:
"""Time the latency of a call to model.generate() with the given (inputs) and (max_new_tokens)."""
# Prepare gpu monitoring if needed
if gpu_monitor is not None:
gpu_monitor.start()
# Prepare streamer
streamer = BenchmarkStreamer()
# Generate and time
wall_time_0 = time.perf_counter()
outputs = self.model.generate(
**self.inputs,
max_new_tokens=max_new_tokens,
streamer=streamer,
)
wall_time_1 = time.perf_counter()
# Stop gpu monitoring if needed
gpu_metrics = gpu_monitor.stop_and_collect() if gpu_monitor is not None else None
# Check if generation had the right number of tokens
input_tokens = self.inputs["input_ids"].size(-1)
batch_size, output_tokens = outputs.shape
new_tokens = output_tokens - input_tokens
if new_tokens != max_new_tokens:
raise RuntimeError(f"Generated {new_tokens} tokens, expected {max_new_tokens}")
# Decode outputs
decoded_output = self.tokenizer.decode(outputs[0, input_tokens:], skip_special_tokens=True)
shape_and_decoded_output = f"{tuple(outputs.shape)} | {decoded_output}"
# Compute intermediate quantities
e2e_latency = wall_time_1 - wall_time_0
token_generation_times = [t - wall_time_0 for t in streamer.timestamps[1:]]
return e2e_latency, token_generation_times, shape_and_decoded_output, gpu_metrics
def profile_generate(self, num_tokens_to_profile: int, config_name: str) -> None:
"""Profile the latency of a call to model.generate() with the given (inputs) and (max_new_tokens)."""
profiler = torch.profiler.profile(
activities=[torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.CUDA],
record_shapes=True,
)
with profiler as prof:
_ = self.model.generate(
**self.inputs,
max_new_tokens=num_tokens_to_profile,
)
if self.profile_dir is None:
self.profile_dir = self.output_dir + "_profiles"
os.makedirs(self.profile_dir, exist_ok=True)
prof.export_chrome_trace(f"{self.profile_dir}/{config_name}.json")
def run_benchmarks(
self,
model_id: str,
benchmark_configs: list[BenchmarkConfig],
num_tokens_to_profile: int = 0,
pretty_print_summary: bool = True,
) -> tuple[str, dict[str, Any]]:
"""Run multiple benchmarks for the given model ID and list of benchmark configs."""
all_results = {}
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
start_time = time.perf_counter()
n_configs = len(benchmark_configs)
for i, config in enumerate(benchmark_configs):
# Handle SDPA backend if not determined by the config (needs to be done before skipping duplicates)
if config.attn_implementation == "sdpa" and config.sdpa_backend is None:
default_backend = "flash_attention" # FIXME: torch has a _cur_sdpa_kernel_backends but it fails
self.logger.warning(f"No SDPA backend provided, using {default_backend} instead.")
config.sdpa_backend = default_backend
# Skip if already run
if config.hash in all_results:
self.logger.info(f"Skipping duplicate config {config.name} for model {model_id} ({i + 1}/{n_configs})")
continue
# Otherwise, run the benchmark
self.setup_benchmark(model_id, config)
self.logger.info(
f"Running benchmark of model {model_id} with scenario: {config.name} ({i + 1}/{n_configs})"
)
# Launch benchmark in a try/except block to avoid stopping the whole run if one benchmark fails
try:
results = self.run_benchmark(model_id, config, num_tokens_to_profile)
if results is not None:
all_results[config.hash] = results
except Exception as e:
self.logger.error(f"Error running with scenario: {config.name}:\n{repr(e)}")
# Cleanup model and save results
self.cleanup()
self.save_results(model_id, all_results, timestamp=timestamp)
if pretty_print_summary:
print()
print("=" * 100)
print(f"Finished benchmarks in {time.perf_counter() - start_time:.2f} seconds")
print(f"Total number of benchmarks: {len(all_results)}")
if len(all_results) > 0:
print("First run metadata:")
first_key = list(all_results.keys())[0]
first_metadata = all_results[first_key]["metadata"].to_dict()
hardware_info = first_metadata.pop("hardware_info")
pretty_print_dict(first_metadata | hardware_info, tabs=1)
for result in all_results.values():
print("=" * 100)
print(f"Config: {result['config'].infer_name(compact=False)}\n")
result["measurements"].pprint(batch_size=result["config"].batch_size, tabs=1)
print("=" * 100)
return (timestamp, all_results)
def save_results(self, model_name: str, results: dict, timestamp: str = "") -> str:
"""Save benchmark results to JSON file."""
# Create model-specific subdirectory
model_name = model_name.replace("/", "_")
model_dir = os.path.join(self.output_dir, model_name)
os.makedirs(model_dir, exist_ok=True)
# Create filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if not timestamp else timestamp
filename = f"{model_name}_benchmark_{timestamp}.json"
filepath = os.path.join(model_dir, filename)
# Convert results to dict
converted_results = {}
for cfg_hash in results.keys():
converted_results[cfg_hash] = {
"metadata": results[cfg_hash]["metadata"].to_dict(),
"measurements": results[cfg_hash]["measurements"].to_dict(),
"config": results[cfg_hash]["config"].to_dict(),
}
# Save to JSON file
with open(filepath, "w") as f:
f.write(compact_json_numeric_arrays(converted_results))
self.logger.info(f"Results saved to {filepath}")
return filepath
def push_results_to_hub(self, dataset_id: str, results: dict[Any, Any], timestamp: str) -> None:
if PUSH_TO_HUB_TOKEN is None:
raise ValueError(
"PUSH_TO_HUB_TOKEN is not set, cannot push results to the Hub. When setting dataset_id, please also set the PUSH_TO_HUB_TOKEN environment variable."
)
n_results = len(results)
self.logger.info(f"Pushing {n_results} results to: {dataset_id}")
rows = []
for cfg_hash, entry in results.items():
row = {
"benchmark_config_hash": cfg_hash,
"config": entry["config"].to_dict(),
"measurements": entry["measurements"].to_dict(),
"metadata": entry["metadata"].to_dict(),
}
rows.append(row)
ds = Dataset.from_list(rows)
with tempfile.TemporaryDirectory() as tmp:
jsonl_path = os.path.join(tmp, "data.jsonl")
with open(jsonl_path, "w") as f:
json_lines = []
for ex in ds:
json_lines.append(json.dumps(ex, ensure_ascii=False))
f.write("\n".join(json_lines))
api = HfApi()
# NOTE: we expect the repository to already exist
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if not timestamp else timestamp
file_name = f"benchmark_run_{timestamp}.jsonl"
api.upload_file(
path_or_fileobj=jsonl_path,
path_in_repo=file_name,
repo_id=dataset_id,
repo_type="dataset",
token=PUSH_TO_HUB_TOKEN,
)
self.logger.info(f"Succesfully uploaded results to: {dataset_id}")
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import numpy as np
from .hardware_metrics import GPURawMetrics, HardwareInfo
def compute_basic_statistics(measurements: list[float]) -> dict[str, float]:
return {
"avg": np.mean(measurements),
"std": np.std(measurements),
"min": np.min(measurements),
"med": np.median(measurements),
"max": np.max(measurements),
"p95": np.percentile(measurements, 95),
}
def add_unit_to_duration(stats: dict[str, float]) -> dict[str, str]:
for key in list(stats.keys()):
value = stats[key]
if value > 3600:
stats[key] = f"{(value / 3600):.2f}hr"
elif value > 60:
stats[key] = f"{(value / 60):.2f}min"
elif value > 1:
stats[key] = f"{value:.2f}s"
elif value > 1e-3:
stats[key] = f"{(value * 1e3):.2f}ms"
elif value > 1e-6:
stats[key] = f"{(value * 1e6):.2f}us"
else:
stats[key] = f"{(value * 1e9):.2f}ns"
return stats
def equalize_lengths_and_collate(stats: list[dict[str, str]]) -> list[str]:
keys = ["avg", "std", "min", "med", "max", "p95"]
for key in keys:
max_length = max(len(stat[key]) for stat in stats)
for stat in stats:
stat[key] = stat[key].ljust(max_length, " ")
return [" ".join([f"{key}={stat[key]}" for key in keys]) for stat in stats]
def pretty_print_dict(data: dict[str, Any], tabs: int = 0) -> None:
max_key_length = max([len(key) for key in data.keys()])
for key, value in data.items():
tabs_str = " " * tabs
padded_key = key.ljust(max_key_length + 1, ".")
print(f"{tabs_str}{padded_key}: {value}")
@dataclass
class BenchmarkMetadata:
"""Metadata collected for each benchmark run."""
model_id: str
timestamp: str
branch_name: str
commit_id: str
commit_message: str
hardware_info: HardwareInfo
def __init__(self, model_id: str, commit_id: str, branch_name: str = "main", commit_message: str = "") -> None:
self.model_id = model_id
self.timestamp = datetime.now(timezone.utc).isoformat()
self.branch_name = branch_name
self.commit_id = commit_id
self.commit_message = commit_message
self.hardware_info = HardwareInfo()
def to_dict(self) -> dict[str, Any]:
return {
"model_id": self.model_id,
"timestamp": self.timestamp,
"branch_name": self.branch_name,
"commit_id": self.commit_id,
"commit_message": self.commit_message,
"hardware_info": self.hardware_info.to_dict(),
}
class BenchmarkResult:
"""Result from a series of benchmark runs."""
def __init__(self) -> None:
self.e2e_latency = []
self.token_generation_times = [] # time at which each token was generated (relative to start of the generation)
self.shape_and_decoded_outputs = []
self.gpu_metrics = []
def accumulate(
self,
e2e_latency: float,
token_generation_times: list[float],
shape_and_decoded_output: str,
gpu_metrics: GPURawMetrics | None,
) -> None:
self.e2e_latency.append(e2e_latency)
self.token_generation_times.append(token_generation_times)
self.shape_and_decoded_outputs.append(shape_and_decoded_output)
self.gpu_metrics.append(gpu_metrics)
def to_dict(self) -> dict[str, None | int | float]:
# Save GPU metrics as None if it contains only None values
if all(gm is None for gm in self.gpu_metrics):
gpu_metrics = None
else:
gpu_metrics = [gm.to_dict() for gm in self.gpu_metrics]
return {
"e2e_latency": self.e2e_latency,
"token_generation_times": self.token_generation_times,
"shape_and_decoded_outputs": self.shape_and_decoded_outputs,
"gpu_metrics": gpu_metrics,
}
@classmethod
def from_dict(cls, data: dict[str, None | int | float]) -> "BenchmarkResult":
# Handle GPU metrics, which is saved as None if it contains only None values
if data["gpu_metrics"] is None:
gpu_metrics = [None for _ in range(len(data["e2e_latency"]))]
else:
gpu_metrics = [GPURawMetrics.from_dict(gm) for gm in data["gpu_metrics"]]
# Create a new instance and accumulate the data
new_instance = cls()
for i in range(len(data["e2e_latency"])):
new_instance.accumulate(
e2e_latency=data["e2e_latency"][i],
token_generation_times=data["token_generation_times"][i],
shape_and_decoded_output=data["shape_and_decoded_outputs"][i],
gpu_metrics=gpu_metrics[i],
)
return new_instance
def get_measured_ttft(self) -> list[float]:
return [dt[0] for dt in self.token_generation_times if len(dt) > 0]
def get_measured_itl(self) -> list[float]:
return [(dt[-1] - dt[0]) / (len(dt) - 1) for dt in self.token_generation_times if len(dt) > 1]
def get_throughput(self, batch_size: int) -> float:
return [
batch_size * len(dt) / e2e_latency
for e2e_latency, dt in zip(self.e2e_latency, self.token_generation_times)
]
def pprint(self, batch_size: int = 0, tabs: int = 0) -> None:
stats_to_collate = [
add_unit_to_duration(compute_basic_statistics(self.e2e_latency)),
add_unit_to_duration(compute_basic_statistics(self.get_measured_ttft())),
add_unit_to_duration(compute_basic_statistics(self.get_measured_itl())),
]
if batch_size > 0:
throughput_stats = compute_basic_statistics(self.get_throughput(batch_size))
stats_to_collate.append({key: f"{value:.2f}tok/s" for key, value in throughput_stats.items()})
collated_stats = equalize_lengths_and_collate(stats_to_collate)
dict_to_pprint = {
"E2E Latency": collated_stats[0],
"Time to First Token": collated_stats[1],
"Inter-Token Latency": collated_stats[2],
}
if batch_size > 0:
dict_to_pprint["Throughput"] = collated_stats[3]
pretty_print_dict(dict_to_pprint, tabs=tabs)
import json
import logging
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from enum import Enum
from logging import Logger
import gpustat
import psutil
import torch
# Data class to hold the hardware information
def get_device_name_and_memory_total() -> tuple[str, float]:
"""Returns the name and memory total of GPU 0."""
device_name = torch.cuda.get_device_properties(0).name
device_memory_total = torch.cuda.get_device_properties(0).total_memory / 1024**3
return device_name, device_memory_total
class HardwareInfo:
"""A class to hold information about the hardware."""
def __init__(self) -> None:
# Retrieve GPU stats
try:
self.gpu_name, self.gpu_memory_total_gb = get_device_name_and_memory_total()
except Exception:
self.gpu_name, self.gpu_memory_total_gb = None, None
# Retrieve python, torch and CUDA version
self.python_version = f"{sys.version.split()[0]}"
self.torch_version = torch.__version__
if hasattr(torch, "cuda") and torch.cuda.is_available():
self.cuda_version = torch.version.cuda
else:
self.cuda_version = None
# Retrieve general hardware information
self.cpu_count = psutil.cpu_count()
self.memory_total_mb = int(psutil.virtual_memory().total / (1024 * 1024))
def to_dict(self) -> dict[str, None | int | float | str]:
return {
"gpu_name": self.gpu_name,
"gpu_memory_total_gb": self.gpu_memory_total_gb,
"python_version": self.python_version,
"torch_version": self.torch_version,
}
# Functions to get information about the GPU
def get_amd_gpu_stats() -> tuple[int, float]:
"""Returns the utilization and memory used of an AMD GPU, both in percent"""
rocm_smi_output = subprocess.check_output(["rocm-smi", "--json", "--showuse", "--showmeminfo", "VRAM"])
gpu_stats = json.loads(rocm_smi_output.decode("utf-8"))
gpu_stats = [
(card_id, stats["GPU use (%)"], stats["VRAM Total Used Memory (B)"]) for card_id, stats in gpu_stats.items()
]
gpu_stats.sort(key=lambda x: x[1], reverse=True)
return int(gpu_stats[0][1]), float(gpu_stats[0][2]) / 1024**3
def get_nvidia_gpu_stats() -> tuple[int, float]:
"""Returns the utilization and memory used of an NVIDIA GPU, both in percent"""
gpu_stats = gpustat.GPUStatCollection.new_query()
gpu_stats = gpu_stats[0]
return int(gpu_stats["utilization.gpu"]), float(gpu_stats["memory.used"]) / 1024**3
class GPUStatsCollector:
"""A class to get statistics about the GPU. It serves as a wrapper that holds the GPU total memory and its name,
which is used to call the right function to get the utilization and memory used."""
def __init__(self) -> None:
self.device_name, self.device_memory_total = get_device_name_and_memory_total()
# Monkey patch the get_utilization_and_memory_used method based on the GPU type
if "amd" in self.device_name.lower():
self.get_utilization_and_memory_used = get_amd_gpu_stats
elif "nvidia" in self.device_name.lower():
self.get_utilization_and_memory_used = get_nvidia_gpu_stats
else:
raise RuntimeError(f"Unsupported GPU: {self.device_name}")
def get_measurements(self) -> tuple[int, float]:
"""Get the utilization and memory used of the GPU, both in percent"""
raise NotImplementedError("This method is meant to be monkey patched during __init__")
# Simple data classes to hold the raw GPU metrics
class GPUMonitoringStatus(Enum):
"""Status of GPU monitoring."""
SUCCESS = "success"
FAILED = "failed"
NO_GPUS_AVAILABLE = "no_gpus_available"
NO_SAMPLES_COLLECTED = "no_samples_collected"
@dataclass
class GPURawMetrics:
"""Raw values for GPU utilization and memory used."""
utilization: list[float] # in percent
memory_used: list[float] # in GB
timestamps: list[float] # in seconds
timestamp_0: float # in seconds
monitoring_status: GPUMonitoringStatus
def to_dict(self) -> dict[str, None | int | float | str]:
return {
"utilization": self.utilization,
"memory_used": self.memory_used,
"timestamps": self.timestamps,
"timestamp_0": self.timestamp_0,
"monitoring_status": self.monitoring_status.value,
}
# Main class, used to monitor the GPU utilization during benchmark execution
class GPUMonitor:
"""Monitor GPU utilization during benchmark execution."""
def __init__(self, sample_interval_sec: float = 0.1, logger: Logger | None = None):
self.sample_interval_sec = sample_interval_sec
self.logger = logger if logger is not None else logging.getLogger(__name__)
self.num_available_gpus = torch.cuda.device_count()
if self.num_available_gpus == 0:
raise RuntimeError("No GPUs detected by torch.cuda.device_count().")
self.gpu_stats_getter = GPUStatsCollector()
def start(self):
"""Start monitoring GPU metrics."""
# Clear the stop event to enable monitoring
self.stop_event = threading.Event()
self.gpu_utilization = []
self.gpu_memory_used = []
self.timestamps = []
self.thread = threading.Thread(target=self._monitor_loop)
self.thread.start()
self.logger.debug("GPU monitoring started")
def stop_and_collect(self) -> GPURawMetrics:
"""Stop monitoring and return collected metrics."""
self.stop_event.set()
self.thread.join()
if self.gpu_utilization:
timestamp_0 = self.timestamps[0]
metrics = GPURawMetrics(
utilization=self.gpu_utilization,
memory_used=self.gpu_memory_used,
timestamps=[t - timestamp_0 for t in self.timestamps],
timestamp_0=timestamp_0,
monitoring_status=GPUMonitoringStatus.SUCCESS,
)
self.logger.debug(f"GPU monitoring completed: {len(self.gpu_utilization)} samples collected")
else:
metrics = GPURawMetrics(monitoring_status=GPUMonitoringStatus.NO_SAMPLES_COLLECTED)
return metrics
def _monitor_loop(self):
"""Background monitoring loop using threading.Event for communication."""
while not self.stop_event.is_set():
utilization, memory_used = self.gpu_stats_getter.get_utilization_and_memory_used()
self.gpu_utilization.append(utilization)
self.gpu_memory_used.append(memory_used)
self.timestamps.append(time.time())
if self.stop_event.wait(timeout=self.sample_interval_sec):
break
numpy>=1.21.0
psutil>=5.8.0
gpustat>=1.0.0
torch>=2.0.0
transformers>=4.30.0
datasets>=2.10.0
huggingface_hub>=0.16.0
#!/usr/bin/env python3
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Top-level benchmarking script that automatically discovers and runs all benchmarks
in the ./benches directory, organizing outputs into model-specific subfolders.
"""
import argparse
import logging
import sys
import uuid
from framework.benchmark_config import BenchmarkConfig, generate_all_configs, generate_main_configs
from framework.benchmark_runner import BenchmarkRunner
if __name__ == "__main__":
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--output-dir", type=str, default=None, help="Output dir for benchmark results")
parser.add_argument("--log-level", type=str, choices=["DEBUG", "INFO", "WARNING", "ERROR"], default="INFO")
parser.add_argument("--model-id", type=str, help="Specific model ID to benchmark (if supported by benchmarks)")
parser.add_argument("--warmup", "-w", type=int, default=3, help="Number of warmup iterations")
parser.add_argument("--iterations", "-i", type=int, default=10, help="Number of measurement iterations")
parser.add_argument("--batch-size", "-b", type=int, nargs="+", help="Batch size")
parser.add_argument("--sequence-length", "-s", type=int, nargs="+", help="Sequence length")
parser.add_argument("--num-tokens-to-generate", "-n", type=int, nargs="+", help="Number of tokens to generate")
parser.add_argument("--cross-generate", action="store_true", help="Cross-generate all combinations of configs")
parser.add_argument("--num-tokens-to-profile", "-p", type=int, default=0, help="Number of tokens to profile")
parser.add_argument("--branch-name", type=str, help="Git branch name")
parser.add_argument("--commit-id", type=str, help="Git commit ID (if not provided, will auto-detect from git)")
parser.add_argument("--commit-message", type=str, help="Git commit message")
parser.add_argument(
"--no-gpu-monitoring", action="store_true", help="Disables GPU monitoring during benchmark runs"
)
parser.add_argument(
"--push-result-to-dataset",
type=str,
default=None,
help="Name of the dataset to push results to. If not provided, results are not pushed to the Hub.",
)
args = parser.parse_args()
# Setup logging
benchmark_run_uuid = str(uuid.uuid4())[:8]
numeric_level = getattr(logging, args.log_level.upper())
handlers = [logging.StreamHandler(sys.stdout)]
logging.basicConfig(
level=numeric_level, format="[%(levelname)s - %(asctime)s] %(name)s: %(message)s", handlers=handlers
)
logger = logging.getLogger("benchmark_v2")
logger.info("Starting benchmark discovery and execution")
logger.info(f"Benchmark run UUID: {benchmark_run_uuid}")
logger.info(f"Output directory: {args.output_dir}")
# Error out if one of the arguments is not provided
if len(args.batch_size) * len(args.sequence_length) * len(args.num_tokens_to_generate) == 0:
raise ValueError(
"At least one of the arguments --batch-size, --sequence-length, or --num-tokens-to-generate is required"
)
# If there is only one (batch_size, sequence_length, num_tokens_to_generate), we benchmark across configs
elif len(args.batch_size) * len(args.sequence_length) * len(args.num_tokens_to_generate) == 1:
if args.cross_generate:
benchmark_configs = generate_all_configs(
warmup_iterations=args.warmup,
measurement_iterations=args.iterations,
batch_size=args.batch_size[0],
sequence_length=args.sequence_length[0],
num_tokens_to_generate=args.num_tokens_to_generate[0],
gpu_monitoring=not args.no_gpu_monitoring,
)
else:
benchmark_configs = generate_main_configs(
warmup_iterations=args.warmup,
measurement_iterations=args.iterations,
batch_size=args.batch_size[0],
sequence_length=args.sequence_length[0],
num_tokens_to_generate=args.num_tokens_to_generate[0],
)
# Otherwise, we benchmark across all combinations of dimensions
else:
main_config = generate_main_configs(
warmup_iterations=args.warmup,
measurement_iterations=args.iterations,
batch_size=args.batch_size[0],
sequence_length=args.sequence_length[0],
num_tokens_to_generate=args.num_tokens_to_generate[0],
)[0]
benchmark_configs = []
for num_tokens_to_generate in args.num_tokens_to_generate:
for sequence_length in args.sequence_length:
for batch_size in args.batch_size:
cfg_dict = main_config.to_dict()
cfg_dict["batch_size"] = batch_size
cfg_dict["sequence_length"] = sequence_length
cfg_dict["num_tokens_to_generate"] = num_tokens_to_generate
cfg_dict.pop("name")
benchmark_configs.append(BenchmarkConfig.from_dict(cfg_dict))
runner = BenchmarkRunner(
logger,
args.output_dir,
args.branch_name,
args.commit_id,
args.commit_message,
)
timestamp, results = runner.run_benchmarks(
args.model_id,
benchmark_configs,
args.num_tokens_to_profile,
pretty_print_summary=True,
)
dataset_id = args.push_result_to_dataset
if dataset_id is not None and len(results) > 0:
runner.push_results_to_hub(
dataset_id,
results,
timestamp,
)
# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# tests directory-specific settings - this file is run automatically
# by pytest before any tests are run
import doctest
import os
import sys
import warnings
from os.path import abspath, dirname, join
import _pytest
import pytest
from transformers.testing_utils import (
HfDoctestModule,
HfDocTestParser,
is_torch_available,
patch_testing_methods_to_collect_info,
patch_torch_compile_force_graph,
)
NOT_DEVICE_TESTS = {
"test_tokenization",
"test_tokenization_mistral_common",
"test_processing",
"test_beam_constraints",
"test_configuration_utils",
"test_data_collator",
"test_trainer_callback",
"test_trainer_utils",
"test_feature_extraction",
"test_image_processing",
"test_image_processor",
"test_image_transforms",
"test_optimization",
"test_retrieval",
"test_config",
"test_from_pretrained_no_checkpoint",
"test_keep_in_fp32_modules",
"test_gradient_checkpointing_backward_compatibility",
"test_gradient_checkpointing_enable_disable",
"test_torch_save_load",
"test_forward_signature",
"test_model_get_set_embeddings",
"test_model_main_input_name",
"test_correct_missing_keys",
"test_can_use_safetensors",
"test_load_save_without_tied_weights",
"test_tied_weights_keys",
"test_model_weights_reload_no_missing_tied_weights",
"test_can_load_ignoring_mismatched_shapes",
"test_model_is_small",
"ModelTest::test_pipeline_", # None of the pipeline tests from PipelineTesterMixin (of which XxxModelTest inherits from) are running on device
"ModelTester::test_pipeline_",
"/repo_utils/",
"/utils/",
}
# allow having multiple repository checkouts and not needing to remember to rerun
# `pip install -e '.[dev]'` when switching between checkouts and running tests.
git_repo_path = abspath(join(dirname(__file__), "src"))
sys.path.insert(1, git_repo_path)
# silence FutureWarning warnings in tests since often we can't act on them until
# they become normal warnings - i.e. the tests still need to test the current functionality
warnings.simplefilter(action="ignore", category=FutureWarning)
def pytest_configure(config):
config.addinivalue_line("markers", "is_pipeline_test: mark test to run only when pipelines are tested")
config.addinivalue_line("markers", "is_staging_test: mark test to run only in the staging environment")
config.addinivalue_line("markers", "accelerate_tests: mark test that require accelerate")
config.addinivalue_line("markers", "not_device_test: mark the tests always running on cpu")
config.addinivalue_line("markers", "torch_compile_test: mark test which tests torch compile functionality")
config.addinivalue_line("markers", "torch_export_test: mark test which tests torch export functionality")
os.environ["DISABLE_SAFETENSORS_CONVERSION"] = "true"
def pytest_collection_modifyitems(items):
for item in items:
if any(test_name in item.nodeid for test_name in NOT_DEVICE_TESTS):
item.add_marker(pytest.mark.not_device_test)
def pytest_addoption(parser):
from transformers.testing_utils import pytest_addoption_shared
pytest_addoption_shared(parser)
def pytest_terminal_summary(terminalreporter):
from transformers.testing_utils import pytest_terminal_summary_main
make_reports = terminalreporter.config.getoption("--make-reports")
if make_reports:
pytest_terminal_summary_main(terminalreporter, id=make_reports)
def pytest_sessionfinish(session, exitstatus):
# If no tests are collected, pytest exists with code 5, which makes the CI fail.
if exitstatus == 5:
session.exitstatus = 0
# Doctest custom flag to ignore output.
IGNORE_RESULT = doctest.register_optionflag("IGNORE_RESULT")
OutputChecker = doctest.OutputChecker
class CustomOutputChecker(OutputChecker):
def check_output(self, want, got, optionflags):
if IGNORE_RESULT & optionflags:
return True
return OutputChecker.check_output(self, want, got, optionflags)
doctest.OutputChecker = CustomOutputChecker
_pytest.doctest.DoctestModule = HfDoctestModule
doctest.DocTestParser = HfDocTestParser
if is_torch_available():
import torch
# The flag below controls whether to allow TF32 on cuDNN. This flag defaults to True.
# We set it to `False` for CI. See https://github.com/pytorch/pytorch/issues/157274#issuecomment-3090791615
torch.backends.cudnn.allow_tf32 = False
# patch `torch.compile`: if `TORCH_COMPILE_FORCE_FULLGRAPH=1` (or values considered as true, e.g. yes, y, etc.),
# the patched version will always run with `fullgraph=True`.
patch_torch_compile_force_graph()
if os.environ.get("PATCH_TESTING_METHODS_TO_COLLECT_OUTPUTS", "").lower() in ("yes", "true", "on", "y", "1"):
patch_testing_methods_to_collect_info()
# Dockers for `transformers`
In this folder you will find various docker files, and some subfolders.
- dockerfiles (ex: `consistency.dockerfile`) present under `~/docker` are used for our "fast" CIs. You should be able to use them for tasks that only need CPU. For example `torch-light` is a very light weights container (703MiB).
- subfolders contain dockerfiles used for our `slow` CIs, which *can* be used for GPU tasks, but they are **BIG** as they were not specifically designed for a single model / single task. Thus the `~/docker/transformers-pytorch-gpu` includes additional dependencies to allow us to run ALL model tests (say `librosa` or `tesseract`, which you do not need to run LLMs)
Note that in both case, you need to run `uv pip install -e .`, which should take around 5 seconds. We do it outside the dockerfile for the need of our CI: we checkout a new branch each time, and the `transformers` code is thus updated.
We are open to contribution, and invite the community to create dockerfiles with potential arguments that properly choose extras depending on the model's dependencies! :hugs:
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1
USER root
ARG REF=main
RUN apt-get update && apt-get install -y time git g++ pkg-config make git-lfs
ENV UV_PYTHON=/usr/local/bin/python
RUN pip install uv && uv pip install --no-cache-dir -U pip setuptools GitPython
RUN uv pip install --no-cache-dir --upgrade 'torch<2.9' 'torchaudio' 'torchvision' --index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-cache-dir pypi-kenlm
RUN uv pip install --no-cache-dir "git+https://github.com/huggingface/transformers.git@${REF}#egg=transformers[quality,testing,torch-speech,vision]"
RUN git lfs install
RUN uv pip uninstall transformers
RUN apt-get clean && rm -rf /var/lib/apt/lists/* && apt-get autoremove && apt-get autoclean
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1
ARG REF=main
USER root
RUN apt-get update && apt-get install -y libsndfile1-dev espeak-ng time git cmake wget xz-utils build-essential g++5 libprotobuf-dev protobuf-compiler git-lfs curl
ENV UV_PYTHON=/usr/local/bin/python
RUN pip --no-cache-dir install uv && uv pip install --no-cache-dir -U pip setuptools
RUN wget https://github.com/ku-nlp/jumanpp/releases/download/v2.0.0-rc3/jumanpp-2.0.0-rc3.tar.xz
RUN tar xvf jumanpp-2.0.0-rc3.tar.xz
RUN mkdir jumanpp-2.0.0-rc3/bld
WORKDIR ./jumanpp-2.0.0-rc3/bld
RUN wget -LO catch.hpp https://github.com/catchorg/Catch2/releases/download/v2.13.8/catch.hpp
RUN mv catch.hpp ../libs/
RUN cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local
RUN make install -j 10
WORKDIR /
RUN uv pip install --no-cache --upgrade 'torch<2.9' --index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-cache-dir --no-deps accelerate --extra-index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-cache-dir "git+https://github.com/huggingface/transformers.git@${REF}#egg=transformers[ja,testing,sentencepiece,spacy,ftfy,rjieba]" unidic unidic-lite
# spacy is not used so not tested. Causes to failures. TODO fix later
RUN uv run python -m unidic download
# fetch test data and hub objects within CircleCI docker images to reduce even more connections
# we don't need a full clone of `transformers` to run `fetch_hub_objects_for_ci.py`
# the data are downloaded to the directory `/test_data` and during CircleCI's CI runtime, we need to move them to the root of `transformers`
RUN mkdir test_data && cd test_data && curl -O https://raw.githubusercontent.com/huggingface/transformers/${REF}/utils/fetch_hub_objects_for_ci.py && python3 fetch_hub_objects_for_ci.py
RUN uv pip uninstall transformers
RUN apt-get clean && rm -rf /var/lib/apt/lists/*
RUN apt remove -y g++ cmake xz-utils libprotobuf-dev protobuf-compiler
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1
ARG REF=main
USER root
RUN apt-get update && apt-get install -y --no-install-recommends libsndfile1-dev espeak-ng time git g++ cmake pkg-config openssh-client git-lfs ffmpeg curl
ENV UV_PYTHON=/usr/local/bin/python
RUN pip --no-cache-dir install uv && uv pip install --no-cache-dir -U pip setuptools
RUN uv pip install --no-cache-dir 'torch<2.9' 'torchaudio' 'torchvision' 'torchcodec<0.8' --index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-deps timm accelerate --extra-index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-cache-dir librosa "git+https://github.com/huggingface/transformers.git@${REF}#egg=transformers[sklearn,sentencepiece,vision,testing]" seqeval albumentations jiwer
# fetch test data and hub objects within CircleCI docker images to reduce even more connections
# we don't need a full clone of `transformers` to run `fetch_hub_objects_for_ci.py`
# the data are downloaded to the directory `/test_data` and during CircleCI's CI runtime, we need to move them to the root of `transformers`
RUN mkdir test_data && cd test_data && curl -O https://raw.githubusercontent.com/huggingface/transformers/${REF}/utils/fetch_hub_objects_for_ci.py && python3 fetch_hub_objects_for_ci.py
RUN uv pip uninstall transformers
RUN apt-get clean && rm -rf /var/lib/apt/lists/*
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1
ARG REF=main
USER root
RUN apt-get update && apt-get install -y libsndfile1-dev espeak-ng time git libgl1 g++ tesseract-ocr git-lfs curl
ENV UV_PYTHON=/usr/local/bin/python
RUN pip --no-cache-dir install uv && uv pip install --no-cache-dir -U pip setuptools
RUN uv pip install --no-cache-dir 'torch<2.9' 'torchaudio' 'torchvision' --index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-cache-dir --no-deps timm accelerate
RUN uv pip install -U --no-cache-dir pytesseract python-Levenshtein opencv-python nltk
# RUN uv pip install --no-cache-dir natten==0.15.1+torch210cpu -f https://shi-labs.com/natten/wheels
RUN uv pip install --no-cache-dir "git+https://github.com/huggingface/transformers.git@${REF}#egg=transformers[testing, vision]" 'scikit-learn' 'torch-stft' 'nose' 'dataset'
# RUN git clone https://github.com/facebookresearch/detectron2.git
# RUN python3 -m pip install --no-cache-dir -e detectron2
RUN uv pip install 'git+https://github.com/facebookresearch/detectron2.git@92ae9f0b92aba5867824b4f12aa06a22a60a45d3' --no-build-isolation
# fetch test data and hub objects within CircleCI docker images to reduce even more connections
# we don't need a full clone of `transformers` to run `fetch_hub_objects_for_ci.py`
# the data are downloaded to the directory `/test_data` and during CircleCI's CI runtime, we need to move them to the root of `transformers`
RUN mkdir test_data && cd test_data && curl -O https://raw.githubusercontent.com/huggingface/transformers/${REF}/utils/fetch_hub_objects_for_ci.py && python3 fetch_hub_objects_for_ci.py
RUN uv pip uninstall transformers
RUN apt-get clean && rm -rf /var/lib/apt/lists/*
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1
ARG REF=main
USER root
RUN apt-get update && apt-get install -y --no-install-recommends libsndfile1-dev espeak-ng time git pkg-config openssh-client git ffmpeg curl
ENV UV_PYTHON=/usr/local/bin/python
RUN pip --no-cache-dir install uv && uv pip install --no-cache-dir -U pip setuptools
RUN uv pip install --no-cache-dir 'torch<2.9' 'torchaudio' 'torchvision' 'torchcodec<0.8' --index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-deps timm accelerate --extra-index-url https://download.pytorch.org/whl/cpu
RUN uv pip install --no-cache-dir librosa "git+https://github.com/huggingface/transformers.git@${REF}#egg=transformers[sklearn,sentencepiece,vision,testing]"
# fetch test data and hub objects within CircleCI docker images to reduce even more connections
# we don't need a full clone of `transformers` to run `fetch_hub_objects_for_ci.py`
# the data are downloaded to the directory `/test_data` and during CircleCI's CI runtime, we need to move them to the root of `transformers`
RUN mkdir test_data && cd test_data && curl -O https://raw.githubusercontent.com/huggingface/transformers/${REF}/utils/fetch_hub_objects_for_ci.py && python3 fetch_hub_objects_for_ci.py
RUN uv pip uninstall transformers
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1
ARG REF=main
USER root
RUN apt-get update && apt-get install -y time git
ENV UV_PYTHON=/usr/local/bin/python
RUN pip install uv
RUN uv pip install --no-cache-dir -U pip setuptools GitPython "git+https://github.com/huggingface/transformers.git@${REF}#egg=transformers[ruff]" urllib3
RUN apt-get install -y jq curl && apt-get clean && rm -rf /var/lib/apt/lists/*
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment