Commit 2a7c435f authored by sunzhq2

init

parent 59a0ec90
#!/bin/bash
## Includes inference
# evalscope eval \
# --model 'qwen3-8B' \
# --api-url 'http://0.0.0.0:8000/v1/chat/completions' \
# --api-key 'EMPTY' \
# --datasets 'math_500' \
# --dataset-args '{"math_500": {"local_path": "/data1/sunzhq/llm-benchmark/MATH-500"}}' \
# --eval-batch-size 32 \
# --generation-config '{"batch_size": 32, "temperature": 0.0}' \
# --timeout 1800 \
evalscope eval \
--use-cache /data1/sunzhq/llm-benchmark/tools/evalscope-data \
--datasets math_500 \
--model-id qwen3-8B \
--no-timestamp \
--rerun-review
# --dataset-args '{"math_500": {"local_path": "/data1/sunzhq/llm-benchmark/MATH-500", "subset_list": ["Level 2"]}}' \
#!/bin/bash
output_dir=/data1/sunzhq/llm-benchmark/tools/evalscope-data
rm -rf ${output_dir}
python convert_data.py \
--perf /data1/sunzhq/llm-benchmark/results-1/performance_results/qwen3_8b_math500_perf.json \
--test /data1/sunzhq/llm-benchmark/MATH-500/test.jsonl \
--output ${output_dir}/predictions/qwen3-8B
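# Illustrative end-to-end usage (script names are assumptions: this file saved as
# convert.sh, the evalscope wrapper above as eval.sh):
#   bash convert.sh   # rebuilds ${output_dir}/predictions/qwen3-8B from the perf JSON
#   bash eval.sh      # re-scores the converted predictions via --use-cache + --rerun-review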
# Copyright (c) Alibaba, Inc. and its affiliates.
"""
Default evaluator implementation for running benchmark evaluations.
This module provides the DefaultEvaluator class which orchestrates the entire
evaluation process including data loading, model inference, metric calculation,
and report generation.
"""
import os
import traceback
from collections import defaultdict
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from evalscope.api.dataset import Dataset, Sample
from evalscope.api.evaluator import CacheManager, Evaluator, TaskState
from evalscope.api.metric import AggScore, SampleScore
from evalscope.api.registry import register_evaluator
from evalscope.constants import HEARTBEAT_INTERVAL_SEC
from evalscope.evaluator.batch_reviewer import BatchReviewer
from evalscope.report import Report, gen_table
from evalscope.utils.function_utils import run_in_threads_with_progress
from evalscope.utils.logger import get_logger
if TYPE_CHECKING:
from evalscope.api.benchmark import DataAdapter
from evalscope.api.model import Model
from evalscope.config import TaskConfig
from evalscope.utils.io_utils import OutputsStructure
logger = get_logger()
@dataclass
class _WorkItem:
"""
A work item for the unified evaluation pool.
Represents a single unit of work in the evaluation pipeline:
- If ``task_state`` is None: run full predict + review for ``sample``.
- If ``task_state`` is provided: prediction is already cached; run review only.
"""
subset: str
"""Subset this item belongs to."""
sample: Optional[Sample] = None
"""Sample to predict. Set when prediction is required."""
task_state: Optional[TaskState] = None
"""Cached task state. Set when only review is required."""
@property
def needs_predict(self) -> bool:
"""True when prediction has not yet been computed."""
return self.task_state is None
@dataclass
class _PoolContext:
"""
Carries state from :meth:`~DefaultEvaluator._collect_work_items` through
the evaluation phases, acting as the shared data contract between phases.
Attributes:
work_items: Work items to process in the unified pool.
cached_scores_by_subset: Fully cached (predict + review) scores per subset.
review_pending_by_subset: TaskStates whose prediction is cached but review
is still needed; populated only for ``use_batch_scoring`` benchmarks.
model_prediction_dir: Shared parent directory for prediction JSONL files.
total_cached: Number of samples already fully cached (skipped in pool).
"""
work_items: List[_WorkItem]
cached_scores_by_subset: Dict[str, List[SampleScore]]
review_pending_by_subset: Dict[str, List[TaskState]]
model_prediction_dir: str
total_cached: int
@property
def grand_total(self) -> int:
"""Total sample count across all subsets: cached + work-pool items."""
return self.total_cached + len(self.work_items)
@register_evaluator('default')
class DefaultEvaluator(Evaluator):
"""
Default Evaluator for running evaluations on benchmarks.
This evaluator handles the complete evaluation pipeline:
1. Loading datasets from all subsets
2. Flattening all subsets into a single unified work pool
3. Running model inference and metric calculation atomically per sample
4. Writing results to per-subset JSONL cache as each sample completes
5. Aggregating scores and generating the final report
Args:
benchmark: The data adapter for loading and processing data.
model: The model to be evaluated.
outputs: The output structure for saving evaluation results.
task_config: The task configuration.
"""
def __init__(
self,
benchmark: 'DataAdapter',
model: 'Model',
outputs: 'OutputsStructure',
task_config: 'TaskConfig',
):
# Store core components needed for evaluation
self.benchmark = benchmark
self.model = model
self.outputs = outputs
self.task_config = task_config
# Extract frequently used identifiers
self.benchmark_name = benchmark.name
"""Name of the benchmark being evaluated."""
self.model_name = task_config.model_id
"""ID of the model being evaluated."""
self.use_cache = task_config.use_cache
"""Whether to use cache for predictions."""
# Initialize cache manager for storing and retrieving cached results
self.cache_manager = CacheManager(
outputs=outputs,
model_name=self.model_name,
benchmark_name=self.benchmark_name,
)
# Initialize batch reviewer for benchmarks that use batch scoring
self.batch_reviewer = BatchReviewer(
benchmark=benchmark,
cache_manager=self.cache_manager,
task_config=task_config,
)
def eval(self) -> Report:
"""
Run the complete evaluation process.
All subsets are merged into a **single unified work pool** so slow
samples in one subset never block samples in another. Each sample is
predicted and reviewed atomically; results are persisted to per-subset
JSONL caches as they complete.
Returns:
Report: The complete evaluation report.
"""
logger.info(f'Start loading benchmark dataset: {self.benchmark_name}')
dataset_dict = {k: v for k, v in self.benchmark.load_dataset().items() if len(v) > 0}
if not dataset_dict:
logger.warning(f'No samples found in any subset of {self.benchmark_name}. Skipping.')
self.finalize()
return {}
subset_list = list(dataset_dict.keys())
logger.info(f'Start evaluating {len(dataset_dict)} subsets of {self.benchmark_name}: {subset_list}')
# Phase 1 – build unified work pool from all subsets
context = self._collect_work_items(dataset_dict)
logger.info(
f'Unified pool: {len(context.work_items)} items to process, '
f'{context.total_cached} already fully cached '
f'({context.grand_total} total across all subsets).'
)
# Phase 2 – execute unified thread pool (single progress bar)
results_by_subset = self._run_pool(context)
logger.info(f'Unified pool finished for {self.benchmark_name}.')
# Phase 3 – aggregate scores per subset (batch review happens here too)
agg_score_dict = self._aggregate_scores(dataset_dict, context, results_by_subset)
# Phase 4 – generate report
if not agg_score_dict:
logger.warning(
f'No valid scores generated for {self.benchmark_name} '
'(all samples filtered or empty subsets). Skipping report generation.'
)
report = {}
else:
logger.info('Generating report...')
report = self.get_report(agg_score_dict)
self.finalize()
logger.info(f'Benchmark {self.benchmark_name} evaluation finished.')
return report
# ------------------------------------------------------------------ #
# Phase helpers #
# ------------------------------------------------------------------ #
def _collect_work_items(self, dataset_dict: Dict[str, Dataset]) -> _PoolContext:
"""
Phase 1 – classify every sample across all subsets into a work tier.
Each sample falls into exactly one of three tiers:
1. **Fully cached** (predict + review on disk) → skipped; its score is
stored in ``cached_scores_by_subset``.
2. **Predict cached, review pending** → review-only :class:`_WorkItem`
(or placed in ``review_pending_by_subset`` for batch-scoring).
3. **Uncached** → full predict + review :class:`_WorkItem`.
Args:
dataset_dict: Mapping of subset name → dataset.
Returns:
A :class:`_PoolContext` ready for :meth:`_run_pool` and
:meth:`_aggregate_scores`.
"""
work_items: List[_WorkItem] = []
cached_scores_by_subset: Dict[str, List[SampleScore]] = defaultdict(list)
review_pending_by_subset: Dict[str, List[TaskState]] = defaultdict(list)
for subset, dataset in dataset_dict.items():
cached_pred_states, remaining_dataset = (
self.cache_manager.filter_prediction_cache(subset, dataset) if self.use_cache else ([], dataset)
)
if self.benchmark.use_batch_scoring:
# Prediction runs in the pool; review is deferred until all
# task_states for this subset are available (after pool).
for sample in remaining_dataset:
work_items.append(_WorkItem(subset=subset, sample=sample))
if self.use_cache and not self.task_config.rerun_review:
cached_scores, need_review = self.cache_manager.filter_review_cache(subset, cached_pred_states)
cached_scores_by_subset[subset].extend(cached_scores)
review_pending_by_subset[subset].extend(need_review)
else:
self.cache_manager.delete_review_cache(subset)
review_pending_by_subset[subset].extend(cached_pred_states)
else:
# Predict + review happen atomically per sample inside the pool.
if self.use_cache and not self.task_config.rerun_review:
cached_scores, need_review = self.cache_manager.filter_review_cache(subset, cached_pred_states)
cached_scores_by_subset[subset].extend(cached_scores)
for ts in need_review: # Tier 2: review-only items
work_items.append(_WorkItem(subset=subset, task_state=ts))
else:
self.cache_manager.delete_review_cache(subset)
for ts in cached_pred_states: # Prediction cached, review cleared
work_items.append(_WorkItem(subset=subset, task_state=ts))
# for sample in remaining_dataset: # Tier 3: full predict+review
# work_items.append(_WorkItem(subset=subset, sample=sample))
# When rerun_review is enabled, force-skip all samples that would still require inference
if self.task_config.rerun_review:
if remaining_dataset:
logger.warning(
f'[Rerun review mode] Skipping {len(remaining_dataset)} samples in subset {subset!r} '
'due to missing cached predictions. They will NOT be inferred.'
)
else:
for sample in remaining_dataset: # Tier 3: full predict+review
work_items.append(_WorkItem(subset=subset, sample=sample))
model_prediction_dir = os.path.dirname(self.cache_manager.get_prediction_cache_path(next(iter(dataset_dict))))
total_cached = sum(len(v) for v in cached_scores_by_subset.values())
return _PoolContext(
work_items=work_items,
cached_scores_by_subset=cached_scores_by_subset,
review_pending_by_subset=review_pending_by_subset,
model_prediction_dir=model_prediction_dir,
total_cached=total_cached,
)
def _run_pool(self, context: _PoolContext) -> Dict[str, List[Tuple[TaskState, Optional[SampleScore]]]]:
"""
Phase 2 – execute the unified work pool under a single progress bar.
Each item is processed by :meth:`_process_work_item`; results are
immediately persisted by :meth:`_persist_result` and accumulated
into a per-subset bucket for downstream aggregation.
Args:
context: Pool context produced by :meth:`_collect_work_items`.
Returns:
Mapping of subset name → ``(task_state, sample_score)`` pairs in
completion order. ``sample_score`` is ``None`` for batch-scoring
benchmarks (review is deferred to :meth:`_aggregate_scores`).
"""
results_by_subset: Dict[str, List[Tuple[TaskState, Optional[SampleScore]]]] = \
defaultdict(list)
def worker(item: _WorkItem) -> Tuple[TaskState, Optional[SampleScore]]:
return self._process_work_item(item, context.model_prediction_dir)
def on_result(item: _WorkItem, result: Tuple[TaskState, Optional[SampleScore]]) -> None:
self._persist_result(item, *result)
results_by_subset[item.subset].append(result)
def on_error(item: _WorkItem, exc: Exception) -> None:
tb_str = traceback.format_exc()
logger.error(f'Processing item in subset={item.subset!r} failed: {exc}\nTraceback:\n{tb_str}')
if self.task_config.ignore_errors:
logger.warning('Error ignored, continuing with next sample.')
return
raise exc
run_in_threads_with_progress(
context.work_items,
worker,
desc=f'Evaluating[{self.benchmark_name}]',
max_workers=self.task_config.eval_batch_size,
log_interval=HEARTBEAT_INTERVAL_SEC,
on_result=on_result,
on_error=on_error,
skip_failed=True,
initial=context.total_cached,
total=context.grand_total,
)
return results_by_subset
def _process_work_item(self, item: _WorkItem, model_prediction_dir: str) -> Tuple[TaskState, Optional[SampleScore]]:
"""
Process a single work item: predict (if needed) then review.
Called concurrently inside the thread pool by :meth:`_run_pool`.
Override this method to inject custom logic around inference or scoring.
Args:
item: The work item to process.
model_prediction_dir: Directory for storing prediction output files.
Returns:
``(task_state, sample_score)`` where ``sample_score`` is ``None``
for batch-scoring benchmarks (review deferred).
"""
task_state = (
self._predict_sample(item.sample, model_prediction_dir) if item.needs_predict else item.task_state
)
sample_score = (None if self.benchmark.use_batch_scoring else self._review_task_state(task_state))
return task_state, sample_score
def _persist_result(
self,
item: _WorkItem,
task_state: TaskState,
sample_score: Optional[SampleScore],
) -> None:
"""
Persist a completed item’s results to the on-disk cache.
Called in the **main thread** by :meth:`_run_pool` immediately after
each item completes (no concurrent writes). Override to add custom
persistence logic.
Args:
item: The originating work item.
task_state: The completed task state (prediction output).
sample_score: The review score, or ``None`` for batch-scoring.
"""
if item.needs_predict:
model_result = self.cache_manager.save_prediction_cache(
item.subset, task_state, self.benchmark.save_metadata
)
logger.debug(f'Model result: \n{model_result.pretty_print()}')
if sample_score is not None:
review_result = self.cache_manager.save_review_cache(
subset=item.subset,
task_state=task_state,
sample_score=sample_score,
save_metadata=self.benchmark.save_metadata,
)
logger.debug(f'Review result: \n{review_result.pretty_print()}')
def _aggregate_scores(
self,
dataset_dict: Dict[str, Dataset],
context: _PoolContext,
results_by_subset: Dict[str, List[Tuple[TaskState, Optional[SampleScore]]]],
) -> Dict[str, List[AggScore]]:
"""
Phase 3 – aggregate per-sample scores into subset-level metrics.
For standard benchmarks the pool scores are combined with cached scores
and passed directly to ``benchmark.aggregate_scores``.
For batch-scoring benchmarks :meth:`BatchReviewer.review_subset` is invoked
first to produce final per-sample scores from all collected task states.
Args:
dataset_dict: Subset iteration order.
context: Pool context from :meth:`_collect_work_items`.
results_by_subset: Per-subset pool results from :meth:`_run_pool`.
Returns:
Mapping of subset name → aggregated scores. Empty subsets omitted.
"""
agg_score_dict: Dict[str, List[AggScore]] = {}
for subset in dataset_dict:
cached_scores = context.cached_scores_by_subset.get(subset, [])
pool_results = results_by_subset.get(subset, [])
if self.benchmark.use_batch_scoring:
pending = context.review_pending_by_subset.get(subset, [])
new_task_states = [ts for ts, _ in pool_results]
batch_scores = self.batch_reviewer.review_subset(
subset, pending + new_task_states, review_fn=self._review_task_state
)
all_scores = cached_scores + batch_scores
else:
new_scores = [sc for _, sc in pool_results if sc is not None]
all_scores = cached_scores + new_scores
if not all_scores:
logger.info(f'No valid scores generated for subset: {subset}, skipping.')
continue
logger.info(f'Aggregating scores for subset: {subset}')
agg_score_dict[subset] = self.benchmark.aggregate_scores(sample_scores=all_scores)
return agg_score_dict
def _predict_sample(self, sample: Sample, model_prediction_dir: str) -> TaskState:
"""
Run model inference on a single sample.
Args:
sample: The sample to predict.
model_prediction_dir: Directory for storing model predictions.
Returns:
TaskState: The task state containing the prediction result.
"""
logger.debug(f'\n{sample.pretty_print()}')
task_state = self.benchmark.run_inference(model=self.model, sample=sample, output_dir=model_prediction_dir)
return task_state
def _review_task_state(self, task_state: TaskState) -> SampleScore:
"""
Compute evaluation metrics for a single task state.
Args:
task_state: The task state to review.
Returns:
SampleScore: The evaluation score for the task state.
"""
sample_score = self.benchmark.calculate_metrics(task_state=task_state)
return sample_score
def get_report(self, agg_score_dict: Dict[str, List[AggScore]]) -> Report:
"""
Generate a comprehensive evaluation report from aggregated scores.
This method handles:
1. Creating the evaluation report from scores
2. Generating and displaying a summary table
3. Optionally generating detailed analysis
4. Saving the report to file
Args:
agg_score_dict: Dictionary mapping subset names to their aggregated scores.
Returns:
Report: The complete evaluation report.
"""
assert agg_score_dict, 'No scores to generate report from.'
# Get paths for saving the report
report_path = self.cache_manager.get_report_path()
report_file = self.cache_manager.get_report_file()
# Generate the main evaluation report using benchmark-specific logic
report = self.benchmark.generate_report(
scores=agg_score_dict, model_name=self.model_name, output_dir=report_path
)
# Generate and display a summary table of results
try:
report_table = gen_table(report_list=[report], add_overall_metric=self.benchmark.add_overall_metric)
logger.info(f'\n{self.benchmark_name} report table:'
f'\n{report_table} \n')
except Exception:
logger.error('Failed to generate report table.')
# Generate detailed analysis if requested in configuration
if self.task_config.analysis_report:
logger.info('Generating report analysis, please wait ...')
analysis = report.generate_analysis(self.task_config)
logger.info(f'Report analysis:\n{analysis}')
else:
logger.info('Skipping report analysis (`analysis_report=False`).')
# Save the complete report to file
report.to_json(report_file)
logger.info(f'Dump report to: {report_file} \n')
return report
def finalize(self, *args, **kwargs):
self.benchmark.finalize(*args, **kwargs)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
This module defines a framework for sampling benchmark requests from various
datasets. Each dataset subclass of BenchmarkDataset must implement sample
generation. Supported dataset types include:
- ShareGPT
- Random (synthetic)
- Sonnet
- BurstGPT
- HuggingFace
- VisionArena
"""
import argparse
import ast
import base64
import io
import json
import logging
import math
import random
from abc import ABC, abstractmethod
from collections.abc import Iterator, Mapping
from contextlib import suppress
from copy import deepcopy
from dataclasses import dataclass
from functools import cache
from io import BytesIO
from typing import Any, Callable, Optional, Union, cast
import numpy as np
from PIL import Image
from transformers import PreTrainedTokenizerBase
from typing_extensions import deprecated
from vllm.lora.request import LoRARequest
from vllm.lora.utils import get_adapter_absolute_path
from vllm.multimodal import MultiModalDataDict
from vllm.multimodal.image import convert_image_mode
from vllm.transformers_utils.tokenizer import AnyTokenizer
from vllm.utils import PlaceholderModule
try:
from datasets import load_dataset
except ImportError:
datasets = PlaceholderModule("datasets")
load_dataset = datasets.placeholder_attr("load_dataset")
try:
import pandas as pd
except ImportError:
pd = PlaceholderModule("pandas")
try:
import librosa
except ImportError:
librosa = PlaceholderModule("librosa")
try:
from vllm.utils import FlexibleArgumentParser
except ImportError:
from argparse import ArgumentParser as FlexibleArgumentParser
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Data Classes
# -----------------------------------------------------------------------------
# @dataclass
# class SampleRequest:
# """
# Represents a single inference request for benchmarking.
# """
# prompt: Union[str, list[str]]
# prompt_len: int
# expected_output_len: int
# multi_modal_data: Optional[
# Union[MultiModalDataDict, dict, list[dict]]
# ] = None
# lora_request: Optional[LoRARequest] = None
# request_id: Optional[str] = None
@dataclass
class SampleRequest:
"""Represents a single inference request for benchmarking."""
prompt: Union[str, list[str]]
prompt_len: int
expected_output_len: int
multi_modal_data: Optional[Union[MultiModalDataDict, dict, list[dict]]] = None
lora_request: Optional[LoRARequest] = None
request_id: Optional[str] = None
# New fields: store sampling parameters and metadata
sampling_params: Optional[dict] = None
metadata: Optional[dict] = None
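# Illustrative construction (values are hypothetical):
#   SampleRequest(prompt="1+1=", prompt_len=4, expected_output_len=16,
#                 sampling_params={"temperature": 0.0}, metadata={"source": "MATH-500"})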
# -----------------------------------------------------------------------------
# Benchmark Dataset Base Class
# -----------------------------------------------------------------------------
class BenchmarkDataset(ABC):
DEFAULT_SEED = 0
IS_MULTIMODAL = False
def __init__(
self,
dataset_path: Optional[str] = None,
random_seed: int = DEFAULT_SEED,
) -> None:
"""
Initialize the BenchmarkDataset with an optional dataset path and random
seed.
Args:
dataset_path (Optional[str]): Path to the dataset. If None, it
indicates that a default or random dataset might be used.
random_seed (int): Seed value for reproducible shuffling or
sampling. Defaults to DEFAULT_SEED.
"""
self.dataset_path = dataset_path
# Set the random seed, ensuring that a None value is replaced with the
# default seed.
self.random_seed = (random_seed
if random_seed is not None else self.DEFAULT_SEED)
self.data = None
def apply_multimodal_chat_transformation(
self,
prompt: str,
mm_content: Optional[
Union[MultiModalDataDict, dict, list[dict]]
] = None) -> list[dict]:
"""
Transform a prompt and optional multimodal content into a chat format.
This method is used for chat models that expect a specific conversation
format.
"""
content = [{"text": prompt, "type": "text"}]
if mm_content is not None:
if isinstance(mm_content, list):
content.extend(cast(list[dict[str, Any]], mm_content))
elif isinstance(mm_content, dict):
content.append(mm_content)
else:
raise TypeError(
"Could not process multimodal content of type: " +
f"{type(mm_content)}"
)
return [{"role": "user", "content": content}]
def load_data(self) -> None:
"""
Load data from the dataset path into self.data.
This method must be overridden by subclasses since the method to load
data will vary depending on the dataset format and source.
Raises:
NotImplementedError: If a subclass does not implement this method.
"""
# TODO (jenniferzhao): add support for downloading data
raise NotImplementedError(
"load_data must be implemented in subclasses.")
def get_random_lora_request(
self,
max_loras: Optional[int] = None,
lora_path: Optional[str] = None,
) -> Optional[LoRARequest]:
"""
Optionally select a random LoRA request.
This method is used when LoRA parameters are provided. It randomly
selects a LoRA based on max_loras.
Args:
max_loras (Optional[int]): The maximum number of LoRAs available.
If `None`, LoRA is not used.
lora_path (Optional[str]): Path to the LoRA parameters on disk.
If `None`, LoRA is not used.
Returns:
A new [`LoRARequest`][vllm.lora.request.LoRARequest]
(or `None` if not applicable).
"""
if max_loras is None or lora_path is None:
return None
# Generate a random LoRA ID in the range [1, max_loras].
lora_id = random.randint(1, max_loras)
lora_request = LoRARequest(
lora_name=str(lora_id),
lora_int_id=lora_id,
lora_path=lora_path_on_disk(lora_path),
)
return lora_request
@abstractmethod
def sample(self, tokenizer: PreTrainedTokenizerBase,
num_requests: int,
request_id_prefix: str = "",
no_oversample: bool = False) -> list[SampleRequest]:
"""
Abstract method to generate sample requests from the dataset.
Subclasses must override this method to implement dataset-specific logic
for generating a list of SampleRequest objects.
Args:
tokenizer (PreTrainedTokenizerBase): The tokenizer to be used
for processing the dataset's text.
num_requests (int): The number of sample requests to generate.
request_id_prefix (str): The prefix of request_id.
Returns:
list[SampleRequest]: A list of sample requests generated from the
dataset.
"""
raise NotImplementedError("sample must be implemented in subclasses.")
def maybe_oversample_requests(
self,
requests: list[SampleRequest],
num_requests: int,
request_id_prefix: str = "",
no_oversample: bool = False,
) -> None:
"""
Oversamples the list of requests if its size is less than the desired
number.
Args:
requests (List[SampleRequest]): The current list of sampled
requests.
num_requests (int): The target number of requests.
request_id_prefix (str): The prefix applied to generated request
identifiers.
"""
if no_oversample:
logger.info("Skipping oversampling. " \
"Total samples: %d.", len(requests))
return
if len(requests) < num_requests:
random.seed(self.random_seed)
additional = deepcopy(
random.choices(requests, k=num_requests - len(requests))
)
for i in range(len(additional)):
req = additional[i]
req.request_id = request_id_prefix + str(len(requests) + i)
requests.extend(additional)
logger.info("Oversampled requests to reach %d total samples.",
num_requests)
# -----------------------------------------------------------------------------
# Utility Functions and Global Caches
# -----------------------------------------------------------------------------
def is_valid_sequence(
prompt_len: int,
output_len: int,
min_len: int = 4,
max_prompt_len: int = 1024,
max_total_len: int = 2048,
skip_min_output_len_check: bool = False,
) -> bool:
"""
Validate a sequence based on prompt and output lengths.
Default pruning criteria are copied from the original `sample_hf_requests`
and `sample_sharegpt_requests` functions in benchmark_serving.py, as well as
from `sample_requests` in benchmark_throughput.py.
"""
# Check for invalid conditions
prompt_too_short = prompt_len < min_len
output_too_short = (not skip_min_output_len_check) and (output_len
< min_len)
prompt_too_long = prompt_len > max_prompt_len
combined_too_long = (prompt_len + output_len) > max_total_len
# Return True if none of the invalid conditions are met
return not (prompt_too_short or output_too_short or prompt_too_long
or combined_too_long)
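# Illustrative checks with the default thresholds:
#   is_valid_sequence(prompt_len=10, output_len=50)    -> True
#   is_valid_sequence(prompt_len=2000, output_len=50)  -> False  (prompt exceeds max_prompt_len=1024)
#   is_valid_sequence(prompt_len=10, output_len=2)     -> False  (output shorter than min_len=4)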
@cache
def lora_path_on_disk(lora_path: str) -> str:
return get_adapter_absolute_path(lora_path)
# Global cache for LoRA tokenizers.
lora_tokenizer_cache: dict[int, AnyTokenizer] = {}
def process_image(image: Any) -> Mapping[str, Any]:
"""
Process a single image input and return a multimedia content dictionary.
Supports the following input types:
1. Dictionary with raw image bytes: - Expects a dict with a 'bytes' key
containing raw image data. - Loads the bytes as a PIL.Image.Image.
2. PIL.Image.Image input: - Converts the image to RGB. - Saves the image as
a JPEG in memory. - Encodes the JPEG data as a base64 string. - Returns
a dictionary with the image as a base64 data URL.
3. String input: - Treats the string as a URL or local file path. -
Prepends "file://" if the string doesn't start with "http://",
"https://", or "file://". - Returns a dictionary with the image URL.
Raises:
ValueError: If the input is not a supported type.
"""
if isinstance(image, dict) and 'bytes' in image:
image = Image.open(BytesIO(image['bytes']))
if isinstance(image, Image.Image):
image = convert_image_mode(image, "RGB")
with io.BytesIO() as image_data:
image.save(image_data, format="JPEG")
image_base64 = base64.b64encode(
image_data.getvalue()).decode("utf-8")
return {
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
},
}
if isinstance(image, str):
image_url = (image if image.startswith(
("http://", "https://", "file://")) else f"file://{image}")
return {"type": "image_url", "image_url": {"url": image_url}}
raise ValueError(f"Invalid image input {image}. Must be a PIL.Image.Image"
" or str or dictionary with raw image bytes.")
def process_video(video: Any) -> Mapping[str, Any]:
"""
Process a single video input and return a multimedia content dictionary.
Supports the following input types:
1. Dictionary with raw video bytes: - Expects a dict with a 'bytes' key
containing raw video data.
2. String input: - Treats the string as a URL or local file path. -
Prepends "file://" if the string doesn't start with "http://",
"https://", or "file://". - Returns a dictionary with the video URL.
Raises:
ValueError: If the input is not a supported type.
"""
if isinstance(video, dict) and 'bytes' in video:
video_bytes = video['bytes']
video_base64 = base64.b64encode(video_bytes).decode("utf-8")
return {
"type": "video_url",
"video_url": {
"url": f"data:video/mp4;base64,{video_base64}"
},
}
if isinstance(video, str):
video_url = (video if video.startswith(
("http://", "https://", "file://")) else f"file://{video}")
return {"type": "video_url", "video_url": {"url": video_url}}
raise ValueError(
f"Invalid video input {video}. Must be a string of local path/remote url, or a dictionary with raw video bytes in the form of `{{'bytes': raw_video_bytes}}`." # noqa: E501
)
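# Illustrative result (path is hypothetical):
#   process_video("/data/clip.mp4") -> {"type": "video_url", "video_url": {"url": "file:///data/clip.mp4"}}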
# -----------------------------------------------------------------------------
# Random Dataset Implementation (Synthetic Data)
# -----------------------------------------------------------------------------
class RandomDataset(BenchmarkDataset):
"""
Synthetic text-only dataset for serving/throughput benchmarks.
Strategy:
- Sample input/output token lengths per request from integer-uniform ranges
around configured means (controlled by range_ratio).
- Prepend a fixed random prefix of length prefix_len.
- Generate the remaining tokens as a reproducible sequence:
(offset + index + arange(input_len)) % vocab_size.
- Decode then re-encode/truncate to ensure prompt token counts match.
- Uses numpy.default_rng seeded with random_seed for reproducible sampling.
"""
# Default values copied from benchmark_serving.py for the random dataset.
DEFAULT_PREFIX_LEN = 0
DEFAULT_RANGE_RATIO = 0.0
DEFAULT_INPUT_LEN = 1024
DEFAULT_OUTPUT_LEN = 128
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
# Use numpy's default_rng for deterministic sampling
# Do not use random.seed() or np.random.seed() elsewhere in this class.
# This ensures that the RNG is isolated from global RNG state.
self._rng = np.random.default_rng(self.random_seed)
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
request_id_prefix: str = "",
no_oversample: bool = False,
prefix_len: int = DEFAULT_PREFIX_LEN,
range_ratio: float = DEFAULT_RANGE_RATIO,
input_len: int = DEFAULT_INPUT_LEN,
output_len: int = DEFAULT_OUTPUT_LEN,
batchsize: int = 1,
**kwargs,
) -> list[SampleRequest]:
input_lens, output_lens, offsets = self.get_sampling_params(
num_requests, range_ratio, input_len, output_len, tokenizer
)
# Generate prefix once
prefix_token_ids = self.get_prefix(tokenizer, prefix_len)
vocab_size = tokenizer.vocab_size
requests = []
for i in range(num_requests):
prompt, total_input_len = self.generate_token_sequence(
tokenizer=tokenizer,
prefix_token_ids=prefix_token_ids,
prefix_len=prefix_len,
vocab_size=vocab_size,
input_len=int(input_lens[i]),
offset=int(offsets[i]),
index=i,
)
requests.append(
SampleRequest(
prompt=prompt,
prompt_len=total_input_len,
expected_output_len=int(output_lens[i]),
request_id=request_id_prefix + str(i),
)
)
# only used for embeddings benchmark.
if batchsize > 1:
batch_requests = []
# Create batched requests
for i in range(0, num_requests, batchsize):
batch = requests[i : i + batchsize]
batch_requests.append(
SampleRequest(
prompt=[req.prompt for req in batch],
prompt_len=sum(req.prompt_len for req in batch),
expected_output_len=0,
request_id=request_id_prefix + str(i // batchsize),
)
)
requests = batch_requests
return requests
def get_prefix(
self, tokenizer: PreTrainedTokenizerBase, prefix_len: int
) -> list[int]:
"""
Get the prefix for the dataset.
"""
return (
self._rng.integers(
0, tokenizer.vocab_size, size=prefix_len).tolist()
if prefix_len > 0
else []
)
def get_sampling_params(
self,
num_requests: int,
range_ratio: float,
input_len: int,
output_len: int,
tokenizer: PreTrainedTokenizerBase,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Get the sampling parameters for the dataset.
"""
# Enforce range_ratio < 1
if not (0.0 <= range_ratio < 1.0):
raise ValueError("range_ratio must be in [0, 1).")
num_special_tokens = int(tokenizer.num_special_tokens_to_add())
real_input_len = max(0, int(input_len) - num_special_tokens)
# Bounds use floor for low and ceil for high
input_low = math.floor(real_input_len * (1 - range_ratio))
input_high = math.ceil(real_input_len * (1 + range_ratio))
output_low = math.floor(output_len * (1 - range_ratio))
output_high = math.ceil(output_len * (1 + range_ratio))
# Ensure the lower bound for output length is at least 1 to
# prevent sampling 0 tokens.
output_low = max(output_low, 1)
if input_low > input_high:
raise ValueError(
"Invalid input sampling interval: "
f"low={input_low} > high={input_high}"
)
if output_low > output_high:
raise ValueError(
"Invalid output sampling interval: "
f"low={output_low} > high={output_high}"
)
logger.info(
"Sampling input_len from [%s, %s] and output_len from [%s, %s]",
input_low,
input_high,
output_low,
output_high,
)
input_lens = self._rng.integers(input_low, input_high + 1,
size=num_requests)
output_lens = self._rng.integers(output_low, output_high + 1,
size=num_requests)
offsets = self._rng.integers(0, tokenizer.vocab_size,
size=num_requests)
return input_lens, output_lens, offsets
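    # Worked example (assuming the tokenizer adds no special tokens):
    #   input_len=1024, output_len=128, range_ratio=0.1 ->
    #   input_len sampled uniformly from [921, 1127] and output_len from [115, 141].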
def generate_token_sequence(
self,
*,
tokenizer: PreTrainedTokenizerBase,
prefix_token_ids: list[int],
prefix_len: int,
vocab_size: int,
input_len: int,
offset: int,
index: int,
) -> tuple[str, int]:
"""
Returns (prompt, total_input_len).
NOTE: After decoding the prompt we have to encode and decode it again.
This is done because in some cases N consecutive tokens
give a string tokenized into != N number of tokens.
For example for GPT2Tokenizer:
[6880, 6881] -> ['Ġcalls', 'here'] ->
[1650, 939, 486] -> ['Ġcall', 'sh', 'ere']
To avoid uncontrolled change of the prompt length,
the encoded sequence is truncated before being decoded again.
"""
# Build the inner sequence by sampling sequentially from the vocab
inner_seq = ((offset + index + np.arange(input_len))
% vocab_size).tolist()
token_sequence = prefix_token_ids + inner_seq
# Decode, then re-encode and truncate to preserve token count invariants
prompt = tokenizer.decode(token_sequence)
total_input_len = prefix_len + int(input_len)
re_encoded_sequence = tokenizer.encode(
prompt, add_special_tokens=False)[:total_input_len]
prompt = tokenizer.decode(re_encoded_sequence)
total_input_len = len(re_encoded_sequence)
return prompt, total_input_len
# -----------------------------------------------------------------------------
# MultiModalDataset Implementation
# -----------------------------------------------------------------------------
class RandomMultiModalDataset(RandomDataset):
"""
Synthetic multimodal dataset (text + images) that extends RandomDataset.
Status:
- Images: supported via synthetic RGB data.
- Video: not yet supported (TODO: implement video generation method).
- Audio: not yet supported.
Sampling overview:
1) Number of items per request is sampled uniformly from the integer range
[floor(n·(1−r)), ceil(n·(1+r))], where n is the base count and r is
`num_mm_items_range_ratio` in [0, 1]. r=0 keeps it fixed; r=1 allows 0.
The maximum is further clamped to the sum of per-modality limits.
2) Each item’s modality and shape is sampled from `bucket_config`, a dict
mapping (height, width, num_frames) → probability. We treat
`num_frames`=1 as image and `num_frames` > 1 as video.
Entries with zero probability are removed and the rest are renormalized
to sum to 1.
3) Per-modality hard caps are enforced via `limit_mm_per_prompt`.
When a modality reaches its cap, all of its buckets are excluded and the
remaining probabilities are renormalized.
Example bucket configuration:
{(256, 256, 1): 0.5, (720, 1280, 1): 0.4, (720, 1280, 16): 0.1}
- Two image buckets (`num_frames`=1) and one video bucket
(`num_frames`=16).
OBS.: Only image sampling is supported for now.
"""
IS_MULTIMODAL = True
# NOTE: video sampling is WIP. Setting it to 0.
DEFAULT_LIMIT_MM_PER_PROMPT = {"image": 255, "video": 0}
DEFAULT_BASE_ITEMS_PER_REQUEST = 1
DEFAULT_NUM_MM_ITEMS_RANGE_RATIO = 0.0
DEFAULT_MM_ITEM_BUCKET_CONFIG = {
(256, 256, 1): 0.5,
(720, 1280, 1): 0.5,
(720, 1280, 16): 0.0,
}
DEFAULT_ENABLE_MULTIMODAL_CHAT = False
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
def generate_synthetic_image(self, width: int, height: int) -> Image.Image:
"""Generate synthetic PIL image with random RGB values.
NOTE: iid pixel sampling results in worst-case compression
(good for stressing I/O), but very unlike real photos.
We could consider a “low-freq” mode (e.g., noise blur)
to emulate network realism instead of max stress.
"""
random_pixels = self._rng.integers(
0,
256,
(height, width, 3),
dtype=np.uint8,
)
return Image.fromarray(random_pixels)
def generate_synthetic_video(self, width: int,
height: int,
num_frames: int) -> Any:
"""Generate synthetic video with random values.
TODO: Finish this method.
"""
raise NotImplementedError("Video sampling is WIP.")
def map_config_to_modality(self, config: tuple[int, int, int]) -> str:
"""Map the configuration to the modality."""
if config[-1] == 1:
return "image"
elif config[-1] > 1:
return "video"
else:
raise ValueError(f"Invalid multimodal item configuration: {config}")
def normalize_bucket_config(self, bucket_config: dict[tuple[int, int, int],
float]) -> dict[tuple[int, int, int], float]:
"""
Remove zero probability entries
and normalize the bucket config to sum to 1.
"""
# Raise error if value is negative
if any(v < 0 for v in bucket_config.values()):
raise ValueError("Bucket config values must be non-negative.")
# Remove zero probability entries
bucket_config = {k: v for k, v in bucket_config.items() if v > 0}
# if bucket config is empty, raise error
if not bucket_config:
raise ValueError("Got invalid bucket config. "
"Bucket config values must be non-zero.")
# Normalize the remaining bucket config to sum to 1
total = sum(bucket_config.values())
return {k: v / total for k, v in bucket_config.items()}
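    # Illustrative: {(256, 256, 1): 2.0, (720, 1280, 1): 2.0, (720, 1280, 16): 0.0}
    # normalizes to {(256, 256, 1): 0.5, (720, 1280, 1): 0.5}.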
def generate_mm_item(self,
mm_item_config: tuple[int, int, int],
) -> Mapping[str, Any]:
"""
Create synthetic images and videos and
apply process_image/process_video respectively.
This follows the OpenAI API chat completions
https://github.com/openai/openai-python
"""
if self.map_config_to_modality(mm_item_config) == "image":
return process_image(self.generate_synthetic_image(
mm_item_config[1],
mm_item_config[0]))
elif self.map_config_to_modality(mm_item_config) == "video":
return process_video(self.generate_synthetic_video(
mm_item_config[1],
mm_item_config[0],
mm_item_config[2]))
else:
raise ValueError(f"Invalid multimodal item configuration: "
f"{mm_item_config}")
def get_mm_item_sampling_params(
self,
base_items_per_request: int,
num_mm_items_range_ratio: float,
limit_mm_per_prompt: dict[str, int],
bucket_config: dict[tuple[int, int, int], float],
) -> tuple[int, int, dict[str, int], dict[tuple[int, int, int], float]]:
"""
Get the sampling parameters for the multimodal items.
"""
# Enforce num_mm_items_range_ratio <= 1
if not (0.0 <= num_mm_items_range_ratio <= 1.0):
raise ValueError("num_mm_items_range_ratio must be in [0, 1].")
# Ensure modalities to sample are in limit_mm_per_prompt
for k, v in bucket_config.items():
# get modality from bucket config
modality = self.map_config_to_modality(k)
if modality not in limit_mm_per_prompt:
raise ValueError(f"Modality {modality} is not in "
f"limit_mm_per_prompt: "
f"{limit_mm_per_prompt.keys()}")
# Remove zero probability entries
# and normalize bucket config to sum to 1
bucket_config = self.normalize_bucket_config(bucket_config)
logger.info(
"Normalized bucket config: %s", bucket_config,
)
# Only consider limit per prompt for modalities in bucket config
allowed_modalities = {self.map_config_to_modality(cfg)
for cfg in bucket_config}
limit_mm_per_prompt = {
k: v for k, v in limit_mm_per_prompt.items()
if k in allowed_modalities}
if not limit_mm_per_prompt:
raise ValueError("No valid limits for modalities present in "
"bucket_config.")
logger.info(
"Updated mm-limit-per-prompt: %s", limit_mm_per_prompt,
)
# Get max and min num mm items and ensure
# it is at most the sum of limit_mm_per_prompt for all modalities
max_num_mm_items = min(
sum(limit_mm_per_prompt.values()),
math.ceil(base_items_per_request * (1 + num_mm_items_range_ratio))
)
# Ensure min num mm items is at least 0
min_num_mm_items = max(
0,
math.floor(base_items_per_request * (1 - num_mm_items_range_ratio))
)
# Raise error if min num mm items is greater than max num mm items
if min_num_mm_items > max_num_mm_items:
raise ValueError(f"Min num mm items is greater than max mm items: "
f"{min_num_mm_items} > {max_num_mm_items}")
logger.info(
"Sampling number of multimodal items from [%s, %s]",
min_num_mm_items, max_num_mm_items,
)
return (
min_num_mm_items,
max_num_mm_items,
limit_mm_per_prompt,
bucket_config,
)
def get_mm_item_iterator(
self,
min_num_mm_items: int,
max_num_mm_items: int,
bucket_config: dict[tuple[int, int, int], float],
limit_mm_per_prompt: dict[str, int],
) -> Iterator[tuple[int, int, int]]:
"""
Iterator over the multimodal items for each request
whose size is between min_num_mm_items and max_num_mm_items.
Loop over the bucket config and sample a multimodal item.
Loop until the number of multimodal items sampled is equal to
request_num_mm_items or limit of multimodal items per prompt
for all modalities is reached.
Note:
- This function operates on a per-request shallow copy of
`bucket_config` (tuple->float). The original dict passed to
`sample` is not mutated; a test guards this invariant and will
fail if that ever changes.
"""
# Get the number of multimodal items to sample
request_num_mm_items = int(
self._rng.integers(min_num_mm_items, max_num_mm_items + 1)
)
# If request_num_mm_items is 0, yield an empty iterator
if request_num_mm_items == 0:
return
# Initialize modality counters
modality_counter = {self.map_config_to_modality(k): 0
for k in bucket_config}
# Copy the bucket config to avoid modifying the original
bucket_config_copy = bucket_config.copy()
# Loop over the number of multimodal items to sample
while sum(modality_counter.values()) < request_num_mm_items:
# Sample a multimodal item config
mm_item_config = self._rng.choice(list(bucket_config_copy.keys()),
p=list(bucket_config_copy.values()))
modality = self.map_config_to_modality(mm_item_config)
# Check that modality count is less than limit per prompt
if modality_counter[modality] < limit_mm_per_prompt[modality]:
modality_counter[modality] += 1
yield (
mm_item_config
)
else:
# If the counter is greater than the limit per prompt
# set all multimodal items of this modality to 0
for k, v in bucket_config_copy.items():
if self.map_config_to_modality(k) == modality:
bucket_config_copy[k] = 0
# If all configs are 0, break the loop
# This should not happen as request_num_mm_items is at most
# the sum of limit_mm_per_prompt for all modalities
if all(v == 0 for v in bucket_config_copy.values()):
logger.warning("Exhausted all multimodal items "
"of modality %s",
modality)
break
# Renormalize the bucket config
bucket_config_copy = self.normalize_bucket_config(
bucket_config_copy)
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
request_id_prefix: str = "",
no_oversample: bool = False,
prefix_len: int = RandomDataset.DEFAULT_PREFIX_LEN,
range_ratio: float = RandomDataset.DEFAULT_RANGE_RATIO,
input_len: int = RandomDataset.DEFAULT_INPUT_LEN,
output_len: int = RandomDataset.DEFAULT_OUTPUT_LEN,
limit_mm_per_prompt: dict[str, int] = DEFAULT_LIMIT_MM_PER_PROMPT,
base_items_per_request: int = DEFAULT_BASE_ITEMS_PER_REQUEST,
num_mm_items_range_ratio: float = DEFAULT_NUM_MM_ITEMS_RANGE_RATIO,
bucket_config: dict[tuple[int, int, int], float] =
DEFAULT_MM_ITEM_BUCKET_CONFIG,
enable_multimodal_chat: bool = DEFAULT_ENABLE_MULTIMODAL_CHAT,
**kwargs,
) -> list[SampleRequest]:
# NOTE: Video sampling is WIP. Raise error if video is in bucket config
# and probability is non-zero.
if any(self.map_config_to_modality(cfg) == "video" and p > 0
for cfg, p in bucket_config.items()):
raise NotImplementedError("Video sampling not implemented; "
"set its probability to 0.")
# Get the sampling parameters for the dataset
input_lens, output_lens, offsets = self.get_sampling_params(
num_requests, range_ratio, input_len, output_len, tokenizer
)
(
min_num_mm_items,
max_num_mm_items,
limit_mm_per_prompt,
bucket_config,
) = self.get_mm_item_sampling_params(
base_items_per_request,
num_mm_items_range_ratio,
limit_mm_per_prompt,
bucket_config,
)
# Generate prefix once
prefix_token_ids = self.get_prefix(tokenizer, prefix_len)
vocab_size = tokenizer.vocab_size
# Add synthetic multimodal items to each request
mm_requests = []
for i in range(num_requests):
prompt, total_input_len = self.generate_token_sequence(
tokenizer=tokenizer,
prefix_token_ids=prefix_token_ids,
prefix_len=prefix_len,
vocab_size=vocab_size,
input_len=int(input_lens[i]),
offset=int(offsets[i]),
index=i,
)
# Get multimodal item iterator for a given request
mm_item_iterator = self.get_mm_item_iterator(
min_num_mm_items,
max_num_mm_items,
bucket_config,
limit_mm_per_prompt,
)
mm_content = cast(list[dict[str, Any]], [
self.generate_mm_item(mm_item_config)
for mm_item_config in mm_item_iterator
])
if enable_multimodal_chat:
# NOTE: For now this option is only provided for completeness
# given that the serve.py benchmark currently does not use it.
mm_chat_prompt: Any = prompt
mm_chat_prompt = self.apply_multimodal_chat_transformation(
prompt, mm_content)
sample_request = SampleRequest(
prompt=mm_chat_prompt,
prompt_len=total_input_len,
expected_output_len=int(output_lens[i]),
multi_modal_data=None,
request_id=request_id_prefix + str(i),
)
else:
sample_request = SampleRequest(
prompt=prompt,
prompt_len=total_input_len,
expected_output_len=int(output_lens[i]),
multi_modal_data=mm_content,
request_id=request_id_prefix + str(i),
)
mm_requests.append(sample_request)
return mm_requests
# -----------------------------------------------------------------------------
# ShareGPT Dataset Implementation
# -----------------------------------------------------------------------------
class ShareGPTDataset(BenchmarkDataset):
"""
Implements the ShareGPT dataset. Loads data from a JSON file and generates
sample requests based on conversation turns.
"""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.load_data()
def load_data(self) -> None:
if self.dataset_path is None:
raise ValueError("dataset_path must be provided for loading data.")
with open(self.dataset_path, encoding="utf-8") as f:
self.data = json.load(f)
# Filter entries with at least two conversation turns.
self.data = [
entry for entry in self.data
if "conversations" in entry and len(entry["conversations"]) >= 2
]
random.seed(self.random_seed)
random.shuffle(self.data)
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
lora_path: Optional[str] = None,
max_loras: Optional[int] = None,
output_len: Optional[int] = None,
enable_multimodal_chat: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list:
samples: list = []
ind = 0
for entry in self.data:
if len(samples) >= num_requests:
break
prompt, completion = (
entry["conversations"][0]["value"],
entry["conversations"][1]["value"],
)
lora_request = self.get_random_lora_request(
max_loras=max_loras, lora_path=lora_path)
prompt_ids = tokenizer(prompt).input_ids
completion_ids = tokenizer(completion).input_ids
prompt_len = len(prompt_ids)
new_output_len = (len(completion_ids)
if output_len is None else output_len)
if not is_valid_sequence(prompt_len,
new_output_len,
skip_min_output_len_check=output_len
is not None):
continue
if image_path := entry.get("image"):
mm_content = process_image(image_path)
elif video_path := entry.get("video"):
mm_content = process_video(video_path)
else:
mm_content = None
if enable_multimodal_chat:
prompt = self.apply_multimodal_chat_transformation(
prompt, mm_content)
samples.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=new_output_len,
lora_request=lora_request,
multi_modal_data=mm_content,
request_id=request_id_prefix + str(ind),
))
ind += 1
self.maybe_oversample_requests(samples,
num_requests,
request_id_prefix,
no_oversample)
return samples
class _ValidateDatasetArgs(argparse.Action):
"""Argparse action to validate dataset name and path compatibility."""
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)
# Get current values of both dataset_name and dataset_path
dataset_name = getattr(namespace, 'dataset_name', 'random')
dataset_path = getattr(namespace, 'dataset_path', None)
# Validate the combination
if dataset_name == "random" and dataset_path is not None:
parser.error(
"Cannot use 'random' dataset with --dataset-path. "
"Please specify the appropriate --dataset-name (e.g., "
"'sharegpt', 'custom', 'sonnet') for your dataset file: "
f"{dataset_path}"
)
def add_dataset_parser(parser: FlexibleArgumentParser):
parser.add_argument("--seed", type=int, default=0)
parser.add_argument(
"--num-prompts",
type=int,
default=1000,
help="Number of prompts to process.",
)
parser.add_argument(
"--dataset-name",
type=str,
default="random",
action=_ValidateDatasetArgs,
choices=[
"sharegpt", "burstgpt", "sonnet", "random", "random-mm", "hf",
"custom", "prefix_repetition", "spec_bench"
],
help="Name of the dataset to benchmark on.",
)
parser.add_argument(
"--no-stream",
action="store_true",
help="Do not load the dataset in streaming mode.",
)
parser.add_argument(
"--dataset-path",
type=str,
default=None,
action=_ValidateDatasetArgs,
help="Path to the sharegpt/sonnet dataset. "
"Or the huggingface dataset ID if using HF dataset.",
)
parser.add_argument(
"--no-oversample",
action="store_true",
help="Do not oversample if the dataset has " \
"fewer samples than num-prompts.",
)
# group for dataset specific arguments
custom_group = parser.add_argument_group("custom dataset options")
custom_group.add_argument(
"--custom-output-len",
type=int,
default=256,
help=
"Number of output tokens per request, used only for custom dataset.",
)
custom_group.add_argument(
"--custom-skip-chat-template",
action="store_true",
help=
"Skip applying chat template to prompt, used only for custom dataset.",
)
spec_bench_group = parser.add_argument_group("spec bench dataset options")
spec_bench_group.add_argument(
"--spec-bench-output-len",
type=int,
default=256,
help=
"Num of output tokens per request, used only for spec bench dataset.",
)
spec_bench_group.add_argument(
"--spec-bench-category",
type=str,
default=None,
help=
"Category for spec bench dataset. If None, use all categories.",
)
sonnet_group = parser.add_argument_group("sonnet dataset options")
sonnet_group.add_argument(
"--sonnet-input-len",
type=int,
default=550,
help=
"Number of input tokens per request, used only for sonnet dataset.",
)
sonnet_group.add_argument(
"--sonnet-output-len",
type=int,
default=150,
help=
"Number of output tokens per request, used only for sonnet dataset.",
)
sonnet_group.add_argument(
"--sonnet-prefix-len",
type=int,
default=200,
help=
"Number of prefix tokens per request, used only for sonnet dataset.",
)
sharegpt_group = parser.add_argument_group("sharegpt dataset options")
sharegpt_group.add_argument(
"--sharegpt-output-len",
type=int,
default=None,
help="Output length for each request. Overrides the output length "
"from the ShareGPT dataset.",
)
blazedit_group = parser.add_argument_group("blazedit dataset options")
blazedit_group.add_argument(
"--blazedit-min-distance",
type=float,
default=0.0,
help=
"Minimum distance for blazedit dataset. Min: 0, Max: 1.0",
)
blazedit_group.add_argument(
"--blazedit-max-distance",
type=float,
default=1.0,
help=
"Maximum distance for blazedit dataset. Min: 0, Max: 1.0",
)
random_group = parser.add_argument_group("random dataset options")
random_group.add_argument(
"--random-input-len",
type=int,
default=1024,
help=
"Number of input tokens per request, used only for random sampling.",
)
random_group.add_argument(
"--random-output-len",
type=int,
default=128,
help=
"Number of output tokens per request, used only for random sampling.",
)
random_group.add_argument(
"--random-range-ratio",
type=float,
default=0.0,
help="Range ratio for sampling input/output length, "
"used only for random sampling. Must be in the range [0, 1) to define "
"a symmetric sampling range"
"[length * (1 - range_ratio), length * (1 + range_ratio)].",
)
random_group.add_argument(
"--random-prefix-len",
type=int,
default=0,
help=("Number of fixed prefix tokens before the random context "
"in a request. "
"The total input length is the sum of `random-prefix-len` and "
"a random "
"context length sampled from [input_len * (1 - range_ratio), "
"input_len * (1 + range_ratio)]."),
)
random_group.add_argument(
"--random-batch-size",
type=int,
default=1,
help=("Batch size for random sampling. "
"Only used for embeddings benchmark."),
)
# random multimodal dataset options
random_mm_group = parser.add_argument_group(
"random multimodal dataset options extended from random dataset")
random_mm_group.add_argument(
"--random-mm-base-items-per-request",
type=int,
default=RandomMultiModalDataset.DEFAULT_BASE_ITEMS_PER_REQUEST,
help=(
"Base number of multimodal items per request for random-mm. "
"Actual per-request count is sampled around this base using "
"--random-mm-num-mm-items-range-ratio."
),
)
random_mm_group.add_argument(
"--random-mm-num-mm-items-range-ratio",
type=float,
default=RandomMultiModalDataset.DEFAULT_NUM_MM_ITEMS_RANGE_RATIO,
help=(
"Range ratio r in [0, 1] for sampling items per request. "
"We sample uniformly from the closed integer range "
"[floor(n*(1-r)), ceil(n*(1+r))] "
"where n is the base items per request. "
"r=0 keeps it fixed; r=1 allows 0 items. The maximum is clamped "
"to the sum of per-modality limits from "
"--random-mm-limit-mm-per-prompt. "
"An error is raised if the computed min exceeds the max."
),
)
random_mm_group.add_argument(
"--random-mm-limit-mm-per-prompt",
type=json.loads,
default=RandomMultiModalDataset.DEFAULT_LIMIT_MM_PER_PROMPT,
help=(
"Per-modality hard caps for items attached per request, e.g. "
"'{\"image\": 3, \"video\": 0}'. The sampled per-request item "
"count is clamped to the sum of these limits. When a modality "
"reaches its cap, its buckets are excluded and probabilities are "
"renormalized."
"OBS.: Only image sampling is supported for now."
),
)
def _parse_mm_bucket_config(v: object) -> dict[tuple[int, int, int], float]:
# If already a dict (e.g., programmatic call), normalize keys
def normalize(d: dict) -> dict[tuple[int, int, int], float]:
out: dict[tuple[int, int, int], float] = {}
for k, val in d.items():
key = k
if isinstance(key, str):
with suppress(Exception):
key = ast.literal_eval(key)
if not (isinstance(key, tuple) and len(key) == 3
and all(isinstance(x, int) for x in key)):
raise ValueError(
f"Invalid bucket key {k!r}. Expected tuple (H, W, T)."
)
out[(int(key[0]), int(key[1]), int(key[2]))] = float(val)
return out
if isinstance(v, dict):
return normalize(v)
if isinstance(v, str):
# Python literal (supports tuple keys)
parsed = ast.literal_eval(v)
if not isinstance(parsed, dict):
raise ValueError("Bucket config must parse to a dict.")
return normalize(parsed)
raise ValueError("Unsupported value for --random-mm-bucket-config.")
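    # Illustrative: _parse_mm_bucket_config("{(256, 256, 1): 0.5, (720, 1280, 1): 0.5}")
    # returns {(256, 256, 1): 0.5, (720, 1280, 1): 0.5}; string keys such as
    # "(256, 256, 1)" are also accepted and converted to int tuples.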
random_mm_group.add_argument(
"--random-mm-bucket-config",
type=_parse_mm_bucket_config,
default=RandomMultiModalDataset.DEFAULT_MM_ITEM_BUCKET_CONFIG,
help=(
"The bucket config is a dictionary mapping a multimodal item "
"sampling configuration to a probability. "
"Currently allows for 2 modalities: images and videos. "
"A bucket key is a tuple of (height, width, num_frames). "
"The value is the probability of sampling that specific item. "
"Example: "
"--random-mm-bucket-config "
"{(256, 256, 1): 0.5, (720, 1280, 1): 0.4, (720, 1280, 16): 0.10} "
"First item: images with resolution 256x256 w.p. 0.5. "
"Second item: images with resolution 720x1280 w.p. 0.4. "
"Third item: videos with resolution 720x1280 and 16 frames w.p. 0.1. "
"OBS.: If the probabilities do not sum to 1, they are normalized. "
"OBS bis.: Only image sampling is supported for now."
),
)
hf_group = parser.add_argument_group("hf dataset options")
hf_group.add_argument("--hf-subset",
type=str,
default=None,
help="Subset of the HF dataset.")
hf_group.add_argument("--hf-split",
type=str,
default=None,
help="Split of the HF dataset.")
hf_group.add_argument(
"--hf-name",
type=str,
default=None,
help=(
"Name of the dataset on HuggingFace "
"(e.g., 'lmarena-ai/VisionArena-Chat'). "
"Specify this if your dataset-path is a local path."
),
)
hf_group.add_argument(
"--hf-output-len",
type=int,
default=None,
help="Output length for each request. Overrides the output lengths "
"from the sampled HF dataset.",
)
prefix_repetition_group = parser.add_argument_group(
"prefix repetition dataset options")
prefix_repetition_group.add_argument(
"--prefix-repetition-prefix-len",
type=int,
default=256,
help="Number of prefix tokens per request, used only for prefix "
"repetition dataset.",
)
prefix_repetition_group.add_argument(
"--prefix-repetition-suffix-len",
type=int,
default=256,
help="Number of suffix tokens per request, used only for prefix "
"repetition dataset. Total input length is prefix_len + suffix_len.",
)
prefix_repetition_group.add_argument(
"--prefix-repetition-num-prefixes",
type=int,
default=10,
help="Number of prefixes to generate, used only for prefix repetition "
"dataset. Prompts per prefix is num_requests // num_prefixes.",
)
prefix_repetition_group.add_argument(
"--prefix-repetition-output-len",
type=int,
default=128,
help="Number of output tokens per request, used only for prefix "
"repetition dataset.",
)
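    # With the defaults above, each prefix-repetition request carries 256
    # shared prefix tokens plus 256 unique suffix tokens (512 input tokens in
    # total), and the requested number of prompts is split evenly across 10
    # distinct prefixes.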
def get_samples(args, tokenizer) -> list[SampleRequest]:
if not hasattr(args, "request_id_prefix"):
args.request_id_prefix = ""
if args.dataset_name == "custom":
dataset = CustomDataset(dataset_path=args.dataset_path)
input_requests = dataset.sample(
num_requests=args.num_prompts,
tokenizer=tokenizer,
output_len=args.custom_output_len,
skip_chat_template=args.custom_skip_chat_template,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
)
elif args.dataset_name == "sonnet":
dataset = SonnetDataset(dataset_path=args.dataset_path)
# For the "sonnet" dataset, formatting depends on the backend.
if args.backend == "openai-chat":
input_requests = dataset.sample(
num_requests=args.num_prompts,
input_len=args.sonnet_input_len,
output_len=args.sonnet_output_len,
prefix_len=args.sonnet_prefix_len,
tokenizer=tokenizer,
return_prompt_formatted=False,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
)
else:
assert tokenizer.chat_template or tokenizer.default_chat_template, (
"Tokenizer/model must have chat template for sonnet dataset.")
input_requests = dataset.sample(
num_requests=args.num_prompts,
input_len=args.sonnet_input_len,
output_len=args.sonnet_output_len,
prefix_len=args.sonnet_prefix_len,
tokenizer=tokenizer,
return_prompt_formatted=True,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
)
elif args.dataset_name == "hf":
        # All of the following datasets are implemented on top of the
        # HuggingFaceDataset base class.
hf_kwargs = {}
if (
args.dataset_path in VisionArenaDataset.SUPPORTED_DATASET_PATHS
or args.hf_name in VisionArenaDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = VisionArenaDataset
args.hf_split = "train"
args.hf_subset = None
elif (
args.dataset_path in MMVUDataset.SUPPORTED_DATASET_PATHS
or args.hf_name in MMVUDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = MMVUDataset
args.hf_split = "validation"
args.hf_subset = None
elif (
args.dataset_path in InstructCoderDataset.SUPPORTED_DATASET_PATHS
or args.hf_name in InstructCoderDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = InstructCoderDataset
args.hf_split = "train"
elif (
args.dataset_path in MTBenchDataset.SUPPORTED_DATASET_PATHS
or args.hf_name in MTBenchDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = MTBenchDataset
args.hf_split = "train"
elif (
args.dataset_path in ConversationDataset.SUPPORTED_DATASET_PATHS
or args.hf_name in ConversationDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = ConversationDataset
elif (
args.dataset_path in AIMODataset.SUPPORTED_DATASET_PATHS
or args.hf_name in AIMODataset.SUPPORTED_DATASET_PATHS
):
dataset_class = AIMODataset
args.hf_split = "train"
elif (
args.dataset_path
in NextEditPredictionDataset.SUPPORTED_DATASET_PATHS # noqa: E501
or args.hf_name in NextEditPredictionDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = NextEditPredictionDataset
args.hf_split = "train"
elif (
args.dataset_path in ASRDataset.SUPPORTED_DATASET_PATHS
or args.hf_name in ASRDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = ASRDataset
args.hf_split = "train"
elif args.dataset_path in BlazeditDataset.SUPPORTED_DATASET_PATHS:
dataset_class = BlazeditDataset
args.hf_split = "train"
hf_kwargs = {
"min_distance": args.blazedit_min_distance,
"max_distance": args.blazedit_max_distance,
}
elif (
args.dataset_path in MLPerfDataset.SUPPORTED_DATASET_PATHS
or args.hf_name in MLPerfDataset.SUPPORTED_DATASET_PATHS
):
dataset_class = MLPerfDataset
args.hf_split = "train"
else:
supported_datasets = set([
dataset_name for cls in HuggingFaceDataset.__subclasses__()
for dataset_name in cls.SUPPORTED_DATASET_PATHS
])
raise ValueError(
f"Unsupported dataset path: {args.dataset_path}. "
"Huggingface dataset only supports dataset_path"
f" from one of following: {supported_datasets}. "
"Please consider contributing if you would "
"like to add support for additional dataset formats.")
if dataset_class.IS_MULTIMODAL and args.backend not in [
"openai-chat",
"openai-audio",
]:
# multi-modal benchmark is only available on OpenAI Chat
# endpoint-type.
raise ValueError(
"Multi-modal content is only supported on 'openai-chat' and "
"'openai-audio' backends.")
input_requests = dataset_class(
dataset_path=args.dataset_path,
dataset_subset=args.hf_subset,
dataset_split=args.hf_split,
random_seed=args.seed,
no_stream=args.no_stream,
hf_name=args.hf_name,
).sample(
num_requests=args.num_prompts,
tokenizer=tokenizer,
output_len=args.hf_output_len,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
**hf_kwargs
)
else:
# For datasets that follow a similar structure, use a mapping.
dataset_mapping = {
"spec_bench":
lambda: SpecBench(dataset_path=args.dataset_path,
category=args.spec_bench_category).sample(
num_requests=args.num_prompts,
tokenizer=tokenizer,
output_len=args.spec_bench_output_len,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
),
"sharegpt": lambda: ShareGPTDataset(
random_seed=args.seed, dataset_path=args.dataset_path
).sample(
tokenizer=tokenizer,
num_requests=args.num_prompts,
output_len=args.sharegpt_output_len,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
),
"burstgpt": lambda: BurstGPTDataset(
random_seed=args.seed, dataset_path=args.dataset_path
).sample(
tokenizer=tokenizer,
num_requests=args.num_prompts,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
),
"random": lambda: RandomDataset(
random_seed=args.seed, dataset_path=args.dataset_path
).sample(
tokenizer=tokenizer,
num_requests=args.num_prompts,
prefix_len=args.random_prefix_len,
input_len=args.random_input_len,
output_len=args.random_output_len,
range_ratio=args.random_range_ratio,
request_id_prefix=args.request_id_prefix,
batchsize=args.random_batch_size,
no_oversample=args.no_oversample,
),
"random-mm":
lambda: RandomMultiModalDataset(
random_seed=args.seed, dataset_path=args.dataset_path
).sample(
tokenizer=tokenizer,
num_requests=args.num_prompts,
prefix_len=args.random_prefix_len,
range_ratio=args.random_range_ratio,
input_len=args.random_input_len,
output_len=args.random_output_len,
base_items_per_request=args.random_mm_base_items_per_request,
limit_mm_per_prompt=args.random_mm_limit_mm_per_prompt,
num_mm_items_range_ratio=args.random_mm_num_mm_items_range_ratio,
bucket_config=args.random_mm_bucket_config,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
),
"prefix_repetition":
lambda: PrefixRepetitionRandomDataset(
random_seed=args.seed, dataset_path=args.dataset_path
).sample(
tokenizer=tokenizer,
num_requests=args.num_prompts,
prefix_len=args.prefix_repetition_prefix_len,
suffix_len=args.prefix_repetition_suffix_len,
num_prefixes=args.prefix_repetition_num_prefixes,
output_len=args.prefix_repetition_output_len,
request_id_prefix=args.request_id_prefix,
no_oversample=args.no_oversample,
),
}
try:
# Enforce endpoint compatibility for multimodal datasets.
if args.dataset_name == "random-mm" and args.backend not in [
"openai-chat"]:
raise ValueError(
"Multi-modal content (images) is only supported on "
"'openai-chat' backend."
)
input_requests = dataset_mapping[args.dataset_name]()
except KeyError as err:
raise ValueError(f"Unknown dataset: {args.dataset_name}") from err
return input_requests
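# Minimal usage sketch for get_samples (hypothetical model name and argument
# values; assumes add_dataset_parser from this module and a HF tokenizer):
#   parser = argparse.ArgumentParser()
#   add_dataset_parser(parser)
#   args = parser.parse_args(["--dataset-name", "random", "--num-prompts", "8"])
#   tokenizer = AutoTokenizer.from_pretrained("<your_model>")
#   requests = get_samples(args, tokenizer)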
# -----------------------------------------------------------------------------
# Custom Dataset Implementation
# -----------------------------------------------------------------------------
class CustomDataset(BenchmarkDataset):
"""Implements the Custom dataset.
Loads data from a JSONL file where each line is a JSON object.
Supports standard 'prompt' field, and also extracts 'sampling_params'
and 'metadata' if present for advanced benchmarking.
Example format:
{"prompt": "What is 2+2?", "sampling_params": {"max_tokens": 10}, "metadata": {"id": 1}}
"""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.load_data()
def load_data(self) -> None:
if self.dataset_path is None:
raise ValueError("dataset_path must be provided for loading data.")
self.data = []
# Load the JSONL file
if self.dataset_path.endswith(".jsonl"):
            # Read the JSONL line by line with the builtin json module so each
            # record keeps its dict structure; pandas would auto-process nested dicts.
with open(self.dataset_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
item = json.loads(line)
                        # Validate the required 'prompt' field
if "prompt" not in item:
raise ValueError("Each line must contain a 'prompt' key.")
                        self.data.append(item)  # keep the full dict structure
else:
raise NotImplementedError("Only JSONL format is supported for CustomDataset.")
        # Shuffle the data (shuffling does not touch per-item metadata)
random.seed(self.random_seed)
        # Only the list order changes; each item's metadata still corresponds
        # to its original prompt.
random.shuffle(self.data)
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
lora_path: Optional[str] = None,
max_loras: Optional[int] = None,
output_len: Optional[int] = None,
enable_multimodal_chat: bool = False,
skip_chat_template: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list:
# load all data if needed
self.num_available_samples = len(self.data)
if num_requests <= 0:
num_requests = self.num_available_samples
logger.info("num_requests is set to 0 or negative, so using all available samples: %d", num_requests)
sampled_requests = []
for i, item in enumerate(self.data):
if len(sampled_requests) >= num_requests:
break
prompt = item["prompt"]
            # Apply the chat template
if not skip_chat_template:
                # Check whether the prompt is already a message list (OpenAI format)
if isinstance(prompt, list):
                    # It is a list: apply the template directly
prompt_formatted = tokenizer.apply_chat_template(
prompt,
add_generation_prompt=True,
tokenize=False
)
else:
                    # It is a string: wrap it as a standard user message
prompt_formatted = tokenizer.apply_chat_template(
[{ "role": "user", "content": prompt }],
add_generation_prompt=True,
tokenize=False
)
else:
prompt_formatted = prompt
            # Compute the prompt length.
            # Note: tokenize the final text that is actually sent to the model.
tokenize_input = prompt_formatted
prompt_len = len(tokenizer(tokenize_input).input_ids)
            # Extract custom parameters, if present.
            # sampling_params overrides the default sampling parameters.
sampling_params = item.get("sampling_params", {})
            # metadata is used to track sample provenance or category.
metadata = item.get("metadata", {})
            # If a global output_len is specified, it takes precedence;
            # otherwise try max_tokens from sampling_params.
final_output_len = output_len
if final_output_len is None and "max_tokens" in sampling_params:
final_output_len = sampling_params["max_tokens"]
            # If neither is set, fall back to a default value.
elif final_output_len is None:
                final_output_len = 256  # default fallback value
sampled_requests.append(
SampleRequest(
prompt=prompt_formatted,
prompt_len=prompt_len,
expected_output_len=final_output_len,
request_id=request_id_prefix + str(i),
                    # Inject the custom fields
sampling_params=sampling_params,
metadata=metadata
)
)
self.maybe_oversample_requests(sampled_requests, num_requests, request_id_prefix, no_oversample)
return sampled_requests
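# Sketch of driving the CustomDataset above through the serving benchmark
# (illustrative path and values, not a prescribed invocation):
#   vllm bench serve --model <your_model> --dataset-name custom \
#       --dataset-path data.jsonl --num-prompts 100 --custom-output-len 64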
# -----------------------------------------------------------------------------
# Spec Bench Dataset Implementation
# -----------------------------------------------------------------------------
class SpecBench(CustomDataset):
"""
Implements the SpecBench dataset: https://github.com/hemingkx/Spec-Bench
Download the dataset using:
wget https://raw.githubusercontent.com/hemingkx/Spec-Bench/refs/heads/main/data/spec_bench/question.jsonl
""" # noqa: E501
def __init__(self, **kwargs) -> None:
self.category = kwargs.pop("category", None)
super().__init__(**kwargs)
self.load_data()
def load_data(self) -> None:
if self.dataset_path is None:
raise ValueError("dataset_path must be provided for loading data.")
self.data = []
# Load the JSONL file
jsonl_data = pd.read_json(path_or_buf=self.dataset_path,
lines=True)
# check if the JSONL file has a 'turns' column
if "turns" not in jsonl_data.columns:
raise ValueError("JSONL file must contain a 'turns' column.")
for _, row in jsonl_data.iterrows():
# sample only from a specific category if specified
if (not self.category) or (self.category == row['category']):
prompt = row["turns"][0]
self.data.append({"prompt": prompt})
random.seed(self.random_seed)
random.shuffle(self.data)
def sample(self, **kwargs) -> list:
# leverage CustomDataset sample
kwargs["skip_chat_template"] = False
return super().sample(**kwargs)
# -----------------------------------------------------------------------------
# Sonnet Dataset Implementation
# -----------------------------------------------------------------------------
@deprecated(
"SonnetDataset is deprecated and will be removed in a future version.",
)
class SonnetDataset(BenchmarkDataset):
"""
Simplified implementation of the Sonnet dataset. Loads poem lines from a
text file and generates sample requests. Default values here copied from
`benchmark_serving.py` for the sonnet dataset.
"""
DEFAULT_PREFIX_LEN = 200
DEFAULT_INPUT_LEN = 550
DEFAULT_OUTPUT_LEN = 150
def __init__(
self,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.load_data()
def load_data(self) -> None:
if not self.dataset_path:
raise ValueError("dataset_path must be provided.")
with open(self.dataset_path, encoding="utf-8") as f:
self.data = f.readlines()
def sample(
self,
tokenizer,
num_requests: int,
prefix_len: int = DEFAULT_PREFIX_LEN,
input_len: int = DEFAULT_INPUT_LEN,
output_len: int = DEFAULT_OUTPUT_LEN,
return_prompt_formatted: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list:
# Calculate average token length for a poem line.
tokenized_lines = [tokenizer(line).input_ids for line in self.data]
avg_len = sum(len(tokens)
for tokens in tokenized_lines) / len(tokenized_lines)
# Build the base prompt.
base_prompt = "Pick as many lines as you can from these poem lines:\n"
base_msg = [{"role": "user", "content": base_prompt}]
base_fmt = tokenizer.apply_chat_template(base_msg,
add_generation_prompt=True,
tokenize=False)
base_offset = len(tokenizer(base_fmt).input_ids)
if input_len <= base_offset:
raise ValueError(
f"'input_len' must be higher than the base prompt length "
f"({base_offset}).")
# Determine how many poem lines to use.
num_input_lines = round((input_len - base_offset) / avg_len)
num_prefix_lines = max(round((prefix_len - base_offset) / avg_len), 0)
prefix_lines = self.data[:num_prefix_lines]
samples = []
ind = 0
while len(samples) < num_requests:
extra_lines = random.choices(self.data,
k=num_input_lines - num_prefix_lines)
prompt = f"{base_prompt}{''.join(prefix_lines + extra_lines)}"
msg = [{"role": "user", "content": prompt}]
prompt_formatted = tokenizer.apply_chat_template(
msg, add_generation_prompt=True, tokenize=False)
prompt_len = len(tokenizer(prompt_formatted).input_ids)
if prompt_len <= input_len:
samples.append(
SampleRequest(
prompt=prompt_formatted
if return_prompt_formatted else prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
request_id=request_id_prefix + str(ind),
))
ind += 1
return samples
# -----------------------------------------------------------------------------
# BurstGPT Dataset Implementation
# -----------------------------------------------------------------------------
class BurstGPTDataset(BenchmarkDataset):
"""
Implements the BurstGPT dataset. Loads data from a CSV file and generates
sample requests based on synthetic prompt generation. Only rows with Model
"GPT-4" and positive response tokens are used.
"""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.load_data()
    def load_data(self):
if self.dataset_path is None:
raise ValueError("dataset_path must be provided for loading data.")
df = pd.read_csv(self.dataset_path)
# Filter to keep only GPT-4 rows.
gpt4_df = df[df["Model"] == "GPT-4"]
# Remove failed requests (where Response tokens is 0 or less).
gpt4_df = gpt4_df[gpt4_df["Response tokens"] > 0]
# Sample the desired number of rows.
self.data = gpt4_df
def _sample_loaded_data(self, num_requests: int) -> list:
if num_requests <= len(self.data):
data = self.data.sample(n=num_requests,
random_state=self.random_seed)
else:
data = self.data.sample(
n=num_requests,
random_state=self.random_seed,
replace=True,
)
# Convert the dataframe to a list of lists.
return data.values.tolist()
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
max_loras: Optional[int] = None,
lora_path: Optional[str] = None,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list[SampleRequest]:
samples = []
data = self._sample_loaded_data(num_requests=num_requests)
for i in range(num_requests):
input_len = int(data[i][2])
output_len = int(data[i][3])
lora_req = self.get_random_lora_request(
max_loras=max_loras, lora_path=lora_path)
vocab_size = tokenizer.vocab_size
# Generate a synthetic prompt: a list of token IDs computed as (i +
# j) modulo vocab_size.
token_ids = [(i + j) % vocab_size for j in range(input_len)]
prompt = tokenizer.decode(token_ids)
samples.append(
SampleRequest(
prompt=prompt,
prompt_len=input_len,
expected_output_len=output_len,
lora_request=lora_req,
request_id=request_id_prefix + str(i),
))
return samples
# -----------------------------------------------------------------------------
# HuggingFace Dataset Base Implementation
# -----------------------------------------------------------------------------
class HuggingFaceDataset(BenchmarkDataset):
"""Base class for datasets hosted on HuggingFace."""
SUPPORTED_DATASET_PATHS: Union[set[str], dict[str, Callable]] = set()
def __init__(
self,
dataset_path: str,
dataset_split: str,
no_stream: bool = False,
dataset_subset: Optional[str] = None,
hf_name: Optional[str] = None,
**kwargs,
) -> None:
super().__init__(dataset_path=dataset_path, **kwargs)
self.dataset_split = dataset_split
self.dataset_subset = dataset_subset
self.load_stream = not no_stream
self.hf_name = hf_name or dataset_path
self.load_data()
def load_data(self) -> None:
"""Load data from HuggingFace datasets."""
self.data = load_dataset(
self.dataset_path,
name=self.dataset_subset,
split=self.dataset_split,
streaming=self.load_stream,
)
self.data = self.data.shuffle(seed=self.random_seed)
# -----------------------------------------------------------------------------
# Conversation Dataset Implementation
# -----------------------------------------------------------------------------
class ConversationDataset(HuggingFaceDataset):
"""Dataset for conversation data with multimodal support."""
SUPPORTED_DATASET_PATHS = {
'lmms-lab/LLaVA-OneVision-Data', 'Aeala/ShareGPT_Vicuna_unfiltered'
}
IS_MULTIMODAL = True
def sample(self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
enable_multimodal_chat: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs) -> list:
# Filter examples with at least 2 conversations
filtered_data = self.data.filter(
lambda x: len(x["conversations"]) >= 2)
sampled_requests = []
ind = 0
dynamic_output = output_len is None
for item in filtered_data:
if len(sampled_requests) >= num_requests:
break
conv = item["conversations"]
prompt, completion = conv[0]["value"], conv[1]["value"]
prompt_ids = tokenizer(prompt).input_ids
completion_ids = tokenizer(completion).input_ids
prompt_len = len(prompt_ids)
completion_len = len(completion_ids)
output_len = completion_len if dynamic_output else output_len
assert isinstance(output_len, int) and output_len > 0
if dynamic_output and not is_valid_sequence(
prompt_len, completion_len):
continue
mm_content = process_image(
item["image"]) if "image" in item else None
if enable_multimodal_chat:
# Note: when chat is enabled the request prompt_len is no longer
# accurate and we will be using request output to count the
# actual prompt len and output len
prompt = self.apply_multimodal_chat_transformation(
prompt, mm_content)
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
multi_modal_data=mm_content,
request_id=request_id_prefix + str(ind),
))
ind += 1
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# Vision Arena Dataset Implementation
# -----------------------------------------------------------------------------
class VisionArenaDataset(HuggingFaceDataset):
"""
Vision Arena Dataset.
"""
DEFAULT_OUTPUT_LEN = 128
SUPPORTED_DATASET_PATHS = {
"lmarena-ai/VisionArena-Chat":
lambda x: x["conversation"][0][0]["content"],
"lmarena-ai/vision-arena-bench-v0.1":
lambda x: x["turns"][0][0]["content"]
}
IS_MULTIMODAL = True
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
enable_multimodal_chat: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list:
output_len = (output_len
if output_len is not None else self.DEFAULT_OUTPUT_LEN)
sampled_requests = []
for i, item in enumerate(self.data):
if len(sampled_requests) >= num_requests:
break
parser_fn = self.SUPPORTED_DATASET_PATHS.get(self.hf_name)
if parser_fn is None:
raise ValueError(f"Unsupported dataset path: {self.hf_name}")
prompt = parser_fn(item)
mm_content = process_image(item["images"][0])
prompt_len = len(tokenizer(prompt).input_ids)
if enable_multimodal_chat:
# Note: when chat is enabled the request prompt_len is no longer
# accurate and we will be using request output to count the
# actual prompt len
prompt = self.apply_multimodal_chat_transformation(
prompt, mm_content)
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
multi_modal_data=mm_content,
request_id=request_id_prefix + str(i),
))
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
class MMVUDataset(HuggingFaceDataset):
"""
MMVU Dataset.
https://huggingface.co/datasets/yale-nlp/MMVU
"""
DEFAULT_OUTPUT_LEN = 128
SUPPORTED_DATASET_PATHS = {
"yale-nlp/MMVU":
lambda x: x["question"] + " " + (
" ".join(f"{k}.{v}" for k, v in x["choices"].items())
),
}
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
enable_multimodal_chat: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list:
output_len = (output_len
if output_len is not None else self.DEFAULT_OUTPUT_LEN)
sampled_requests = []
for i, item in enumerate(self.data):
if len(sampled_requests) >= num_requests:
break
parser_fn = self.SUPPORTED_DATASET_PATHS.get(self.hf_name)
if parser_fn is None:
raise ValueError(f"Unsupported dataset path: {self.hf_name}")
prompt = parser_fn(item)
mm_content = process_video(item["video"])
prompt_len = len(tokenizer(prompt).input_ids)
if enable_multimodal_chat:
# Note: when chat is enabled the request prompt_len is no longer
# accurate and we will be using request output to count the
# actual prompt len
prompt = self.apply_multimodal_chat_transformation(
prompt, mm_content)
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
multi_modal_data=mm_content,
request_id=request_id_prefix + str(i),
))
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# Instruct Coder Dataset Implementation
# -----------------------------------------------------------------------------
class InstructCoderDataset(HuggingFaceDataset):
"""
InstructCoder Dataset.
https://huggingface.co/datasets/likaixin/InstructCoder
    InstructCoder is a dataset designed for general code editing. It consists
    of 114,239 instruction-input-output triplets and covers multiple distinct
    code editing scenarios.
"""
DEFAULT_OUTPUT_LEN = 200 # this is the average default output length
SUPPORTED_DATASET_PATHS = {
"likaixin/InstructCoder",
}
def sample(self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
enable_multimodal_chat: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs) -> list:
output_len = (output_len
if output_len is not None else self.DEFAULT_OUTPUT_LEN)
sampled_requests = []
for i, item in enumerate(self.data):
if len(sampled_requests) >= num_requests:
break
prompt = (
f"{item['input']}\n\n{item['instruction']} Just output "
"the code, do not include any explanation."
)
# apply template
prompt = tokenizer.apply_chat_template(
[{
"role": "user",
"content": prompt
}],
add_generation_prompt=True,
tokenize=False,
)
prompt_len = len(tokenizer(prompt).input_ids)
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
request_id=request_id_prefix + str(i),
))
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# MT-Bench Dataset Implementation
# -----------------------------------------------------------------------------
class MTBenchDataset(HuggingFaceDataset):
"""
MT-Bench Dataset.
https://huggingface.co/datasets/philschmid/mt-bench
We create a single turn dataset for MT-Bench.
This is similar to Spec decoding benchmark setup in vLLM
https://github.com/vllm-project/vllm/blob/9d98ab5ec/examples/offline_inference/eagle.py#L14-L18
""" # noqa: E501
DEFAULT_OUTPUT_LEN = 256 # avg len used in SD bench in vLLM
SUPPORTED_DATASET_PATHS = {
"philschmid/mt-bench",
}
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
enable_multimodal_chat: bool = False,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list:
output_len = (output_len
if output_len is not None else self.DEFAULT_OUTPUT_LEN)
sampled_requests = []
for i, item in enumerate(self.data):
if len(sampled_requests) >= num_requests:
break
prompt = item["turns"][0]
# apply template
prompt = tokenizer.apply_chat_template(
[{
"role": "user",
"content": prompt
}],
add_generation_prompt=True,
tokenize=False,
)
prompt_len = len(tokenizer(prompt).input_ids)
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
request_id=request_id_prefix + str(i),
))
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# Blazedit Dataset Implementation
# -----------------------------------------------------------------------------
class BlazeditDataset(HuggingFaceDataset):
"""
Blazedit Dataset.
https://github.com/ise-uiuc/blazedit
5k char version: vdaita/edit_5k_char
10k char version: vdaita/edit_10k_char
""" # noqa: E501
# 5k char version will have output as ~5k chars
# 10k char version will have output as ~10k chars
# Assuming 3 char per token, 10k chars will be 3333 tokens
# We set default to 4000 to be safe
DEFAULT_OUTPUT_LEN = 4000
SUPPORTED_DATASET_PATHS = {
"vdaita/edit_5k_char",
"vdaita/edit_10k_char",
}
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
request_id_prefix: str = "",
no_oversample: bool = False,
min_distance: float = 0.0,
max_distance: float = 1.0,
**kwargs,
) -> list:
output_len = (output_len
if output_len is not None else self.DEFAULT_OUTPUT_LEN)
sampled_requests = []
for i, item in enumerate(self.data):
if len(sampled_requests) >= num_requests:
break
code = item["code"]
change_request = item["change_request"]
norm_distance = item["norm_distance"]
# compare the levenshtein distance normalized by code length
if norm_distance < min_distance or norm_distance > max_distance:
continue
# template copied from
# https://github.com/ise-uiuc/blazedit/blob/7765137e656fd62de877422d2e4cf8de51228054/dataset/create_refined_dataset.py#L94-L105 # noqa: E501
instruction = f"""Given a code file, please apply the change requests and generate the new file.
Original file:
```python
{code}
```
Change request:
{change_request}
Please generate the new code file in the "New file" section below.""" # noqa: E501
# apply template
prompt = tokenizer.apply_chat_template(
[{
"role": "user",
"content": instruction
}],
add_generation_prompt=True,
tokenize=False,
)
prompt_len = len(tokenizer(prompt).input_ids)
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
request_id=request_id_prefix + str(i),
))
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# AIMO Dataset Implementation
# -----------------------------------------------------------------------------
class AIMODataset(HuggingFaceDataset):
"""
    Dataset class for processing an AIMO dataset with reasoning questions.
"""
SUPPORTED_DATASET_PATHS = {
"AI-MO/aimo-validation-aime", "AI-MO/NuminaMath-1.5",
"AI-MO/NuminaMath-CoT"
}
def sample(self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs) -> list:
sampled_requests = []
ind = 0
dynamic_output = output_len is None
for item in self.data:
if len(sampled_requests) >= num_requests:
break
prompt, completion = item['problem'], item["solution"]
prompt_ids = tokenizer(prompt).input_ids
completion_ids = tokenizer(completion).input_ids
prompt_len = len(prompt_ids)
completion_len = len(completion_ids)
output_len = completion_len if dynamic_output else output_len
assert isinstance(output_len, int) and output_len > 0
if dynamic_output and not is_valid_sequence(prompt_len,
completion_len,
max_prompt_len=2048,
max_total_len=32000):
continue
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
multi_modal_data=None,
request_id=request_id_prefix + str(ind),
))
ind += 1
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# Next Edit Prediction Dataset Implementation
# -----------------------------------------------------------------------------
zeta_prompt = """### Instruction:
You are a code completion assistant and your task is to analyze user edits and then rewrite an excerpt that the user provides, suggesting the appropriate edits within the excerpt, taking into account the cursor location.
### User Edits:
{}
### User Excerpt:
{}
### Response:
""" # noqa: E501
def _format_zeta_prompt(
sample: dict,
original_start_marker: str = "<|editable_region_start|>") -> dict:
"""Format the zeta prompt for the Next Edit Prediction (NEP) dataset.
This function formats examples from the NEP dataset
into prompts and expected outputs. It could be
further extended to support more NEP datasets.
Args:
sample: The dataset sample containing events,
inputs, and outputs.
original_start_marker: The marker indicating the
start of the editable region. Defaults to
"<|editable_region_start|>".
Returns:
A dictionary with the formatted prompts and expected outputs.
"""
events = sample["events"]
input = sample["input"]
output = sample["output"]
prompt = zeta_prompt.format(events, input)
# following the original implementation, extract the focused region
# from the raw output
output_start_index = output.find(original_start_marker)
output_focused_region = output[output_start_index:]
expected_output = output_focused_region
return {"prompt": prompt, "expected_output": expected_output}
class NextEditPredictionDataset(HuggingFaceDataset):
"""
Dataset class for processing a Next Edit Prediction dataset.
"""
SUPPORTED_DATASET_PATHS = {
"zed-industries/zeta",
}
MAPPING_PROMPT_FUNCS = {
"zed-industries/zeta": _format_zeta_prompt,
}
def sample(self, tokenizer: PreTrainedTokenizerBase, num_requests: int,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs):
formatting_prompt_func = self.MAPPING_PROMPT_FUNCS.get(self.hf_name)
if formatting_prompt_func is None:
raise ValueError(f"Unsupported dataset path: {self.hf_name}")
samples = []
for i, sample in enumerate(self.data):
sample = formatting_prompt_func(sample)
samples.append(
SampleRequest(
prompt=sample["prompt"],
prompt_len=len(tokenizer(sample["prompt"]).input_ids),
expected_output_len=len(
tokenizer(sample["expected_output"]).input_ids),
request_id=request_id_prefix + str(i),
))
if len(samples) >= num_requests:
break
self.maybe_oversample_requests(samples,
num_requests,
request_id_prefix,
no_oversample)
return samples
# -----------------------------------------------------------------------------
# ASR Dataset Implementation
# -----------------------------------------------------------------------------
class ASRDataset(HuggingFaceDataset):
"""
    Dataset class for processing an ASR dataset for transcription.
Tested on the following set:
+----------------+----------------------------------------+--------------------------+-----------------------------+
| Dataset | Domain | Speaking Style | hf-subset |
+----------------+----------------------------------------+--------------------------+-----------------------------+
| TED-LIUM | TED talks | Oratory | release1, release2, release3|
| | | | release3-speaker-adaptation |
| VoxPopuli | European Parliament | Oratory | en, de, it, fr, ... |
    | LibriSpeech    | Audiobook                              | Narrated                 | "clean" and "other"         |
| GigaSpeech | Audiobook, podcast, YouTube | Narrated, spontaneous | xs, s, m, l, xl, dev, test |
| SPGISpeech | Financial meetings | Oratory, spontaneous | S, M, L, dev, test |
| AMI | Meetings | Spontaneous | ihm, sdm |
+----------------+----------------------------------------+--------------------------+-----------------------------+
""" # noqa: E501
SUPPORTED_DATASET_PATHS = {
"openslr/librispeech_asr",
"facebook/voxpopuli",
"LIUM/tedlium",
"edinburghcstr/ami",
"speechcolab/gigaspeech",
"kensho/spgispeech",
}
DEFAULT_OUTPUT_LEN = 128
IS_MULTIMODAL = True
# TODO Whisper-specific. Abstract interface when more models are supported.
TRANSCRIPTION_PREAMBLE = (
"<|startoftranscript|><|en|><|transcribe|><|notimestamps|>")
skip_long_audios: bool = True
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list:
output_len = (output_len
if output_len is not None else self.DEFAULT_OUTPUT_LEN)
prompt = ASRDataset.TRANSCRIPTION_PREAMBLE
prompt_len = len(tokenizer(prompt).input_ids)
sampled_requests = []
ind = 0
skipped = 0
for item in self.data:
if len(sampled_requests) >= num_requests:
break
audio = item["audio"]
y, sr = audio["array"], audio["sampling_rate"]
duration_s = librosa.get_duration(y=y, sr=sr)
# Whisper max supported duration
if self.skip_long_audios and duration_s > 30:
skipped += 1
continue
mm_content = {"audio": (y, sr)}
sampled_requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
multi_modal_data=mm_content,
request_id=request_id_prefix + str(ind),
))
ind += 1
if skipped:
logger.warning(
"%d samples discarded from dataset due to"
" their length being greater than"
" what Whisper supports.",
skipped,
)
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# MLPerf Dataset Implementation
# -----------------------------------------------------------------------------
class MLPerfDataset(HuggingFaceDataset):
"""
MLPerf Inference Dataset.
Dataset on HF:
https://huggingface.co/datasets/mgoin/mlperf-inference-llama2-data
https://huggingface.co/datasets/mgoin/mlperf-inference-llama3.1-data
Each record contains:
- "system_prompt": system role instruction.
- "question": user question.
- "output": reference answer.
We combine the system prompt and question into a chat-formatted prompt
(using the tokenizer's chat template) and set the expected output length to
the tokenized length of the provided reference answer.
"""
SUPPORTED_DATASET_PATHS = {
"mgoin/mlperf-inference-llama2-data",
"mgoin/mlperf-inference-llama3.1-data",
}
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
output_len: Optional[int] = None,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list[SampleRequest]:
# Force dynamic output length based on reference completion.
dynamic_output = output_len is None
sampled_requests: list[SampleRequest] = []
ind = 0
for item in self.data:
if len(sampled_requests) >= num_requests:
break
system_prompt = item["system_prompt"]
question = item["question"]
reference_answer = item["output"]
# Build chat-style prompt using tokenizer template, if available.
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": question},
]
prompt_formatted = tokenizer.apply_chat_template(
messages, add_generation_prompt=True, tokenize=False
)
prompt_len = len(tokenizer(prompt_formatted).input_ids)
# Determine output length from reference answer tokens.
ref_out_len = len(
tokenizer(reference_answer, add_special_tokens=False).input_ids
)
expected_output_len = ref_out_len if dynamic_output else output_len
# Validate sequence lengths.
if not is_valid_sequence(prompt_len, expected_output_len):
continue
sampled_requests.append(
SampleRequest(
prompt=prompt_formatted,
prompt_len=prompt_len,
expected_output_len=expected_output_len,
request_id=request_id_prefix + str(ind),
)
)
ind += 1
self.maybe_oversample_requests(sampled_requests, num_requests,
request_id_prefix, no_oversample)
return sampled_requests
# -----------------------------------------------------------------------------
# Prefix Repetition Dataset Implementation
# -----------------------------------------------------------------------------
class PrefixRepetitionRandomDataset(BenchmarkDataset):
# Default values copied from benchmark_serving.py for the repeated prefix
# dataset.
DEFAULT_PREFIX_LEN = 256
DEFAULT_SUFFIX_LEN = 256
DEFAULT_NUM_PREFIXES = 10
DEFAULT_OUTPUT_LEN = 128
def __init__(
self,
**kwargs,
) -> None:
super().__init__(**kwargs)
random.seed(self.random_seed)
np.random.seed(self.random_seed)
def sample(
self,
tokenizer: PreTrainedTokenizerBase,
num_requests: int,
prefix_len: int = DEFAULT_PREFIX_LEN,
suffix_len: int = DEFAULT_SUFFIX_LEN,
num_prefixes: int = DEFAULT_NUM_PREFIXES,
output_len: int = DEFAULT_OUTPUT_LEN,
request_id_prefix: str = "",
no_oversample: bool = False,
**kwargs,
) -> list[SampleRequest]:
vocab_size = tokenizer.vocab_size
prompts_per_prefix = num_requests // num_prefixes
if prompts_per_prefix == 0:
raise ValueError(
f"num_requests ({num_requests}) must be greater than or equal "
f"to num_prefixes ({num_prefixes})"
)
def _generate_exact_length_tokens(target_length: int) -> list[int]:
"""Generate tokens that decode and re-encode to exactly
target_length."""
# Generate random tokens
tokens = np.random.randint(
0, vocab_size, size=target_length).tolist()
text = tokenizer.decode(tokens)
re_encoded = tokenizer.encode(text, add_special_tokens=False)
if len(re_encoded) == target_length:
return re_encoded
elif len(re_encoded) < target_length:
# Recursively generate additional consistent tokens
needed = target_length - len(re_encoded)
extra_tokens = _generate_exact_length_tokens(needed)
return re_encoded + extra_tokens
else:
# Truncate to target length
return re_encoded[:target_length]
requests = []
for _ in range(num_prefixes):
prefix_tokens = _generate_exact_length_tokens(prefix_len)
for _ in range(prompts_per_prefix):
suffix_tokens = _generate_exact_length_tokens(suffix_len)
combined_tokens = prefix_tokens + suffix_tokens
prompt = tokenizer.decode(combined_tokens)
prompt_len = len(combined_tokens)
requests.append(
SampleRequest(
prompt=prompt,
prompt_len=prompt_len,
expected_output_len=output_len,
)
)
random.shuffle(requests)
return requests
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
r"""Benchmark online serving throughput.
On the server side, run one of the following commands
to launch the vLLM OpenAI API server:
vllm serve <your_model> <engine arguments>
On the client side, run:
vllm bench serve \
--backend <backend or endpoint type. Default 'openai'> \
--label <benchmark result label. Default using backend> \
--model <your_model> \
--dataset-name <dataset_name. Default 'random'> \
--request-rate <request_rate. Default inf> \
--num-prompts <num_prompts. Default 1000>
"""
import argparse
import asyncio
import gc
import importlib.util
import json
import os
import random
import shutil
import time
import warnings
from collections.abc import AsyncGenerator, Iterable
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Literal, Optional
import aiohttp
import numpy as np
from tqdm.asyncio import tqdm
from transformers import PreTrainedTokenizerBase
from vllm.benchmarks.datasets import (SampleRequest, add_dataset_parser,
get_samples)
from vllm.benchmarks.lib.endpoint_request_func import (
ASYNC_REQUEST_FUNCS, OPENAI_COMPATIBLE_BACKENDS, RequestFuncInput,
RequestFuncOutput)
from vllm.benchmarks.lib.ready_checker import wait_for_endpoint
from vllm.benchmarks.lib.utils import (convert_to_pytorch_benchmark_format,
write_to_json)
from vllm.transformers_utils.tokenizer import get_tokenizer
MILLISECONDS_TO_SECONDS_CONVERSION = 1000
TERM_PLOTLIB_AVAILABLE = ((importlib.util.find_spec("termplotlib") is not None)
and (shutil.which("gnuplot") is not None))
# TODO: Remove this in v0.11.0
class DeprecatedEndpointTypeAction(argparse.Action):
"""Argparse action for the deprecated --endpoint-type flag.
"""
def __call__(self, _, namespace, values, option_string=None):
warnings.warn(
"'--endpoint-type' is deprecated and will be removed in v0.11.0. "
"Please use '--backend' instead or remove this argument if you "
"have already set it.",
stacklevel=1,
)
setattr(namespace, self.dest, values)
class TaskType(Enum):
GENERATION = "generation"
EMBEDDING = "embedding"
@dataclass
class BenchmarkMetrics:
completed: int
total_input: int
total_output: int
request_throughput: float
request_goodput: float
output_throughput: float
total_token_throughput: float
mean_ttft_ms: float
median_ttft_ms: float
std_ttft_ms: float
percentiles_ttft_ms: list[tuple[float, float]]
mean_tpot_ms: float
median_tpot_ms: float
std_tpot_ms: float
percentiles_tpot_ms: list[tuple[float, float]]
mean_itl_ms: float
median_itl_ms: float
std_itl_ms: float
percentiles_itl_ms: list[tuple[float, float]]
# E2EL stands for end-to-end latency per request.
# It is the time taken on the client side from sending
# a request to receiving a complete response.
mean_e2el_ms: float
median_e2el_ms: float
std_e2el_ms: float
percentiles_e2el_ms: list[tuple[float, float]]
# Max output tokens per second and concurrent requests at that peak
max_output_tokens_per_s: float
max_concurrent_requests: int
@dataclass
class EmbedBenchmarkMetrics:
completed: int
total_input: int
request_throughput: float
total_token_throughput: float
mean_e2el_ms: float
std_e2el_ms: float
median_e2el_ms: float
    percentiles_e2el_ms: list[tuple[float, float]]
def _get_current_request_rate(
ramp_up_strategy: Optional[Literal["linear", "exponential"]],
ramp_up_start_rps: Optional[int],
ramp_up_end_rps: Optional[int],
request_index: int,
total_requests: int,
request_rate: float,
) -> float:
if (ramp_up_strategy and ramp_up_start_rps is not None
and ramp_up_end_rps is not None):
progress = request_index / max(total_requests - 1, 1)
if ramp_up_strategy == "linear":
increase = (ramp_up_end_rps - ramp_up_start_rps) * progress
return ramp_up_start_rps + increase
elif ramp_up_strategy == "exponential":
ratio = ramp_up_end_rps / ramp_up_start_rps
return ramp_up_start_rps * (ratio**progress)
else:
raise ValueError(f"Unknown ramp-up strategy: {ramp_up_strategy}")
return request_rate
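# Worked example (illustrative numbers): ramping from 1 to 5 req/s, halfway
# through the run the "linear" strategy yields 1 + (5 - 1) * 0.5 = 3 req/s,
# while the "exponential" strategy yields 1 * (5 / 1) ** 0.5 ~= 2.24 req/s.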
async def get_request(
input_requests: list[SampleRequest],
request_rate: float,
burstiness: float = 1.0,
ramp_up_strategy: Optional[Literal["linear", "exponential"]] = None,
ramp_up_start_rps: Optional[int] = None,
ramp_up_end_rps: Optional[int] = None,
) -> AsyncGenerator[tuple[SampleRequest, float], None]:
"""
Asynchronously generates requests at a specified rate
with OPTIONAL burstiness and OPTIONAL ramp-up strategy.
Args:
input_requests:
A list of input requests, each represented as a SampleRequest.
request_rate:
The rate at which requests are generated (requests/s).
burstiness (optional):
The burstiness factor of the request generation.
Only takes effect when request_rate is not inf.
Default value is 1, which follows a Poisson process.
Otherwise, the request intervals follow a gamma distribution.
A lower burstiness value (0 < burstiness < 1) results
in more bursty requests, while a higher burstiness value
(burstiness > 1) results in a more uniform arrival of requests.
ramp_up_strategy (optional):
The ramp-up strategy. Can be "linear" or "exponential".
If None, uses constant request rate (specified by request_rate).
ramp_up_start_rps (optional):
The starting request rate for ramp-up.
ramp_up_end_rps (optional):
The ending request rate for ramp-up.
"""
assert burstiness > 0, (
f"A positive burstiness factor is expected, but given {burstiness}.")
# Convert to list to get length for ramp-up calculations
if isinstance(input_requests,
Iterable) and not isinstance(input_requests, list):
input_requests = list(input_requests)
total_requests = len(input_requests)
assert total_requests > 0, "No requests provided."
    # Precompute delays between requests to minimize request-send lag
request_rates = []
delay_ts = []
for request_index, request in enumerate(input_requests):
current_request_rate = _get_current_request_rate(
ramp_up_strategy, ramp_up_start_rps, ramp_up_end_rps,
request_index, total_requests, request_rate)
request_rates.append(current_request_rate)
if current_request_rate == float("inf"):
delay_ts.append(0)
else:
theta = 1.0 / (current_request_rate * burstiness)
# Sample the request interval from the gamma distribution.
# If burstiness is 1, it follows exponential distribution.
delay_ts.append(np.random.gamma(shape=burstiness, scale=theta))
# Calculate the cumulative delay time from the first sent out requests.
for i in range(1, len(delay_ts)):
delay_ts[i] += delay_ts[i - 1]
if ramp_up_strategy is None and delay_ts[-1] != 0:
        # When ramp_up_strategy is not set, the request rate is assumed to be
        # fixed and all requests should be sent within target_total_delay_s.
        # The following logic rescales the delay times so that the final
        # delay_ts aligns with target_total_delay_s.
#
# NOTE: If we simply accumulate the random delta values
# from the gamma distribution, their sum would have 1-2% gap
# from target_total_delay_s. The purpose of the following logic is to
# close the gap for stabilizing the throughput data
# from different random seeds.
target_total_delay_s = total_requests / request_rate
normalize_factor = target_total_delay_s / delay_ts[-1]
delay_ts = [delay * normalize_factor for delay in delay_ts]
start_ts = time.time()
for request_index, request in enumerate(input_requests):
if delay_ts[request_index] > 0:
current_ts = time.time()
sleep_interval_s = start_ts + delay_ts[request_index] - current_ts
if sleep_interval_s > 0:
await asyncio.sleep(sleep_interval_s)
yield request, request_rates[request_index]
def calculate_metrics_for_embeddings(
outputs: list[RequestFuncOutput], dur_s: float,
selected_percentiles: list[float]) -> EmbedBenchmarkMetrics:
"""Calculate the metrics for the embedding requests.
Args:
outputs: The outputs of the requests.
dur_s: The duration of the benchmark.
selected_percentiles: The percentiles to select.
Returns:
The calculated benchmark metrics.
"""
total_input = 0
completed = 0
e2els: list[float] = []
for i in range(len(outputs)):
if outputs[i].success:
e2els.append(outputs[i].latency)
completed += 1
total_input += outputs[i].prompt_len
if completed == 0:
warnings.warn(
"All requests failed. This is likely due to a misconfiguration "
"on the benchmark arguments.",
stacklevel=2)
metrics = EmbedBenchmarkMetrics(
completed=completed,
total_input=total_input,
request_throughput=completed / dur_s,
total_token_throughput=total_input / dur_s,
mean_e2el_ms=np.mean(e2els or 0) * 1000,
std_e2el_ms=np.std(e2els or 0) * 1000,
median_e2el_ms=np.median(e2els or 0) * 1000,
percentiles_e2el_ms=[(p, np.percentile(e2els or 0, p) * 1000)
for p in selected_percentiles],
)
return metrics
def calculate_metrics(
input_requests: list[SampleRequest],
outputs: list[RequestFuncOutput],
dur_s: float,
tokenizer: PreTrainedTokenizerBase,
selected_percentiles: list[float],
goodput_config_dict: dict[str, float],
) -> tuple[BenchmarkMetrics, list[int]]:
"""Calculate the metrics for the benchmark.
Args:
input_requests: The input requests.
outputs: The outputs of the requests.
dur_s: The duration of the benchmark.
tokenizer: The tokenizer to use.
selected_percentiles: The percentiles to select.
goodput_config_dict: The goodput configuration.
Returns:
A tuple of the benchmark metrics and the actual output lengths.
"""
actual_output_lens: list[int] = []
total_input = 0
completed = 0
good_completed = 0
itls: list[float] = []
tpots: list[float] = []
all_tpots: list[float] = []
ttfts: list[float] = []
e2els: list[float] = []
for i in range(len(outputs)):
if outputs[i].success:
output_len = outputs[i].output_tokens
if not output_len:
# We use the tokenizer to count the number of output tokens
# for some serving backends instead of looking at
# len(outputs[i].itl) since multiple output tokens may be
# bundled together
# Note : this may inflate the output token count slightly
output_len = len(
tokenizer(outputs[i].generated_text,
add_special_tokens=False).input_ids)
actual_output_lens.append(output_len)
total_input += input_requests[i].prompt_len
tpot = 0
if output_len > 1:
latency_minus_ttft = outputs[i].latency - outputs[i].ttft
tpot = latency_minus_ttft / (output_len - 1)
tpots.append(tpot)
# Note: if output_len <= 1, we regard tpot as 0 for goodput
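            # Worked example: latency 2.0 s, ttft 0.5 s and 31 output tokens
            # give tpot = (2.0 - 0.5) / 30 = 0.05 s per output token.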
all_tpots.append(tpot)
itls += outputs[i].itl
ttfts.append(outputs[i].ttft)
e2els.append(outputs[i].latency)
completed += 1
else:
actual_output_lens.append(0)
if goodput_config_dict:
valid_metrics = []
slo_values = []
if "ttft" in goodput_config_dict:
valid_metrics.append(ttfts)
slo_values.append(goodput_config_dict["ttft"] /
MILLISECONDS_TO_SECONDS_CONVERSION)
if "tpot" in goodput_config_dict:
valid_metrics.append(all_tpots)
slo_values.append(goodput_config_dict["tpot"] /
MILLISECONDS_TO_SECONDS_CONVERSION)
if "e2el" in goodput_config_dict:
valid_metrics.append(e2els)
slo_values.append(goodput_config_dict["e2el"] /
MILLISECONDS_TO_SECONDS_CONVERSION)
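        # For example, goodput_config_dict = {"ttft": 500, "tpot": 50} (values
        # in ms) counts a request as "good" only when its measured TTFT is at
        # most 0.5 s and its TPOT is at most 0.05 s.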
for req_metric in zip(*valid_metrics):
is_good_req = all([s >= r for s, r in zip(slo_values, req_metric)])
if is_good_req:
good_completed += 1
if completed == 0:
warnings.warn(
"All requests failed. This is likely due to a misconfiguration "
"on the benchmark arguments.",
stacklevel=2)
# Calculate max output tokens per second metric
max_output_tokens_per_s = 0.0
max_concurrent_requests = 0
# Find the time range across all successful requests
successful_outputs = [output for output in outputs if output.success]
if successful_outputs:
min_start_time = min(output.start_time
for output in successful_outputs)
max_end_time = max(output.start_time + output.latency
for output in successful_outputs)
# Create second buckets (ceiling to ensure we capture all time)
duration_seconds = int(np.ceil(max_end_time - min_start_time)) + 1
tokens_per_second = np.zeros(duration_seconds)
concurrent_requests_per_second = np.zeros(duration_seconds)
for i, output in enumerate(successful_outputs):
# Calculate token generation timestamp using
# start_time, ttft, and itl
token_times = [output.start_time + output.ttft]
current_time = token_times[0]
for itl_value in output.itl:
current_time += itl_value
token_times.append(current_time)
# Add tokens to second buckets
for token_time in token_times:
second_bucket = int(token_time - min_start_time)
if 0 <= second_bucket < duration_seconds:
tokens_per_second[second_bucket] += 1
# Track concurrent requests for each second this request was active
request_start_second = int(output.start_time - min_start_time)
request_end_second = int((output.start_time + output.latency) -
min_start_time)
for second in range(request_start_second, request_end_second + 1):
concurrent_requests_per_second[second] += 1
# Find the maximum tokens per second and corresponding
# concurrent requests
if len(tokens_per_second) > 0:
max_output_tokens_per_s = float(np.max(tokens_per_second))
max_concurrent_requests = int(
np.max(concurrent_requests_per_second))
if TERM_PLOTLIB_AVAILABLE:
import termplotlib as tpl
fig = tpl.figure()
fig.plot(np.arange(len(tokens_per_second)),
tokens_per_second,
title="Output tokens per second")
fig.plot(np.arange(len(concurrent_requests_per_second)),
concurrent_requests_per_second,
title="Concurrent requests per second")
fig.show()
else:
print("tip: install termplotlib and gnuplot to plot the metrics")
metrics = BenchmarkMetrics(
completed=completed,
total_input=total_input,
total_output=sum(actual_output_lens),
request_throughput=completed / dur_s,
request_goodput=good_completed / dur_s,
output_throughput=sum(actual_output_lens) / dur_s,
total_token_throughput=(total_input + sum(actual_output_lens)) / dur_s,
mean_ttft_ms=np.mean(ttfts or 0) *
1000, # ttfts is empty if streaming is not supported by the endpoint
std_ttft_ms=np.std(ttfts or 0) * 1000,
median_ttft_ms=np.median(ttfts or 0) * 1000,
percentiles_ttft_ms=[(p, np.percentile(ttfts or 0, p) * 1000)
for p in selected_percentiles],
mean_tpot_ms=np.mean(tpots or 0) * 1000,
std_tpot_ms=np.std(tpots or 0) * 1000,
median_tpot_ms=np.median(tpots or 0) * 1000,
percentiles_tpot_ms=[(p, np.percentile(tpots or 0, p) * 1000)
for p in selected_percentiles],
mean_itl_ms=np.mean(itls or 0) * 1000,
std_itl_ms=np.std(itls or 0) * 1000,
median_itl_ms=np.median(itls or 0) * 1000,
percentiles_itl_ms=[(p, np.percentile(itls or 0, p) * 1000)
for p in selected_percentiles],
mean_e2el_ms=np.mean(e2els or 0) * 1000,
std_e2el_ms=np.std(e2els or 0) * 1000,
median_e2el_ms=np.median(e2els or 0) * 1000,
percentiles_e2el_ms=[(p, np.percentile(e2els or 0, p) * 1000)
for p in selected_percentiles],
max_output_tokens_per_s=max_output_tokens_per_s,
max_concurrent_requests=max_concurrent_requests,
)
return metrics, actual_output_lens
async def benchmark(
endpoint_type: str,
api_url: str,
base_url: str,
model_id: str,
model_name: str,
tokenizer: PreTrainedTokenizerBase,
input_requests: list[SampleRequest],
logprobs: Optional[int],
request_rate: float,
burstiness: float,
disable_tqdm: bool,
profile: bool,
selected_percentile_metrics: list[str],
selected_percentiles: list[float],
ignore_eos: bool,
goodput_config_dict: dict[str, float],
max_concurrency: Optional[int],
lora_modules: Optional[Iterable[str]],
extra_headers: Optional[dict],
extra_body: Optional[dict],
ramp_up_strategy: Optional[Literal["linear", "exponential"]] = None,
ramp_up_start_rps: Optional[int] = None,
ramp_up_end_rps: Optional[int] = None,
ready_check_timeout_sec: int = 600,
):
task_type = (TaskType.EMBEDDING if api_url.endswith("/v1/embeddings") else
TaskType.GENERATION)
if endpoint_type in ASYNC_REQUEST_FUNCS:
if task_type == TaskType.EMBEDDING:
request_func = ASYNC_REQUEST_FUNCS["openai-embeddings"]
else:
request_func = ASYNC_REQUEST_FUNCS[endpoint_type]
else:
raise ValueError(f"Unknown backend: {endpoint_type}")
# Reuses connections across requests to reduce TLS handshake overhead.
connector = aiohttp.TCPConnector(
limit=max_concurrency or 0,
limit_per_host=max_concurrency or 0,
ttl_dns_cache=300,
use_dns_cache=True,
keepalive_timeout=60,
enable_cleanup_closed=True,
force_close=False,
ssl=("https://" in api_url),
)
session = aiohttp.ClientSession(
connector=connector,
trust_env=True,
timeout=aiohttp.ClientTimeout(total=6 * 60 * 60),
)
print("Starting initial single prompt test run...")
test_prompt, test_prompt_len, test_output_len, test_mm_content = (
input_requests[0].prompt,
input_requests[0].prompt_len,
input_requests[0].expected_output_len,
input_requests[0].multi_modal_data,
)
assert (test_mm_content is None or isinstance(test_mm_content, dict)
or (isinstance(test_mm_content, list)
and all(isinstance(item, dict) for item in test_mm_content))
), "multi_modal_data must be a dict or list[dict]"
test_input = RequestFuncInput(
model=model_id,
model_name=model_name,
prompt=test_prompt,
api_url=api_url,
prompt_len=test_prompt_len,
output_len=test_output_len,
logprobs=logprobs,
multi_modal_content=test_mm_content,
ignore_eos=ignore_eos,
extra_headers=extra_headers,
extra_body=extra_body,
)
if ready_check_timeout_sec > 0:
test_output = await wait_for_endpoint(
request_func,
test_input,
session,
timeout_seconds=ready_check_timeout_sec,
)
if not test_output.success:
raise ValueError(
"Initial test run failed - Please make sure benchmark "
"arguments are correctly specified. "
f"Error: {test_output.error}")
else:
print("Initial test run completed. Starting main benchmark run...")
else:
print("Skipping endpoint ready check.")
if lora_modules:
# For each input request, choose a LoRA module at random.
lora_modules = iter(
[random.choice(lora_modules) for _ in range(len(input_requests))])
if profile:
print("Starting profiler...")
profile_input = RequestFuncInput(model=model_id,
model_name=model_name,
prompt=test_prompt,
api_url=base_url + "/start_profile",
prompt_len=test_prompt_len,
output_len=test_output_len,
logprobs=logprobs,
multi_modal_content=test_mm_content,
ignore_eos=ignore_eos,
extra_headers=extra_headers,
extra_body=extra_body)
profile_output = await request_func(request_func_input=profile_input,
session=session)
if profile_output.success:
print("Profiler started")
distribution = ("Poisson process"
if burstiness == 1.0 else "Gamma distribution")
if ramp_up_strategy is not None:
print(f"Traffic ramp-up strategy: {ramp_up_strategy}.")
print(f"Will increase RPS from {ramp_up_start_rps} to "
f"{ramp_up_end_rps} RPS over the duration of the benchmark.")
else:
print(f"Traffic request rate: {request_rate}")
print(f"Burstiness factor: {burstiness} ({distribution})")
print(f"Maximum request concurrency: {max_concurrency}")
pbar = None if disable_tqdm else tqdm(total=len(input_requests))
# This can be used once the minimum Python version is 3.10 or higher,
# and it will simplify the code in limited_request_func.
# semaphore = (asyncio.Semaphore(max_concurrency)
# if max_concurrency else contextlib.nullcontext())
semaphore = (asyncio.Semaphore(max_concurrency)
if max_concurrency else None)
async def limited_request_func(request_func_input, session, pbar):
if semaphore is None:
return await request_func(request_func_input=request_func_input,
session=session,
pbar=pbar)
async with semaphore:
return await request_func(request_func_input=request_func_input,
session=session,
pbar=pbar)
benchmark_start_time = time.perf_counter()
tasks: list[asyncio.Task] = []
rps_change_events = []
last_int_rps = -1
if ramp_up_strategy is not None and ramp_up_start_rps is not None:
last_int_rps = ramp_up_start_rps
rps_change_events.append({
"rps": last_int_rps,
"timestamp": datetime.now().isoformat(),
})
async for request, current_request_rate in get_request(
input_requests, request_rate, burstiness, ramp_up_strategy,
ramp_up_start_rps, ramp_up_end_rps):
if ramp_up_strategy is not None:
current_int_rps = int(current_request_rate)
if current_int_rps > last_int_rps:
timestamp = datetime.now().isoformat()
for rps_val in range(last_int_rps + 1, current_int_rps + 1):
rps_change_events.append({
"rps": rps_val,
"timestamp": timestamp
})
last_int_rps = current_int_rps
prompt, prompt_len, output_len, mm_content, request_id = (
request.prompt,
request.prompt_len,
request.expected_output_len,
request.multi_modal_data,
request.request_id,
)
req_model_id, req_model_name = model_id, model_name
if lora_modules:
req_lora_module = next(lora_modules)
req_model_id, req_model_name = req_lora_module, req_lora_module
request_func_input = RequestFuncInput(
model=req_model_id,
model_name=req_model_name,
prompt=prompt,
api_url=api_url,
prompt_len=prompt_len,
output_len=output_len,
logprobs=logprobs,
multi_modal_content=mm_content,
ignore_eos=ignore_eos,
extra_headers=extra_headers,
extra_body=extra_body,
request_id=request_id,
)
tasks.append(
asyncio.create_task(
limited_request_func(request_func_input=request_func_input,
session=session,
pbar=pbar)))
outputs: list[RequestFuncOutput] = await asyncio.gather(*tasks)
if pbar is not None:
pbar.close()
benchmark_duration = time.perf_counter() - benchmark_start_time
if task_type == TaskType.GENERATION:
metrics, actual_output_lens = calculate_metrics(
input_requests=input_requests,
outputs=outputs,
dur_s=benchmark_duration,
tokenizer=tokenizer,
selected_percentiles=selected_percentiles,
goodput_config_dict=goodput_config_dict,
)
else:
metrics = calculate_metrics_for_embeddings(
outputs=outputs,
dur_s=benchmark_duration,
selected_percentiles=selected_percentiles,
)
actual_output_lens = 0
print("{s:{c}^{n}}".format(s=' Serving Benchmark Result ', n=50, c='='))
print("{:<40} {:<10}".format("Successful requests:", metrics.completed))
if max_concurrency is not None:
print("{:<40} {:<10}".format("Maximum request concurrency:",
max_concurrency))
if request_rate != float('inf'):
print("{:<40} {:<10.2f}".format("Request rate configured (RPS):",
request_rate))
print("{:<40} {:<10.2f}".format("Benchmark duration (s):",
benchmark_duration))
print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input))
if isinstance(metrics, BenchmarkMetrics):
print("{:<40} {:<10}".format("Total generated tokens:",
metrics.total_output))
print("{:<40} {:<10.2f}".format("Request throughput (req/s):",
metrics.request_throughput))
if goodput_config_dict:
print("{:<40} {:<10.2f}".format("Request goodput (req/s):",
metrics.request_goodput))
if isinstance(metrics, BenchmarkMetrics):
print("{:<40} {:<10.2f}".format("Output token throughput (tok/s):",
metrics.output_throughput))
print("{:<40} {:<10.2f}".format(
"Peak output token throughput (tok/s):",
metrics.max_output_tokens_per_s))
print("{:<40} {:<10.2f}".format("Peak concurrent requests:",
metrics.max_concurrent_requests))
print("{:<40} {:<10.2f}".format("Total Token throughput (tok/s):",
metrics.total_token_throughput))
if isinstance(metrics, BenchmarkMetrics):
result = {
"duration": benchmark_duration,
"completed": metrics.completed,
"total_input_tokens": metrics.total_input,
"total_output_tokens": metrics.total_output,
"request_throughput": metrics.request_throughput,
"request_goodput":
metrics.request_goodput if goodput_config_dict else None,
"output_throughput": metrics.output_throughput,
"total_token_throughput": metrics.total_token_throughput,
"input_lens": [output.prompt_len for output in outputs],
"output_lens": actual_output_lens,
"ttfts": [output.ttft for output in outputs],
"itls": [output.itl for output in outputs],
"generated_texts": [output.generated_text for output in outputs],
"errors": [output.error for output in outputs],
"max_output_tokens_per_s": metrics.max_output_tokens_per_s,
"max_concurrent_requests": metrics.max_concurrent_requests,
}
else:
result = {
"duration": benchmark_duration,
"completed": metrics.completed,
"total_input_tokens": metrics.total_input,
"request_throughput": metrics.request_throughput,
"total_token_throughput": metrics.total_token_throughput,
"input_lens": [output.prompt_len for output in outputs],
"errors": [output.error for output in outputs],
}
if rps_change_events:
result["rps_change_events"] = rps_change_events
def process_one_metric(
# E.g., "ttft"
metric_attribute_name: str,
# E.g., "TTFT"
metric_name: str,
# E.g., "Time to First Token"
metric_header: str,
):
# This function prints and adds statistics of the specified
# metric.
if metric_attribute_name not in selected_percentile_metrics:
return
print("{s:{c}^{n}}".format(s=metric_header, n=50, c='-'))
print("{:<40} {:<10.2f}".format(
f"Mean {metric_name} (ms):",
getattr(metrics, f"mean_{metric_attribute_name}_ms")))
print("{:<40} {:<10.2f}".format(
f"Median {metric_name} (ms):",
getattr(metrics, f"median_{metric_attribute_name}_ms")))
result[f"mean_{metric_attribute_name}_ms"] = getattr(
metrics, f"mean_{metric_attribute_name}_ms")
result[f"median_{metric_attribute_name}_ms"] = getattr(
metrics, f"median_{metric_attribute_name}_ms")
result[f"std_{metric_attribute_name}_ms"] = getattr(
metrics, f"std_{metric_attribute_name}_ms")
for p, value in getattr(metrics,
f"percentiles_{metric_attribute_name}_ms"):
p_word = str(int(p)) if int(p) == p else str(p)
print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):",
value))
result[f"p{p_word}_{metric_attribute_name}_ms"] = value
if task_type == TaskType.GENERATION:
process_one_metric("ttft", "TTFT", "Time to First Token")
process_one_metric("tpot", "TPOT",
"Time per Output Token (excl. 1st token)")
process_one_metric("itl", "ITL", "Inter-token Latency")
process_one_metric("e2el", "E2EL", "End-to-end Latency")
print("=" * 50)
if profile:
print("Stopping profiler...")
profile_input = RequestFuncInput(
model=model_id,
prompt=test_prompt,
api_url=base_url + "/stop_profile",
prompt_len=test_prompt_len,
output_len=test_output_len,
logprobs=logprobs,
)
profile_output = await request_func(request_func_input=profile_input,
session=session)
if profile_output.success:
print("Profiler stopped")
await session.close()
return result
def check_goodput_args(args):
# Check and parse goodput arguments
goodput_config_dict = {}
VALID_NAMES = ["ttft", "tpot", "e2el"]
if args.goodput:
goodput_config_dict = parse_goodput(args.goodput)
for slo_name, slo_val in goodput_config_dict.items():
if slo_name not in VALID_NAMES:
raise ValueError(
f"Invalid metric name found, {slo_name}: {slo_val}. "
"The service level objective name should be one of "
f"{str(VALID_NAMES)}. ")
if slo_val < 0:
raise ValueError(
f"Invalid value found, {slo_name}: {slo_val}. "
"The service level objective value should be "
"non-negative.")
return goodput_config_dict
def parse_goodput(slo_pairs):
goodput_config_dict = {}
try:
for slo_pair in slo_pairs:
slo_name, slo_val = slo_pair.split(":")
goodput_config_dict[slo_name] = float(slo_val)
except ValueError as err:
raise argparse.ArgumentTypeError(
"Invalid format found for service level objectives. "
"Specify service level objectives for goodput as \"KEY:VALUE\" "
"pairs, where the key is a metric name, and the value is a "
"number in milliseconds.") from err
return goodput_config_dict
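# Example (hypothetical CLI values): --goodput ttft:500 tpot:50 is parsed into
# {"ttft": 500.0, "tpot": 50.0}, with every value interpreted as milliseconds.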
def save_to_pytorch_benchmark_format(args: argparse.Namespace,
results: dict[str, Any],
file_name: str) -> None:
metrics = [
"median_ttft_ms", "mean_ttft_ms", "std_ttft_ms", "p99_ttft_ms",
"mean_tpot_ms", "median_tpot_ms", "std_tpot_ms", "p99_tpot_ms",
"median_itl_ms", "mean_itl_ms", "std_itl_ms", "p99_itl_ms"
]
# The raw per-request data might be useful, but it is rather large; it can be
# added later if needed.
ignored_metrics = ["ttfts", "itls", "generated_texts", "errors"]
pt_records = convert_to_pytorch_benchmark_format(
args=args,
metrics={k: [results[k]]
for k in metrics if k in results},
extra_info={
k: results[k]
for k in results if k not in metrics and k not in ignored_metrics
})
if pt_records:
# Don't use json suffix here as we don't want CI to pick it up
pt_file = f"{os.path.splitext(file_name)[0]}.pytorch.json"
write_to_json(pt_file, pt_records)
def add_cli_args(parser: argparse.ArgumentParser):
add_dataset_parser(parser)
parser.add_argument(
"--label",
type=str,
default=None,
help="The label (prefix) of the benchmark results. If not specified, "
"the value of '--backend' will be used as the label.",
)
parser.add_argument(
"--backend",
type=str,
default="openai",
choices=list(ASYNC_REQUEST_FUNCS.keys()),
help="The type of backend or endpoint to use for the benchmark."
)
parser.add_argument(
"--endpoint-type",
type=str,
default=None,
choices=list(ASYNC_REQUEST_FUNCS.keys()),
action=DeprecatedEndpointTypeAction,
help="'--endpoint-type' is deprecated and will be removed in v0.11.0. "
"Please use '--backend' instead.",
)
parser.add_argument(
"--base-url",
type=str,
default=None,
help="Server or API base url if not using http host and port.",
)
# Use 127.0.0.1 here instead of localhost to force the use of ipv4
parser.add_argument("--host", type=str, default="127.0.0.1")
parser.add_argument("--port", type=int, default=8000)
parser.add_argument(
"--endpoint",
type=str,
default="/v1/completions",
help="API endpoint.",
)
parser.add_argument(
"--header",
metavar="KEY=VALUE",
nargs="*",
help="Key-value pairs (e.g, --header x-additional-info=0.3.3) "
"for headers to be passed with each request. These headers override " \
"per backend constants and values set via environment variable, and " \
"will be overriden by other arguments (such as request ids)."
)
parser.add_argument(
"--max-concurrency",
type=int,
default=None,
help="Maximum number of concurrent requests. This can be used "
"to help simulate an environment where a higher level component "
"is enforcing a maximum number of concurrent requests. While the "
"--request-rate argument controls the rate at which requests are "
"initiated, this argument will control how many are actually allowed "
"to execute at a time. This means that when used in combination, the "
"actual request rate may be lower than specified with --request-rate, "
"if the server is not processing requests fast enough to keep up.",
)
parser.add_argument(
"--model",
type=str,
required=True,
help="Name of the model.",
)
parser.add_argument(
"--tokenizer",
type=str,
help=
"Name or path of the tokenizer, if not using the default tokenizer.", # noqa: E501
)
parser.add_argument("--use-beam-search", action="store_true")
parser.add_argument(
"--logprobs",
type=int,
default=None,
help=("Number of logprobs-per-token to compute & return as part of "
"the request. If unspecified, then either (1) if beam search "
"is disabled, no logprobs are computed & a single dummy "
"logprob is returned for each token; or (2) if beam search "
"is enabled 1 logprob per token is computed"),
)
parser.add_argument(
"--request-rate",
type=float,
default=float("inf"),
help="Number of requests per second. If this is inf, "
"then all the requests are sent at time 0. "
"Otherwise, we use Poisson process or gamma distribution "
"to synthesize the request arrival times.",
)
parser.add_argument(
"--burstiness",
type=float,
default=1.0,
help="Burstiness factor of the request generation. "
"Only take effect when request_rate is not inf. "
"Default value is 1, which follows Poisson process. "
"Otherwise, the request intervals follow a gamma distribution. "
"A lower burstiness value (0 < burstiness < 1) results in more "
"bursty requests. A higher burstiness value (burstiness > 1) "
"results in a more uniform arrival of requests.",
)
parser.add_argument(
"--trust-remote-code",
action="store_true",
help="Trust remote code from huggingface",
)
parser.add_argument(
"--disable-tqdm",
action="store_true",
help="Specify to disable tqdm progress bar.",
)
parser.add_argument(
"--profile",
action="store_true",
help="Use Torch Profiler. The endpoint must be launched with "
"VLLM_TORCH_PROFILER_DIR to enable profiler.",
)
parser.add_argument(
"--save-result",
action="store_true",
help="Specify to save benchmark results to a json file",
)
parser.add_argument(
"--save-detailed",
action="store_true",
help="When saving the results, whether to include per request "
"information such as response, error, ttfs, tpots, etc.",
)
parser.add_argument(
"--append-result",
action="store_true",
help="Append the benchmark result to the existing json file.",
)
parser.add_argument(
"--metadata",
metavar="KEY=VALUE",
nargs="*",
help="Key-value pairs (e.g, --metadata version=0.3.3 tp=1) "
"for metadata of this run to be saved in the result JSON file "
"for record keeping purposes.",
)
parser.add_argument(
"--result-dir",
type=str,
default=None,
help="Specify directory to save benchmark json results."
"If not specified, results are saved in the current directory.",
)
parser.add_argument(
"--result-filename",
type=str,
default=None,
help="Specify the filename to save benchmark json results."
"If not specified, results will be saved in "
"{label}-{args.request_rate}qps-{base_model_id}-{current_dt}.json" # noqa
" format.",
)
parser.add_argument(
"--ignore-eos",
action="store_true",
help="Set ignore_eos flag when sending the benchmark request."
"Warning: ignore_eos is not supported in deepspeed_mii and tgi.")
parser.add_argument(
"--percentile-metrics",
type=str,
default="ttft,tpot,itl",
help="Comma-separated list of selected metrics to report percentils. "
"This argument specifies the metrics to report percentiles. "
"Allowed metric names are \"ttft\", \"tpot\", \"itl\", \"e2el\". ")
parser.add_argument(
"--metric-percentiles",
type=str,
default="99",
help="Comma-separated list of percentiles for selected metrics. "
"To report 25-th, 50-th, and 75-th percentiles, use \"25,50,75\". "
"Default value is \"99\"."
"Use \"--percentile-metrics\" to select metrics.",
)
parser.add_argument(
"--goodput",
nargs="+",
required=False,
help="Specify service level objectives for goodput as \"KEY:VALUE\" "
"pairs, where the key is a metric name, and the value is in "
"milliseconds. Multiple \"KEY:VALUE\" pairs can be provided, "
"separated by spaces. Allowed request level metric names are "
"\"ttft\", \"tpot\", \"e2el\". For more context on the definition of "
"goodput, refer to DistServe paper: https://arxiv.org/pdf/2401.09670 "
"and the blog: https://hao-ai-lab.github.io/blogs/distserve",
)
parser.add_argument(
"--request-id-prefix",
type=str,
required=False,
default="benchmark-serving",
help="Specify the prefix of request id.",
)
sampling_group = parser.add_argument_group("sampling parameters")
sampling_group.add_argument(
"--top-p",
type=float,
default=None,
help="Top-p sampling parameter. Only has effect on "
"openai-compatible backends.",
)
sampling_group.add_argument(
"--top-k",
type=int,
default=None,
help="Top-k sampling parameter. Only has effect on "
"openai-compatible backends.",
)
sampling_group.add_argument(
"--min-p",
type=float,
default=None,
help="Min-p sampling parameter. Only has effect on "
"openai-compatible backends.",
)
sampling_group.add_argument(
"--temperature",
type=float,
default=None,
help="Temperature sampling parameter. Only has effect on "
"openai-compatible backends. If not specified, default to greedy "
"decoding (i.e. temperature==0.0).",
)
parser.add_argument(
'--tokenizer-mode',
type=str,
default="auto",
choices=['auto', 'slow', 'mistral', 'custom'],
help='The tokenizer mode.\n\n* "auto" will use the '
'fast tokenizer if available.\n* "slow" will '
'always use the slow tokenizer. \n* '
'"mistral" will always use the `mistral_common` tokenizer. \n*'
'"custom" will use --tokenizer to select the preregistered tokenizer.')
parser.add_argument("--served-model-name",
type=str,
default=None,
help="The model name used in the API. "
"If not specified, the model name will be the "
"same as the ``--model`` argument. ")
parser.add_argument("--lora-modules",
nargs='+',
default=None,
help="A subset of LoRA module names passed in when "
"launching the server. For each request, the "
"script chooses a LoRA module at random.")
parser.add_argument(
"--ramp-up-strategy",
type=str,
default=None,
choices=["linear", "exponential"],
help="The ramp-up strategy. This would be used to "
"ramp up the request rate from initial RPS to final "
"RPS rate (specified by --ramp-up-start-rps and "
"--ramp-up-end-rps.) over the duration of the benchmark.")
parser.add_argument(
"--ramp-up-start-rps",
type=int,
default=None,
help="The starting request rate for ramp-up (RPS). "
"Needs to be specified when --ramp-up-strategy is used.",
)
parser.add_argument(
"--ramp-up-end-rps",
type=int,
default=None,
help="The ending request rate for ramp-up (RPS). "
"Needs to be specified when --ramp-up-strategy is used.",
)
parser.add_argument(
"--ready-check-timeout-sec",
type=int,
default=600,
help="Maximum time to wait for the endpoint to become ready "
"in seconds (default: 600 seconds / 10 minutes). If set to 0, "
"the ready check will be skipped."
)
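# Illustrative invocation (the script name and the dataset flags added by
# add_dataset_parser are assumptions here):
#   python benchmark_serving.py \
#       --backend openai \
#       --model qwen3-8B \
#       --base-url http://0.0.0.0:8000 \
#       --endpoint /v1/completions \
#       --dataset-name random --num-prompts 200 \
#       --request-rate 4 --max-concurrency 32 \
#       --percentile-metrics ttft,tpot,itl,e2el --metric-percentiles 50,90,99 \
#       --goodput ttft:500 tpot:50 \
#       --save-result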
def main(args: argparse.Namespace) -> dict[str, Any]:
return asyncio.run(main_async(args))
async def main_async(args: argparse.Namespace) -> dict[str, Any]:
print(args)
random.seed(args.seed)
np.random.seed(args.seed)
# Validate ramp-up arguments
if args.ramp_up_strategy is not None:
if args.request_rate != float("inf"):
raise ValueError(
"When using ramp-up, do not specify --request-rate. "
"The request rate will be controlled by ramp-up parameters. "
"Please remove the --request-rate argument.")
if args.ramp_up_start_rps is None or args.ramp_up_end_rps is None:
raise ValueError(
"When using --ramp-up-strategy, both --ramp-up-start-rps and "
"--ramp-up-end-rps must be specified")
if args.ramp_up_start_rps < 0 or args.ramp_up_end_rps < 0:
raise ValueError("Ramp-up start and end RPS must be non-negative")
if args.ramp_up_start_rps > args.ramp_up_end_rps:
raise ValueError("Ramp-up start RPS must be less than end RPS")
if (args.ramp_up_strategy == "exponential"
and args.ramp_up_start_rps == 0):
raise ValueError(
"For exponential ramp-up, the start RPS cannot be 0.")
label = args.label
model_id = args.model
model_name = args.served_model_name
tokenizer_id = args.tokenizer if args.tokenizer is not None else args.model
tokenizer_mode = args.tokenizer_mode
if args.base_url is not None:
api_url = f"{args.base_url}{args.endpoint}"
base_url = f"{args.base_url}"
else:
api_url = f"http://{args.host}:{args.port}{args.endpoint}"
base_url = f"http://{args.host}:{args.port}"
# Headers
headers = None
if args.header:
headers = {}
for item in args.header:
if "=" in item:
kvstring = item.split("=", 1)
headers[kvstring[0].strip()] = kvstring[1].strip()
else:
raise ValueError(
"Invalid header format. Please use KEY=VALUE format.")
tokenizer = get_tokenizer(tokenizer_id,
tokenizer_mode=tokenizer_mode,
trust_remote_code=args.trust_remote_code)
if args.dataset_name is None:
raise ValueError(
"Please specify '--dataset-name' and the corresponding "
"'--dataset-path' if required.")
# Load the dataset.
input_requests = get_samples(args, tokenizer)
goodput_config_dict = check_goodput_args(args)
# Collect the sampling parameters.
sampling_params = {
k: v
for k, v in {
"top_p": args.top_p,
"top_k": args.top_k,
"min_p": args.min_p,
"temperature": args.temperature,
}.items() if v is not None
}
# Sampling parameters are only supported by openai-compatible backend.
if sampling_params and args.backend not in OPENAI_COMPATIBLE_BACKENDS:
raise ValueError("Sampling parameters are only supported by "
"openai-compatible backends.")
if "temperature" not in sampling_params:
sampling_params["temperature"] = 0.0 # Default to greedy decoding.
# Avoid GC processing of "static" data to reduce pause times.
gc.collect()
gc.freeze()
benchmark_result = await benchmark(
endpoint_type=args.backend,
api_url=api_url,
base_url=base_url,
model_id=model_id,
model_name=model_name,
tokenizer=tokenizer,
input_requests=input_requests,
logprobs=args.logprobs,
request_rate=args.request_rate,
burstiness=args.burstiness,
disable_tqdm=args.disable_tqdm,
profile=args.profile,
selected_percentile_metrics=args.percentile_metrics.split(","),
selected_percentiles=[
float(p) for p in args.metric_percentiles.split(",")
],
ignore_eos=args.ignore_eos,
goodput_config_dict=goodput_config_dict,
max_concurrency=args.max_concurrency,
lora_modules=args.lora_modules,
extra_headers=headers,
extra_body=sampling_params,
ramp_up_strategy=args.ramp_up_strategy,
ramp_up_start_rps=args.ramp_up_start_rps,
ramp_up_end_rps=args.ramp_up_end_rps,
ready_check_timeout_sec=args.ready_check_timeout_sec,
)
# Save config and results to json
result_json: dict[str, Any] = {}
# Setup
current_dt = datetime.now().strftime("%Y%m%d-%H%M%S")
result_json["date"] = current_dt
result_json["endpoint_type"] = args.backend # for backward compatibility
result_json["backend"] = args.backend
result_json["label"] = label
result_json["model_id"] = model_id
result_json["tokenizer_id"] = tokenizer_id
result_json["num_prompts"] = args.num_prompts
# Metadata
if args.metadata:
for item in args.metadata:
if "=" in item:
kvstring = item.split("=", 1)
result_json[kvstring[0].strip()] = kvstring[1].strip()
else:
raise ValueError(
"Invalid metadata format. Please use KEY=VALUE format.")
# Traffic
result_json["request_rate"] = (args.request_rate if args.request_rate
< float("inf") else "inf")
result_json["burstiness"] = args.burstiness
result_json["max_concurrency"] = args.max_concurrency
if args.ramp_up_strategy is not None:
result_json["ramp_up_strategy"] = args.ramp_up_strategy
result_json["ramp_up_start_rps"] = args.ramp_up_start_rps
result_json["ramp_up_end_rps"] = args.ramp_up_end_rps
# Merge with benchmark result
result_json = {**result_json, **benchmark_result}
if input_requests and hasattr(input_requests[0], 'metadata'):
# Collect the metadata of all samples
dataset_metadata = [req.metadata for req in input_requests]
result_json["dataset_metadata"] = dataset_metadata
if not args.save_detailed:
# Remove fields with too many data points
for field in [
"input_lens",
"output_lens",
"ttfts",
"itls",
"generated_texts",
"errors",
]:
if field in result_json:
del result_json[field]
if field in benchmark_result:
del benchmark_result[field]
# Save to file
if args.save_result or args.append_result:
base_model_id = model_id.split("/")[-1]
max_concurrency_str = (f"-concurrency{args.max_concurrency}"
if args.max_concurrency is not None else "")
label = label or args.backend
if args.ramp_up_strategy is not None:
file_name = f"{label}-ramp-up-{args.ramp_up_strategy}-{args.ramp_up_start_rps}qps-{args.ramp_up_end_rps}qps{max_concurrency_str}-{base_model_id}-{current_dt}.json" # noqa
else:
file_name = f"{label}-{args.request_rate}qps{max_concurrency_str}-{base_model_id}-{current_dt}.json" # noqa
if args.result_filename:
file_name = args.result_filename
if args.result_dir:
os.makedirs(args.result_dir, exist_ok=True)
file_name = os.path.join(args.result_dir, file_name)
with open(file_name,
mode="a+" if args.append_result else "w",
encoding="utf-8") as outfile:
# Append a newline.
if args.append_result and outfile.tell() != 0:
outfile.write("\n")
json.dump(result_json, outfile)
save_to_pytorch_benchmark_format(args, result_json, file_name)
return result_json
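# A minimal sketch of an entry point wiring the pieces above together (the
# description string is illustrative; the real script may build its parser
# elsewhere):
# if __name__ == "__main__":
#     parser = argparse.ArgumentParser(
#         description="Benchmark the online serving throughput.")
#     add_cli_args(parser)
#     main(parser.parse_args())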
export VLLM_NUMA_BIND=1
export VLLM_RANK0_NUMA=0
export VLLM_RANK1_NUMA=0
export VLLM_RANK2_NUMA=0
export VLLM_RANK3_NUMA=0
export HSA_FORCE_FINE_GRAIN_PCIE=1
export NCCL_MIN_NCHANNELS=16
export NCCL_MAX_NCHANNELS=16
export NCCL_P2P_LEVEL=SYS
export NCCL_LAUNCH_MODE=GROUP
export ALLREDUCE_STREAM_WITH_COMPUTE=1
export VLLM_RPC_TIMEOUT=1800000
export VLLM_ZERO_OVERHEAD=1
export VLLM_ZERO_OPT_ZEROS=1
# Environment variables required for testing Qwen3-30B-A3B
export VLLM_USE_FUSED_RMS_ROPE=1
export VLLM_USE_MARLIN_W16A16_MOE=1
# Environment variables required for testing Qwen3-Next
export VLLM_USE_NN=0
export TRITON_MOVE_LOAD_TOFRONT_DOT=0
export HIP_VISIBLE_DEVICES=6,7
vllm serve /data2/models/qwen3-8B \
--served-model-name qwen3-8B \
--host 0.0.0.0 \
--port 8000 \
--trust-remote-code \
--tensor-parallel-size 2
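# Once the server is up, a quick readiness probe from another shell might look like:
#   curl http://0.0.0.0:8000/v1/models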
\ No newline at end of file