Commit 8b2e8ec0 authored by sunzhq2

init evalscope

parent 2a7c435f
# Copyright (c) Alibaba, Inc. and its affiliates.
"""
Default evaluator implementation for running benchmark evaluations.
This module provides the DefaultEvaluator class which orchestrates the entire
evaluation process including data loading, model inference, metric calculation,
and report generation.
"""
import os
import threading
import traceback
from collections import defaultdict
from dataclasses import dataclass
from time import monotonic, perf_counter, sleep
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from evalscope.api.dataset import Dataset, Sample
from evalscope.api.evaluator import CacheManager, Evaluator, TaskState
from evalscope.api.metric import AggScore, SampleScore
from evalscope.api.registry import register_evaluator
from evalscope.constants import HEARTBEAT_INTERVAL_SEC
from evalscope.evaluator.batch_reviewer import BatchReviewer
from evalscope.report import Report, gen_table
from evalscope.utils.function_utils import run_in_threads_with_progress
from evalscope.utils.logger import get_logger
if TYPE_CHECKING:
from evalscope.api.benchmark import DataAdapter
from evalscope.api.model import Model
from evalscope.config import TaskConfig
from evalscope.utils.io_utils import OutputsStructure
logger = get_logger()
@dataclass
class _WorkItem:
"""
A work item for the unified evaluation pool.
Represents a single unit of work in the evaluation pipeline:
- If ``task_state`` is None: run full predict + review for ``sample``.
- If ``task_state`` is provided: prediction is already cached; run review only.
"""
subset: str
"""Subset this item belongs to."""
sample: Optional[Sample] = None
"""Sample to predict. Set when prediction is required."""
task_state: Optional[TaskState] = None
"""Cached task state. Set when only review is required."""
@property
def needs_predict(self) -> bool:
"""True when prediction has not yet been computed."""
return self.task_state is None
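# Illustrative construction of the two item kinds (commented sketch; ``sample`` and
# ``cached_state`` stand in for real evalscope Sample / TaskState objects):
#
#     _WorkItem(subset='math', sample=sample)            # needs_predict -> True
#     _WorkItem(subset='math', task_state=cached_state)  # needs_predict -> False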
@dataclass
class _PoolContext:
"""
Carries state from :meth:`~DefaultEvaluator._collect_work_items` through
the evaluation phases, acting as the shared data contract between phases.
Attributes:
work_items: Work items to process in the unified pool.
cached_scores_by_subset: Fully cached (predict + review) scores per subset.
review_pending_by_subset: TaskStates whose prediction is cached but review
is still needed; populated only for ``use_batch_scoring`` benchmarks.
model_prediction_dir: Shared parent directory for prediction JSONL files.
total_cached: Number of samples already fully cached (skipped in pool).
"""
work_items: List[_WorkItem]
cached_scores_by_subset: Dict[str, List[SampleScore]]
review_pending_by_subset: Dict[str, List[TaskState]]
model_prediction_dir: str
total_cached: int
@property
def grand_total(self) -> int:
"""Total sample count across all subsets: cached + work-pool items."""
return self.total_cached + len(self.work_items)
@dataclass
class _PerfAccumulator:
"""
Thread-safe runtime metric accumulator for evaluator mode.
Metrics are aligned with perf module naming:
- TTFT: time to first token (seconds)
- TPOT: time per output token (seconds)
- OUTPUT_THROUGHPUT: output tokens per second
"""
total_samples: int = 0
samples_with_latency: int = 0
samples_with_ttft: int = 0
samples_with_tpot: int = 0
total_output_tokens: int = 0
total_latency: float = 0.0
window_start: Optional[float] = None
window_end: Optional[float] = None
ttft_values: List[float] = None
tpot_values: List[float] = None
output_throughput_values: List[float] = None
def __post_init__(self):
self.ttft_values = []
self.tpot_values = []
self.output_throughput_values = []
self._lock = threading.Lock()
@staticmethod
def _as_float(value: Any) -> Optional[float]:
try:
if value is None:
return None
return float(value)
except (TypeError, ValueError):
return None
@staticmethod
def _parse_ttft(meta: Dict[str, Any]) -> Optional[float]:
for key in ('ttft', 'TTFT', 'first_chunk_latency', 'first_token_latency'):
if key in meta:
value = _PerfAccumulator._as_float(meta.get(key))
if value is not None and value >= 0:
return value
return None
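    # Example: _PerfAccumulator._parse_ttft({'first_chunk_latency': 0.31}) returns 0.31,
    # while missing, negative or non-numeric values yield None.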
def add(
self,
task_state: TaskState,
elapsed_sec: Optional[float] = None,
started_at: Optional[float] = None,
ended_at: Optional[float] = None,
) -> None:
output = task_state.output
if output is None:
return
metadata = output.metadata or {}
latency = self._as_float(output.time) or self._as_float(metadata.get('latency')) or elapsed_sec
if latency is None or latency <= 0:
return
usage = output.usage
output_tokens = int(getattr(usage, 'output_tokens', 0) or 0)
ttft = self._parse_ttft(metadata)
if ttft is not None and ttft > latency:
# Guard abnormal values from upstream metadata.
ttft = None
tpot = None
if ttft is not None and output_tokens > 1:
# Standard TPOT formula when TTFT is available.
tpot = max(0.0, (latency - ttft) / (output_tokens - 1))
elif output_tokens > 0:
# Non-streaming fallback: approximate per-token generation time.
tpot = latency / output_tokens
output_throughput = output_tokens / latency if output_tokens > 0 else 0.0
with self._lock:
self.total_samples += 1
self.samples_with_latency += 1
self.total_output_tokens += output_tokens
self.total_latency += latency
if started_at is not None:
self.window_start = started_at if self.window_start is None else min(self.window_start, started_at)
if ended_at is not None:
self.window_end = ended_at if self.window_end is None else max(self.window_end, ended_at)
if ttft is not None:
self.ttft_values.append(ttft)
self.samples_with_ttft += 1
if tpot is not None:
self.tpot_values.append(tpot)
self.samples_with_tpot += 1
self.output_throughput_values.append(output_throughput)
def to_report_metrics(self) -> Dict[str, Any]:
if self.samples_with_latency <= 0:
return {}
avg_ttft = sum(self.ttft_values) / len(self.ttft_values) if self.ttft_values else None
avg_tpot = sum(self.tpot_values) / len(self.tpot_values) if self.tpot_values else None
avg_output_throughput = (
sum(self.output_throughput_values) / len(self.output_throughput_values)
if self.output_throughput_values else None
)
overall_output_throughput_sum_latency = (
self.total_output_tokens / self.total_latency if self.total_latency > 0 else None
)
wall_clock_sec = (
(self.window_end - self.window_start)
if self.window_start is not None and self.window_end is not None and self.window_end > self.window_start
else None
)
overall_output_throughput = (
self.total_output_tokens / wall_clock_sec if wall_clock_sec and wall_clock_sec > 0 else None
)
request_throughput = (
self.samples_with_latency / wall_clock_sec if wall_clock_sec and wall_clock_sec > 0 else None
)
metrics = {
'TTFT': round(avg_ttft * 1000, 3) if avg_ttft is not None else None,  # milliseconds
'TPOT': round(avg_tpot * 1000, 3) if avg_tpot is not None else None,  # milliseconds
# Benchmark-comparable global throughput (total tokens / wall-clock window).
'OUTPUT_THROUGHPUT': round(overall_output_throughput, 3) if overall_output_throughput is not None else None,
# Achieved requests per second (observed).
'REQUEST_THROUGHPUT': round(request_throughput, 3) if request_throughput is not None else None,
}
return metrics
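# Minimal numeric sketch of the per-sample formulas accumulated above. The helper is
# illustrative only (not used by the evaluator); latency, TTFT and token counts are
# made-up values for demonstration.
def _example_perf_math(latency: float = 2.0, ttft: float = 0.25, output_tokens: int = 64) -> Dict[str, float]:
    # Standard streaming TPOT: remaining latency spread over the non-first tokens.
    tpot = (latency - ttft) / (output_tokens - 1)
    return {
        'TTFT': round(ttft * 1000, 3),  # 250.0 ms
        'TPOT': round(tpot * 1000, 3),  # ~27.778 ms
        'OUTPUT_THROUGHPUT': round(output_tokens / latency, 3),  # 32.0 tokens/s
    }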
@register_evaluator('default')
class DefaultEvaluator(Evaluator):
"""
Default Evaluator for running evaluations on benchmarks.
This evaluator handles the complete evaluation pipeline:
1. Loading datasets from all subsets
2. Flattening all subsets into a single unified work pool
3. Running model inference and metric calculation atomically per sample
4. Writing results to per-subset JSONL cache as each sample completes
5. Aggregating scores and generating the final report
Args:
benchmark: The data adapter for loading and processing data.
model: The model to be evaluated.
outputs: The output structure for saving evaluation results.
task_config: The task configuration.
"""
def __init__(
self,
benchmark: 'DataAdapter',
model: 'Model',
outputs: 'OutputsStructure',
task_config: 'TaskConfig',
):
# Store core components needed for evaluation
self.benchmark = benchmark
self.model = model
self.outputs = outputs
self.task_config = task_config
# Extract frequently used identifiers
self.benchmark_name = benchmark.name
"""Name of the benchmark being evaluated."""
self.model_name = task_config.model_id
"""ID of the model being evaluated."""
self.use_cache = task_config.use_cache
"""Whether to use cache for predictions."""
# Initialize cache manager for storing and retrieving cached results
self.cache_manager = CacheManager(
outputs=outputs,
model_name=self.model_name,
benchmark_name=self.benchmark_name,
)
# Initialize batch reviewer for benchmarks that use batch scoring
self.batch_reviewer = BatchReviewer(
benchmark=benchmark,
cache_manager=self.cache_manager,
task_config=task_config,
)
self.perf_accumulator = _PerfAccumulator()
self._request_rate_lock = threading.Lock()
self._first_request_time = None
self._request_count = 0
def eval(self) -> Report:
"""
Run the complete evaluation process.
All subsets are merged into a **single unified work pool** so slow
samples in one subset never block samples in another. Each sample is
predicted and reviewed atomically; results are persisted to per-subset
JSONL caches as they complete.
Returns:
Report: The complete evaluation report.
"""
logger.info(f'Start loading benchmark dataset: {self.benchmark_name}')
dataset_dict = {k: v for k, v in self.benchmark.load_dataset().items() if len(v) > 0}
if not dataset_dict:
logger.warning(f'No samples found in any subset of {self.benchmark_name}. Skipping.')
self.finalize()
return {}
subset_list = list(dataset_dict.keys())
logger.info(f'Start evaluating {len(dataset_dict)} subsets of {self.benchmark_name}: {subset_list}')
# Phase 1 – build unified work pool from all subsets
context = self._collect_work_items(dataset_dict)
logger.info(
f'Unified pool: {len(context.work_items)} items to process, '
f'{context.total_cached} already fully cached '
f'({context.grand_total} total across all subsets).'
)
# Phase 2 – execute unified thread pool (single progress bar)
results_by_subset = self._run_pool(context)
logger.info(f'Unified pool finished for {self.benchmark_name}.')
# Phase 3 – aggregate scores per subset (batch review happens here too)
agg_score_dict = self._aggregate_scores(dataset_dict, context, results_by_subset)
# Phase 4 – generate report
if not agg_score_dict:
logger.warning(
f'No valid scores generated for {self.benchmark_name} '
'(all samples filtered or empty subsets). Skipping report generation.'
)
report = {}
else:
logger.info('Generating report...')
report = self.get_report(agg_score_dict)
self.finalize()
logger.info(f'Benchmark {self.benchmark_name} evaluation finished.')
return report
# ------------------------------------------------------------------ #
# Phase helpers #
# ------------------------------------------------------------------ #
def _collect_work_items(self, dataset_dict: Dict[str, Dataset]) -> _PoolContext:
"""
Phase 1 – classify every sample across all subsets into a work tier.
Each sample falls into exactly one of three tiers:
1. **Fully cached** (predict + review on disk) → skipped; its score is
stored in ``cached_scores_by_subset``.
2. **Predict cached, review pending** → review-only :class:`_WorkItem`
(or placed in ``review_pending_by_subset`` for batch-scoring).
3. **Uncached** → full predict + review :class:`_WorkItem`.
Args:
dataset_dict: Mapping of subset name → dataset.
Returns:
A :class:`_PoolContext` ready for :meth:`_run_pool` and
:meth:`_aggregate_scores`.
"""
work_items: List[_WorkItem] = []
cached_scores_by_subset: Dict[str, List[SampleScore]] = defaultdict(list)
review_pending_by_subset: Dict[str, List[TaskState]] = defaultdict(list)
for subset, dataset in dataset_dict.items():
cached_pred_states, remaining_dataset = (
self.cache_manager.filter_prediction_cache(subset, dataset) if self.use_cache else ([], dataset)
)
if self.benchmark.use_batch_scoring:
# Prediction runs in the pool; review is deferred until all
# task_states for this subset are available (after pool).
for sample in remaining_dataset:
work_items.append(_WorkItem(subset=subset, sample=sample))
if self.use_cache and not self.task_config.rerun_review:
cached_scores, need_review = self.cache_manager.filter_review_cache(subset, cached_pred_states)
cached_scores_by_subset[subset].extend(cached_scores)
review_pending_by_subset[subset].extend(need_review)
else:
self.cache_manager.delete_review_cache(subset)
review_pending_by_subset[subset].extend(cached_pred_states)
else:
# Predict + review happen atomically per sample inside the pool.
if self.use_cache and not self.task_config.rerun_review:
cached_scores, need_review = self.cache_manager.filter_review_cache(subset, cached_pred_states)
cached_scores_by_subset[subset].extend(cached_scores)
for ts in need_review: # Tier 2: review-only items
work_items.append(_WorkItem(subset=subset, task_state=ts))
else:
self.cache_manager.delete_review_cache(subset)
for ts in cached_pred_states: # Prediction cached, review cleared
work_items.append(_WorkItem(subset=subset, task_state=ts))
for sample in remaining_dataset: # Tier 3: full predict+review
work_items.append(_WorkItem(subset=subset, sample=sample))
model_prediction_dir = os.path.dirname(self.cache_manager.get_prediction_cache_path(next(iter(dataset_dict))))
total_cached = sum(len(v) for v in cached_scores_by_subset.values())
return _PoolContext(
work_items=work_items,
cached_scores_by_subset=cached_scores_by_subset,
review_pending_by_subset=review_pending_by_subset,
model_prediction_dir=model_prediction_dir,
total_cached=total_cached,
)
def _run_pool(self, context: _PoolContext) -> Dict[str, List[Tuple[TaskState, Optional[SampleScore]]]]:
"""
Phase 2 – execute the unified work pool under a single progress bar.
Each item is processed by :meth:`_process_work_item`; results are
immediately persisted by :meth:`_persist_result` and accumulated
into a per-subset bucket for downstream aggregation.
Args:
context: Pool context produced by :meth:`_collect_work_items`.
Returns:
Mapping of subset name → ``(task_state, sample_score)`` pairs in
completion order. ``sample_score`` is ``None`` for batch-scoring
benchmarks (review is deferred to :meth:`_aggregate_scores`).
"""
results_by_subset: Dict[str, List[Tuple[TaskState, Optional[SampleScore]]]] = \
defaultdict(list)
def worker(item: _WorkItem) -> Tuple[TaskState, Optional[SampleScore]]:
return self._process_work_item(item, context.model_prediction_dir)
def on_result(item: _WorkItem, result: Tuple[TaskState, Optional[SampleScore]]) -> None:
self._persist_result(item, *result)
results_by_subset[item.subset].append(result)
def on_error(item: _WorkItem, exc: Exception) -> None:
tb_str = traceback.format_exc()
logger.error(f'Processing item in subset={item.subset!r} failed: {exc}\nTraceback:\n{tb_str}')
if self.task_config.ignore_errors:
logger.warning('Error ignored, continuing with next sample.')
return
raise exc
run_in_threads_with_progress(
context.work_items,
worker,
desc=f'Evaluating[{self.benchmark_name}]',
max_workers=self.task_config.eval_batch_size,
log_interval=HEARTBEAT_INTERVAL_SEC,
on_result=on_result,
on_error=on_error,
skip_failed=True,
initial=context.total_cached,
total=context.grand_total,
)
return results_by_subset
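    # For reference, a hedged stdlib-only sketch of the worker/on_result/on_error pattern
    # that run_in_threads_with_progress drives above (without progress or heartbeat logging):
    #
    #     from concurrent.futures import ThreadPoolExecutor, as_completed
    #     with ThreadPoolExecutor(max_workers=self.task_config.eval_batch_size) as pool:
    #         futures = {pool.submit(worker, item): item for item in context.work_items}
    #         for future in as_completed(futures):
    #             item = futures[future]
    #             try:
    #                 on_result(item, future.result())
    #             except Exception as exc:
    #                 on_error(item, exc)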
def _process_work_item(self, item: _WorkItem, model_prediction_dir: str) -> Tuple[TaskState, Optional[SampleScore]]:
"""
Process a single work item: predict (if needed) then review.
Called concurrently inside the thread pool by :meth:`_run_pool`.
Override this method to inject custom logic around inference or scoring.
Args:
item: The work item to process.
model_prediction_dir: Directory for storing prediction output files.
Returns:
``(task_state, sample_score)`` where ``sample_score`` is ``None``
for batch-scoring benchmarks (review deferred).
"""
if item.needs_predict:
self._throttle_request_rate()
infer_start = perf_counter()
task_state = (
self._predict_sample(item.sample, model_prediction_dir) if item.needs_predict else item.task_state
)
infer_end = perf_counter()
elapsed_sec = infer_end - infer_start
self.perf_accumulator.add(
task_state=task_state,
elapsed_sec=elapsed_sec,
started_at=infer_start,
ended_at=infer_end,
)
sample_score = (None if self.benchmark.use_batch_scoring else self._review_task_state(task_state))
return task_state, sample_score
def _throttle_request_rate(self) -> None:
request_rate = getattr(self.task_config, 'request_rate', None)
if request_rate is None or request_rate <= 0:
return
interval = 1.0 / request_rate
with self._request_rate_lock:
now = monotonic()
if self._first_request_time is None:
self._first_request_time = now
# Calculate target time for this request based on its position in the sequence
target_time = self._first_request_time + self._request_count * interval
self._request_count += 1
# Sleep if we're ahead of schedule
if now < target_time:
sleep(target_time - now)
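    # Worked example of the schedule above (illustrative numbers): with request_rate=2
    # the interval is 0.5s, so requests 0, 1 and 2 are released no earlier than
    # first_request_time + 0.0s, + 0.5s and + 1.0s respectively, regardless of which
    # worker thread reaches the throttle first.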
def _persist_result(
self,
item: _WorkItem,
task_state: TaskState,
sample_score: Optional[SampleScore],
) -> None:
"""
Persist a completed item’s results to the on-disk cache.
Called in the **main thread** by :meth:`_run_pool` immediately after
each item completes (no concurrent writes). Override to add custom
persistence logic.
Args:
item: The originating work item.
task_state: The completed task state (prediction output).
sample_score: The review score, or ``None`` for batch-scoring.
"""
if item.needs_predict:
model_result = self.cache_manager.save_prediction_cache(
item.subset, task_state, self.benchmark.save_metadata
)
logger.debug(f'Model result: \n{model_result.pretty_print()}')
if sample_score is not None:
review_result = self.cache_manager.save_review_cache(
subset=item.subset,
task_state=task_state,
sample_score=sample_score,
save_metadata=self.benchmark.save_metadata,
)
logger.debug(f'Review result: \n{review_result.pretty_print()}')
def _aggregate_scores(
self,
dataset_dict: Dict[str, Dataset],
context: _PoolContext,
results_by_subset: Dict[str, List[Tuple[TaskState, Optional[SampleScore]]]],
) -> Dict[str, List[AggScore]]:
"""
Phase 3 – aggregate per-sample scores into subset-level metrics.
For standard benchmarks the pool scores are combined with cached scores
and passed directly to ``benchmark.aggregate_scores``.
For batch-scoring benchmarks :meth:`BatchReviewer.review_subset` is invoked
first to produce final per-sample scores from all collected task states.
Args:
dataset_dict: Subset iteration order.
context: Pool context from :meth:`_collect_work_items`.
results_by_subset: Per-subset pool results from :meth:`_run_pool`.
Returns:
Mapping of subset name → aggregated scores. Empty subsets omitted.
"""
agg_score_dict: Dict[str, List[AggScore]] = {}
for subset in dataset_dict:
cached_scores = context.cached_scores_by_subset.get(subset, [])
pool_results = results_by_subset.get(subset, [])
if self.benchmark.use_batch_scoring:
pending = context.review_pending_by_subset.get(subset, [])
new_task_states = [ts for ts, _ in pool_results]
batch_scores = self.batch_reviewer.review_subset(
subset, pending + new_task_states, review_fn=self._review_task_state
)
all_scores = cached_scores + batch_scores
else:
new_scores = [sc for _, sc in pool_results if sc is not None]
all_scores = cached_scores + new_scores
if not all_scores:
logger.info(f'No valid scores generated for subset: {subset}, skipping.')
continue
logger.info(f'Aggregating scores for subset: {subset}')
agg_score_dict[subset] = self.benchmark.aggregate_scores(sample_scores=all_scores)
return agg_score_dict
def _predict_sample(self, sample: Sample, model_prediction_dir: str) -> TaskState:
"""
Run model inference on a single sample.
Args:
sample: The sample to predict.
model_prediction_dir: Directory for storing model predictions.
Returns:
TaskState: The task state containing the prediction result.
"""
logger.debug(f'\n{sample.pretty_print()}')
task_state = self.benchmark.run_inference(model=self.model, sample=sample, output_dir=model_prediction_dir)
return task_state
def _review_task_state(self, task_state: TaskState) -> SampleScore:
"""
Compute evaluation metrics for a single task state.
Args:
task_state: The task state to review.
Returns:
SampleScore: The evaluation score for the task state.
"""
sample_score = self.benchmark.calculate_metrics(task_state=task_state)
return sample_score
def get_report(self, agg_score_dict: Dict[str, List[AggScore]]) -> Report:
"""
Generate a comprehensive evaluation report from aggregated scores.
This method handles:
1. Creating the evaluation report from scores
2. Generating and displaying a summary table
3. Optionally generating detailed analysis
4. Saving the report to file
Args:
agg_score_dict: Dictionary mapping subset names to their aggregated scores.
Returns:
Report: The complete evaluation report.
"""
assert agg_score_dict, 'No scores to generate report from.'
# Get paths for saving the report
report_path = self.cache_manager.get_report_path()
report_file = self.cache_manager.get_report_file()
# Generate the main evaluation report using benchmark-specific logic
report = self.benchmark.generate_report(
scores=agg_score_dict, model_name=self.model_name, output_dir=report_path
)
perf_metrics = self.perf_accumulator.to_report_metrics()
if perf_metrics:
configured_rate = getattr(self.task_config, 'request_rate', None)
if configured_rate is not None:
perf_metrics['REQUEST_RATE'] = round(configured_rate, 3)
report.metadata = report.metadata or {}
report.metadata['inference_performance'] = perf_metrics
logger.info(f'Evaluator performance metrics: {perf_metrics}')
if perf_metrics.get('TTFT') is None:
logger.warning(
'TTFT is unavailable for this run. For OpenAI/vLLM backends, '
'set `generation_config.stream=true` and avoid cache reuse to collect TTFT.'
)
# Generate and display a summary table of results
try:
report_table = gen_table(report_list=[report], add_overall_metric=self.benchmark.add_overall_metric)
logger.info(f'\n{self.benchmark_name} report table:'
f'\n{report_table} \n')
except Exception:
logger.error('Failed to generate report table.')
# Generate detailed analysis if requested in configuration
if self.task_config.analysis_report:
logger.info('Generating report analysis, please wait ...')
analysis = report.generate_analysis(self.task_config)
logger.info(f'Report analysis:\n{analysis}')
else:
logger.info('Skipping report analysis (`analysis_report=False`).')
# Save the complete report to file
report.to_json(report_file)
logger.info(f'Dump report to: {report_file} \n')
return report
def finalize(self, *args, **kwargs):
self.benchmark.finalize(*args, **kwargs)
\ No newline at end of file
from evalscope.api.model import ModelAPI
from evalscope.api.registry import register_model_api
from evalscope.utils.deprecation_utils import deprecated
from evalscope.utils.import_utils import check_import
@register_model_api(name='mock_llm')
def mockllm() -> type[ModelAPI]:
from .mockllm import MockLLM
return MockLLM
@register_model_api(name='openai_api')
def openai_api() -> type[ModelAPI]:
from .openai_compatible import OpenAICompatibleAPI
return OpenAICompatibleAPI
@register_model_api(name='openai_raw_http')
def openai_raw_http() -> type[ModelAPI]:
from .openai_compatible import OpenAICompatibleRawHTTP
return OpenAICompatibleRawHTTP
@register_model_api(name='anthropic_api')
def anthropic_api() -> type[ModelAPI]:
check_import('anthropic', package='anthropic', raise_error=True, feature_name='anthropic_api')
from .anthropic_compatible import AnthropicCompatibleAPI
return AnthropicCompatibleAPI
@register_model_api(name='server')
@deprecated(since='1.0.0', remove_in='1.1.0', alternative='openai_api')
def server() -> type[ModelAPI]:
from .openai_compatible import OpenAICompatibleAPI
return OpenAICompatibleAPI
@register_model_api(name='llm_ckpt')
def llm_ckpt() -> type[ModelAPI]:
check_import('torch', package='torch', raise_error=True, feature_name='llm_ckpt')
from .modelscope import ModelScopeAPI
return ModelScopeAPI
@register_model_api(name='checkpoint')
@deprecated(since='1.0.0', remove_in='1.1.0', alternative='llm_ckpt')
def checkpoint() -> type[ModelAPI]:
check_import('torch', package='torch', raise_error=True, feature_name='llm_ckpt')
from .modelscope import ModelScopeAPI
return ModelScopeAPI
@register_model_api(name='text2image')
def text2image() -> type[ModelAPI]:
check_import(['torch', 'torchvision', 'diffusers'],
package='evalscope[aigc]',
raise_error=True,
feature_name='text2image')
from .text2image_model import Text2ImageAPI
return Text2ImageAPI
@register_model_api(name='image_editing')
def image_editing() -> type[ModelAPI]:
check_import(['torch', 'torchvision', 'diffusers'],
package='evalscope[aigc]',
raise_error=True,
feature_name='image_editing')
from .image_edit_model import ImageEditAPI
return ImageEditAPI
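# Hedged sketch: registering an additional backend follows the same lazy-import
# pattern as the entries above. The name 'my_custom_api' and module '.my_custom'
# below are hypothetical, not part of evalscope.
#
#     @register_model_api(name='my_custom_api')
#     def my_custom_api() -> type[ModelAPI]:
#         from .my_custom import MyCustomAPI
#         return MyCustomAPI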
import os
from time import perf_counter
from openai import APIStatusError, BadRequestError, OpenAI, PermissionDeniedError, UnprocessableEntityError
from openai._types import NOT_GIVEN
from openai.types.chat import ChatCompletion
from typing import Any, Dict, List, Optional, Tuple, Union
from evalscope.api.messages import ChatMessage
from evalscope.api.model import ChatCompletionChoice, GenerateConfig, ModelAPI, ModelOutput
from evalscope.api.tool import ToolChoice, ToolInfo
from evalscope.utils import get_logger
from evalscope.utils.argument_utils import get_supported_params
from evalscope.utils.function_utils import retry_call
from .utils.openai import (
chat_choices_from_openai,
collect_stream_response,
model_output_from_openai,
openai_chat_messages,
openai_chat_tool_choice,
openai_chat_tools,
openai_completion_params,
openai_handle_bad_request,
)
logger = get_logger()
class OpenAICompatibleAPI(ModelAPI):
def __init__(
self,
model_name: str,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
config: GenerateConfig = GenerateConfig(),
**model_args: Any,
) -> None:
super().__init__(
model_name=model_name,
base_url=base_url,
api_key=api_key,
config=config,
)
# fall back to the EVALSCOPE_API_KEY environment variable when api_key is not provided
self.api_key = api_key or os.environ.get('EVALSCOPE_API_KEY', None)
assert self.api_key, f'API key for {model_name} not found'
# fall back to the EVALSCOPE_BASE_URL environment variable when base_url is not provided
self.base_url = base_url or os.environ.get('EVALSCOPE_BASE_URL', None)
assert self.base_url, f'Base URL for {model_name} not found'
# normalize base_url: strip a trailing slash and a '/chat/completions' suffix if present
self.base_url = self.base_url.rstrip('/').removesuffix('/chat/completions')
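        # e.g. 'https://api.example.com/v1/chat/completions/' -> 'https://api.example.com/v1'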
# create http client
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url,
**model_args,
)
def generate(
self,
input: List[ChatMessage],
tools: List[ToolInfo],
tool_choice: ToolChoice,
config: GenerateConfig,
) -> ModelOutput:
# setup request and response for ModelCall
request: Dict[str, Any] = {}
response: Dict[str, Any] = {}
tools, tool_choice, config = self.resolve_tools(tools, tool_choice, config)
# build completion params from the generation config
completion_params = self.completion_params(
config=config,
tools=len(tools) > 0,
)
request = dict(
messages=openai_chat_messages(input),
tools=openai_chat_tools(tools) if len(tools) > 0 else NOT_GIVEN,
tool_choice=openai_chat_tool_choice(tool_choice) if len(tools) > 0 else NOT_GIVEN,
**completion_params,
)
self.validate_request_params(request)
try:
# generate completion and save response for model call
request_start = perf_counter()
completion = retry_call(
self.client.chat.completions.create,
retries=config.retries,
sleep_interval=config.retry_interval,
**request
)
# handle streaming response
ttft = None
is_stream_response = not isinstance(completion, ChatCompletion)
if is_stream_response:
collected_chunks = []
for chunk in completion:
collected_chunks.append(chunk)
# TTFT should reflect first generated token/content chunk, not just any chunk.
# Different OpenAI-compatible servers may return delta as object or dict.
if ttft is None and self._chunk_has_generation_payload(chunk):
ttft = perf_counter() - request_start
completion = collect_stream_response(collected_chunks)
response = completion.model_dump()
self.on_response(response)
# return output and call
choices = self.chat_choices_from_completion(completion, tools)
model_output = model_output_from_openai(completion, choices)
if ttft is not None:
model_output.metadata = model_output.metadata or {}
model_output.metadata['ttft'] = ttft
model_output.metadata['ttft_source'] = 'first_content_stream_chunk'
return model_output
except (BadRequestError, UnprocessableEntityError, PermissionDeniedError) as ex:
return self.handle_bad_request(ex)
except ValueError as ex:
logger.error(f'Model [{self.model_name}] returned an invalid response: {ex}')
raise
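    # Hedged standalone sketch of the streaming TTFT measurement used above, written
    # against the public OpenAI SDK directly (model name and prompt are placeholders):
    #
    #     start = perf_counter()
    #     ttft = None
    #     stream = client.chat.completions.create(
    #         model='<model>', messages=[{'role': 'user', 'content': 'hi'}], stream=True)
    #     for chunk in stream:
    #         if ttft is None and chunk.choices and chunk.choices[0].delta.content:
    #             ttft = perf_counter() - start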
def resolve_tools(self, tools: List[ToolInfo], tool_choice: ToolChoice,
config: GenerateConfig) -> Tuple[List[ToolInfo], ToolChoice, GenerateConfig]:
"""Provides an opportunity for concrete classes to customize tool resolution."""
return tools, tool_choice, config
def completion_params(self, config: GenerateConfig, tools: bool) -> Dict[str, Any]:
return openai_completion_params(
model=self.model_name,
config=config,
tools=tools,
)
def validate_request_params(self, params: Dict[str, Any]):
"""Hook for subclasses to do custom request parameter validation."""
# Cache supported params to avoid repeated calls to inspect.signature.
if not hasattr(self, '_valid_params'):
self._valid_params = get_supported_params(self.client.chat.completions.create)
# Move unsupported parameters to extra_body.
extra_body = params.get('extra_body', {})
for key in list(params.keys()):
if key not in self._valid_params:
extra_body[key] = params.pop(key)
if extra_body:
params['extra_body'] = extra_body
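        # Example (illustrative): with params = {'model': 'm', 'messages': [...], 'top_k': 20},
        # 'top_k' is not a named argument of client.chat.completions.create, so it is moved
        # into params['extra_body'] = {'top_k': 20} and sent to the server in the request body.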
def on_response(self, response: Dict[str, Any]) -> None:
"""Hook for subclasses to do custom response handling."""
pass
def chat_choices_from_completion(self, completion: ChatCompletion,
tools: List[ToolInfo]) -> List[ChatCompletionChoice]:
"""Hook for subclasses to do custom chat choice processing."""
return chat_choices_from_openai(completion, tools)
def handle_bad_request(self, ex: APIStatusError) -> Union[ModelOutput, Exception]:
"""Hook for subclasses to do bad request handling"""
return openai_handle_bad_request(self.model_name, ex)
@staticmethod
def _chunk_has_generation_payload(chunk: Any) -> bool:
"""Return True when stream chunk carries actual generated payload."""
choices = getattr(chunk, 'choices', None) or []
for choice in choices:
delta = getattr(choice, 'delta', None)
if delta is None:
continue
if isinstance(delta, dict):
content = delta.get('content')
reasoning = delta.get('reasoning_content') or delta.get('reasoning')
tool_calls = delta.get('tool_calls')
else:
content = getattr(delta, 'content', None)
reasoning = getattr(delta, 'reasoning_content', None) or getattr(delta, 'reasoning', None)
tool_calls = getattr(delta, 'tool_calls', None)
if content not in (None, '', []):
return True
if reasoning not in (None, '', []):
return True
if tool_calls:
return True
return False
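    # Hedged test-style sketch (names are illustrative): role-only chunks do not count
    # as generation payload, while the first content-bearing chunk does.
    #
    #     from types import SimpleNamespace as NS
    #     role_only = NS(choices=[NS(delta={'role': 'assistant', 'content': ''})])
    #     first_tok = NS(choices=[NS(delta={'content': 'Hel'})])
    #     assert not OpenAICompatibleAPI._chunk_has_generation_payload(role_only)
    #     assert OpenAICompatibleAPI._chunk_has_generation_payload(first_tok)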