Unverified Commit e18840ce authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: add Prometheus auto and custom label injection for engine metrics (#5989)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 6fe2152b
......@@ -98,3 +98,4 @@ repos:
- pydantic
- filelock
- pyyaml
- prometheus_client>=0.23.1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Custom Prometheus collector that injects additional labels into metrics.
Wraps a source registry and clones metrics during collection, injecting user-specified
labels without modifying the original metrics. Preserves all metric types, reserved
labels (histogram 'le', summary 'quantile'), timestamps, and exemplars.
"""
from typing import TYPE_CHECKING, Iterator
from prometheus_client.registry import Collector
# Import prometheus_client types for type hints only
# Actual prometheus_client imports happen inside methods to respect initialization order
if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
from prometheus_client.metrics_core import Metric as MetricFamily
class LabelInjectingCollector(Collector):
"""
Prometheus collector that injects labels into metrics during collection.
Preserves all metric types, reserved labels, timestamps, and exemplars.
"""
def __init__(
self,
source_registry: "CollectorRegistry",
labels_to_inject: dict[str, str],
):
"""
Args:
source_registry: Source registry to collect from
labels_to_inject: Labels to inject (e.g. {"dynamo_namespace": "prod"})
Raises:
ValueError: If labels_to_inject is empty or contains reserved labels (le, quantile)
"""
if not labels_to_inject:
raise ValueError("labels_to_inject cannot be empty")
# Check for reserved label names that should not be overwritten
reserved_labels = {"le", "quantile"}
conflicting_labels = reserved_labels.intersection(labels_to_inject.keys())
if conflicting_labels:
raise ValueError(
f"Cannot inject reserved label names: {conflicting_labels}. "
f"Reserved labels: {reserved_labels}"
)
self.source_registry = source_registry
self.labels_to_inject = labels_to_inject
def collect(self) -> Iterator["MetricFamily"]:
"""
Collect metrics from source registry with injected labels.
Preserves all metric types, reserved labels, timestamps, and exemplars.
"""
# Delayed import here to respect prometheus_client multiprocess initialization order
from prometheus_client.metrics_core import Metric as MetricFamily
from prometheus_client.samples import Sample
for metric_family in self.source_registry.collect():
# Clone the metric family with injected labels
new_samples = []
for sample in metric_family.samples:
# Merge existing labels with injected labels
# Existing labels take precedence (don't overwrite if already present)
merged_labels = {**self.labels_to_inject, **sample.labels}
# Create new sample with merged labels
new_sample = Sample(
name=sample.name,
labels=merged_labels,
value=sample.value,
timestamp=sample.timestamp,
exemplar=sample.exemplar,
)
new_samples.append(new_sample)
# Create new metric family with updated samples
new_metric_family = MetricFamily(
name=metric_family.name,
documentation=metric_family.documentation,
typ=metric_family.type,
unit=metric_family.unit,
)
new_metric_family.samples = new_samples
yield new_metric_family
def describe(self) -> Iterator["MetricFamily"]:
"""Describe metrics from source registry (forwards to source)."""
return iter(self.source_registry.collect())
......@@ -25,13 +25,23 @@ from dynamo.prometheus_names import kvstats, labels, model_info, name_prefix
if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
# Auto-label injection: always injects dynamo_namespace, dynamo_component, dynamo_endpoint labels
# into engine metrics based on the endpoint hierarchy.
#
# Rust counterpart: lib/runtime/src/metrics.rs create_metric() function
# Label constants defined in: lib/runtime/src/metrics/prometheus_names.rs labels module
def register_engine_metrics_callback(
endpoint: Endpoint,
registry: "CollectorRegistry",
metric_prefix_filters: Optional[list[str]] = None,
exclude_prefixes: Optional[list[str]] = None,
add_prefix: Optional[str] = None,
inject_custom_labels: Optional[dict[str, str]] = None,
namespace_name: Optional[str] = None,
component_name: Optional[str] = None,
endpoint_name: Optional[str] = None,
model_name: Optional[str] = None,
) -> None:
"""
Register a callback to expose engine Prometheus metrics via Dynamo's metrics endpoint.
......@@ -39,17 +49,39 @@ def register_engine_metrics_callback(
This registers a callback that is invoked when /metrics is scraped, passing through
engine-specific metrics alongside Dynamo runtime metrics.
Automatically injects dynamo_namespace, dynamo_component, dynamo_endpoint, model,
and model_name labels when namespace_name and component_name are provided.
Label Precedence (highest to lowest):
1. Existing labels from source metrics - never changed, never overwritten
2. Auto-injected labels (dynamo_*, model*) - added by Dynamo automatically
3. Custom labels (inject_custom_labels) - user-provided, lowest precedence
If inject_custom_labels contains keys that conflict with auto-injected labels,
a warning is logged and the auto-injected value takes precedence.
Args:
endpoint: Dynamo endpoint object with metrics.register_prometheus_expfmt_callback()
registry: Prometheus registry to collect from (e.g., REGISTRY or CollectorRegistry)
metric_prefix_filters: List of prefixes to filter metrics (e.g., ["vllm:"], ["vllm:", "lmcache:"], or None for no filtering)
exclude_prefixes: List of metric name prefixes to exclude (e.g., ["python_", "process_"])
add_prefix: Prefix to add to remaining metrics (e.g., "trtllm_")
inject_custom_labels: Optional dict of custom labels to inject (e.g. {"lora_adapter": "my-lora"}).
Injected at collection time without modifying source metrics.
Reserved labels (le, quantile) will raise ValueError.
Auto-labels (dynamo_namespace, dynamo_component, dynamo_endpoint, model,
model_name) are added automatically and should not be in inject_custom_labels.
namespace_name: Explicit namespace name for auto-labels (from config.namespace)
component_name: Explicit component name for auto-labels (from config.component)
endpoint_name: Explicit endpoint name for auto-labels (from config.endpoint, defaults to "generate")
model_name: Model name/path for auto-labels (from config.model, injected as both 'model' and 'model_name')
Example:
from prometheus_client import REGISTRY
# Auto-labels: automatically adds hierarchy labels
register_engine_metrics_callback(
generate_endpoint, REGISTRY, metric_prefix_filters=["vllm:"]
generate_endpoint, REGISTRY,
metric_prefix_filters=["vllm:"],
namespace_name="prod", component_name="vllm-worker", endpoint_name="generate"
)
# Include multiple metric prefixes
......@@ -61,17 +93,68 @@ def register_engine_metrics_callback(
register_engine_metrics_callback(
generate_endpoint, REGISTRY,
exclude_prefixes=["python_", "process_"],
add_prefix="trtllm_"
metric_prefix_filters=["trtllm_"],
)
# Inject additional labels (auto-labels are added automatically)
register_engine_metrics_callback(
generate_endpoint, REGISTRY,
metric_prefix_filters=["vllm:"],
inject_custom_labels={"lora_adapter": "my-lora"}
)
"""
# Auto-inject hierarchy labels
final_inject_labels = inject_custom_labels.copy() if inject_custom_labels else {}
if namespace_name and component_name:
# Extract hierarchy information
# Mirrors Rust auto-label injection in lib/runtime/src/metrics.rs create_metric()
endpoint_name_final = endpoint_name or "generate"
# Add auto-labels using constants from prometheus_names.labels
# These align with Rust auto-labels defined in lib/runtime/src/metrics/prometheus_names.rs
auto_labels = {
labels.NAMESPACE: namespace_name, # "dynamo_namespace"
labels.COMPONENT: component_name, # "dynamo_component"
labels.ENDPOINT: endpoint_name_final, # "dynamo_endpoint"
}
# Add model labels if model_name is provided
if model_name:
auto_labels[labels.MODEL] = model_name # "model" (OpenAI standard)
auto_labels[
labels.MODEL_NAME
] = model_name # "model_name" (engine-native compatibility)
# Validate that user didn't provide conflicting auto-labels
# Warn but don't error - custom labels have lower precedence than auto-labels
if inject_custom_labels:
for key in auto_labels:
if key in inject_custom_labels:
logging.warning(
f"Custom label '{key}' conflicts with auto-injected label. "
f"Auto-injected value takes precedence. Custom value '{inject_custom_labels[key]}' ignored."
)
# Merge labels with correct precedence:
# 1. Existing labels (from source metrics) - never overwritten
# 2. Auto-labels (dynamo_*, model*) - injected by Dynamo
# 3. Custom labels (inject_custom_labels) - user-provided, lowest precedence
# Put custom labels first, then overwrite with auto-labels (higher precedence)
final_inject_labels = {**final_inject_labels, **auto_labels}
logging.debug(
f"Auto-injecting labels: "
f"namespace={namespace_name}, component={component_name}, endpoint={endpoint_name_final}, model={model_name}"
)
def get_expfmt() -> str:
"""Callback to return engine Prometheus metrics in exposition format"""
result = get_prometheus_expfmt(
registry,
metric_prefix_filters=metric_prefix_filters,
exclude_prefixes=exclude_prefixes,
add_prefix=add_prefix,
inject_custom_labels=final_inject_labels if final_inject_labels else None,
)
return result
......@@ -101,23 +184,17 @@ def _compile_include_pattern(metric_prefixes: tuple[str, ...]) -> Pattern:
return re.compile(rf"^(# (HELP|TYPE) )?({prefixes_regex})")
@lru_cache(maxsize=128)
def _compile_help_type_pattern() -> Pattern:
"""Compile and cache regex for extracting metric names from HELP/TYPE comment lines."""
return re.compile(r"^# (HELP|TYPE) (\S+)(.*)$")
def get_prometheus_expfmt(
registry,
metric_prefix_filters: Optional[list[str]] = None,
exclude_prefixes: Optional[list[str]] = None,
add_prefix: Optional[str] = None,
inject_custom_labels: Optional[dict[str, str]] = None,
) -> str:
"""
Get Prometheus metrics from a registry formatted as text using the standard text encoder.
Collects all metrics from the registry and returns them in Prometheus text exposition format.
Optionally filters metrics by prefix, excludes certain prefixes, and adds a prefix.
Optionally filters metrics by prefix, excludes certain prefixes, adds a prefix, and injects labels.
IMPORTANT: prometheus_client is imported lazily here because it must be imported AFTER
set_prometheus_multiproc_dir() is called by SGLang's engine initialization. Importing
......@@ -131,7 +208,14 @@ def get_prometheus_expfmt(
metric_prefix_filters: Optional list of prefixes to filter displayed metrics (e.g., ["vllm:"] or ["vllm:", "lmcache:"]).
If None, returns all metrics. Supports single string or list of strings. (default: None)
exclude_prefixes: List of metric name prefixes to exclude (e.g., ["python_", "process_"])
add_prefix: Prefix to add to remaining metrics (e.g., "trtllm_")
inject_custom_labels: Optional dict of custom labels to inject at collection time.
Example: {"lora_adapter": "my-lora"}
Reserved labels (le, quantile) will raise ValueError.
Label Precedence (highest to lowest):
1. Existing labels from source metrics - never changed
2. Auto-injected labels (via register_engine_metrics_callback)
3. Custom labels (inject_custom_labels) - lowest precedence
Returns:
Formatted metrics text in Prometheus exposition format. Returns empty string on error.
......@@ -140,16 +224,40 @@ def get_prometheus_expfmt(
# Filter to include only vllm and lmcache metrics
get_prometheus_expfmt(registry, metric_prefix_filters=["vllm:", "lmcache:"])
# Filter out python_/process_ metrics and add trtllm_ prefix
get_prometheus_expfmt(registry, exclude_prefixes=["python_", "process_"], add_prefix="trtllm_")
# Filter out python_/process_ metrics (TRT-LLM natively outputs trtllm_* prefix)
get_prometheus_expfmt(registry, metric_prefix_filters=["trtllm_"])
# Inject labels (custom labels, not auto-injected ones)
get_prometheus_expfmt(
registry, metric_prefix_filters=["vllm:"],
inject_custom_labels={"lora_adapter": "my-lora"}
)
"""
from prometheus_client import generate_latest
from prometheus_client import CollectorRegistry, generate_latest
try:
# If label injection requested, wrap registry with custom collector
if inject_custom_labels:
# Delayed import: LabelInjectingCollector imports prometheus_client.registry.Collector
# at module level. This import must happen AFTER set_prometheus_multiproc_dir() is
# called by SGLang's engine initialization. Importing at the top of this file would
# trigger prometheus_client initialization too early (before PROMETHEUS_MULTIPROC_DIR
# is set), breaking multiprocess metrics collection.
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create temporary registry with label-injecting collector
temp_registry = CollectorRegistry()
temp_registry.register(
LabelInjectingCollector(registry, inject_custom_labels)
)
registry = temp_registry
# Generate metrics in Prometheus text format
metrics_text = generate_latest(registry).decode("utf-8")
if metric_prefix_filters or exclude_prefixes or add_prefix:
if metric_prefix_filters or exclude_prefixes:
lines = []
# Get cached compiled patterns
......@@ -163,9 +271,6 @@ def get_prometheus_expfmt(
filter_tuple: tuple[str, ...] = tuple(metric_prefix_filters)
include_pattern = _compile_include_pattern(filter_tuple)
# Get cached HELP/TYPE pattern
help_type_pattern = _compile_help_type_pattern()
for line in metrics_text.split("\n"):
if not line.strip():
continue
......@@ -178,52 +283,6 @@ def get_prometheus_expfmt(
if include_pattern and not include_pattern.match(line):
continue
# Apply prefix transformation if needed
if add_prefix:
# Handle HELP/TYPE comments
if line.startswith("# HELP ") or line.startswith("# TYPE "):
match = help_type_pattern.match(line)
if match:
comment_type, metric_name, rest = match.groups()
# Remove existing prefix if present
if metric_prefix_filters:
for prefix in metric_prefix_filters:
if metric_name.startswith(prefix):
metric_name = metric_name.removeprefix(prefix)
break
# Only add prefix if it doesn't already exist
if not metric_name.startswith(add_prefix):
metric_name = add_prefix + metric_name
line = f"# {comment_type} {metric_name}{rest}"
# Handle metric lines
elif line and not line.startswith("#"):
# Extract metric name (first token)
parts = line.split(None, 1)
if parts:
metric_name_part = parts[0]
rest_of_line = parts[1] if len(parts) > 1 else ""
# Remove existing prefix if present
if metric_prefix_filters:
for prefix in metric_prefix_filters:
if metric_name_part.startswith(prefix):
metric_name_part = (
metric_name_part.removeprefix(prefix)
)
break
# Only add prefix if it doesn't already exist
if not metric_name_part.startswith(add_prefix):
metric_name_part = add_prefix + metric_name_part
# Reconstruct line
line = metric_name_part + (
" " + rest_of_line if rest_of_line else ""
)
else:
# Empty line or just whitespace, skip prefix addition
pass
lines.append(line)
result = "\n".join(lines)
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Unit tests for LabelInjectingCollector.
Tests the custom Prometheus collector that injects labels into metrics without
modifying the source metrics.
"""
import pytest
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Summary
class TestLabelInjectingCollector:
"""Test suite for LabelInjectingCollector"""
def test_counter_label_injection(self):
"""Test injecting labels into Counter metrics"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a counter
source_registry = CollectorRegistry()
counter = Counter(
"test_counter",
"Test counter",
registry=source_registry,
)
counter.inc(5)
# Create collector that injects labels
labels_to_inject = {
"dynamo_namespace": "prod",
"dynamo_component": "test-component",
}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
# Verify counter exists with injected labels
assert len(metric_families) == 1
mf = metric_families[0]
assert mf.name == "test_counter"
assert mf.type == "counter"
# Verify samples have injected labels
# Counter may have multiple samples: _total (value) and _created (timestamp)
assert len(mf.samples) >= 1
# Find the _total sample (the actual counter value)
total_sample = next(s for s in mf.samples if s.name.endswith("_total"))
assert total_sample.labels["dynamo_namespace"] == "prod"
assert total_sample.labels["dynamo_component"] == "test-component"
assert total_sample.value == 5
def test_gauge_label_injection(self):
"""Test injecting labels into Gauge metrics"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a gauge
source_registry = CollectorRegistry()
gauge = Gauge(
"test_gauge",
"Test gauge",
registry=source_registry,
)
gauge.set(42)
# Create collector that injects labels
labels_to_inject = {"dynamo_endpoint": "generate", "model": "llama-3-70b"}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
# Verify gauge exists with injected labels
assert len(metric_families) == 1
mf = metric_families[0]
assert mf.name == "test_gauge"
assert mf.type == "gauge"
# Verify samples have injected labels
assert len(mf.samples) == 1
sample = mf.samples[0]
assert sample.labels["dynamo_endpoint"] == "generate"
assert sample.labels["model"] == "llama-3-70b"
assert sample.value == 42
def test_histogram_preserves_le_label(self):
"""Test that histogram 'le' label is preserved and not overwritten"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a histogram
source_registry = CollectorRegistry()
histogram = Histogram(
"test_histogram",
"Test histogram",
registry=source_registry,
)
histogram.observe(0.5)
histogram.observe(1.5)
histogram.observe(2.5)
# Create collector that injects labels
labels_to_inject = {
"dynamo_namespace": "prod",
"dynamo_component": "vllm-worker",
}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
# Find histogram metric family
histogram_mf = next(mf for mf in metric_families if mf.name == "test_histogram")
assert histogram_mf.type == "histogram"
# Verify histogram buckets have 'le' label preserved
bucket_samples = [s for s in histogram_mf.samples if s.name.endswith("_bucket")]
assert len(bucket_samples) > 0
for sample in bucket_samples:
# Verify 'le' label exists (reserved for histogram buckets)
assert "le" in sample.labels
# Verify injected labels are present
assert sample.labels["dynamo_namespace"] == "prod"
assert sample.labels["dynamo_component"] == "vllm-worker"
# Verify sum and count samples also have injected labels
sum_sample = next(s for s in histogram_mf.samples if s.name.endswith("_sum"))
assert sum_sample.labels["dynamo_namespace"] == "prod"
assert sum_sample.labels["dynamo_component"] == "vllm-worker"
count_sample = next(
s for s in histogram_mf.samples if s.name.endswith("_count")
)
assert count_sample.labels["dynamo_namespace"] == "prod"
assert count_sample.labels["dynamo_component"] == "vllm-worker"
def test_summary_preserves_quantile_label(self):
"""Test that summary 'quantile' label is preserved"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a summary
source_registry = CollectorRegistry()
summary = Summary(
"test_summary",
"Test summary",
registry=source_registry,
)
summary.observe(1.0)
summary.observe(2.0)
summary.observe(3.0)
# Create collector that injects labels
labels_to_inject = {"dynamo_endpoint": "generate", "rank": "0"}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
# Find summary metric family
summary_mf = next(mf for mf in metric_families if mf.name == "test_summary")
assert summary_mf.type == "summary"
# Verify summary quantiles have 'quantile' label preserved (if present)
# Note: Not all summary implementations expose quantiles, so this is optional
quantile_samples = [s for s in summary_mf.samples if "quantile" in s.labels]
for sample in quantile_samples:
# Verify 'quantile' label exists (reserved for summary quantiles)
assert "quantile" in sample.labels
# Verify injected labels are present
assert sample.labels["dynamo_endpoint"] == "generate"
assert sample.labels["rank"] == "0"
# Verify sum and count samples have injected labels
sum_sample = next(s for s in summary_mf.samples if s.name.endswith("_sum"))
assert sum_sample.labels["dynamo_endpoint"] == "generate"
assert sum_sample.labels["rank"] == "0"
count_sample = next(s for s in summary_mf.samples if s.name.endswith("_count"))
assert count_sample.labels["dynamo_endpoint"] == "generate"
assert count_sample.labels["rank"] == "0"
def test_multiple_labels_injection(self):
"""Test injecting multiple labels at once"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a counter
source_registry = CollectorRegistry()
counter = Counter("test_counter", "Test counter", registry=source_registry)
counter.inc()
# Create collector with multiple labels to inject
labels_to_inject = {
"dynamo_namespace": "prod",
"dynamo_component": "vllm-worker",
"dynamo_endpoint": "generate",
"model": "llama-3-70b",
"instance_id": "worker-0",
"rank": "0",
}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
assert len(metric_families) == 1
# Verify all labels are present
sample = metric_families[0].samples[0]
assert sample.labels["dynamo_namespace"] == "prod"
assert sample.labels["dynamo_component"] == "vllm-worker"
assert sample.labels["dynamo_endpoint"] == "generate"
assert sample.labels["model"] == "llama-3-70b"
assert sample.labels["instance_id"] == "worker-0"
assert sample.labels["rank"] == "0"
def test_merge_with_existing_labels(self):
"""Test injecting labels into metrics that already have labels"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a counter that has labels
source_registry = CollectorRegistry()
counter = Counter(
"test_counter",
"Test counter",
labelnames=["status", "method"],
registry=source_registry,
)
counter.labels(status="success", method="GET").inc(10)
counter.labels(status="error", method="POST").inc(5)
# Create collector that injects additional labels
labels_to_inject = {
"dynamo_namespace": "prod",
"dynamo_component": "vllm-worker",
}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
assert len(metric_families) == 1
# Verify both original and injected labels are present
samples = metric_families[0].samples
# Counter may have _total and _created samples for each label combination
assert len(samples) >= 2
# Filter to only _total samples (the actual counter values)
total_samples = [s for s in samples if s.name.endswith("_total")]
assert len(total_samples) == 2
# First sample: status=success, method=GET
sample1 = total_samples[0]
assert sample1.labels["status"] == "success"
assert sample1.labels["method"] == "GET"
assert sample1.labels["dynamo_namespace"] == "prod"
assert sample1.labels["dynamo_component"] == "vllm-worker"
assert sample1.value == 10
# Second sample: status=error, method=POST
sample2 = total_samples[1]
assert sample2.labels["status"] == "error"
assert sample2.labels["method"] == "POST"
assert sample2.labels["dynamo_namespace"] == "prod"
assert sample2.labels["dynamo_component"] == "vllm-worker"
assert sample2.value == 5
def test_existing_label_not_overwritten(self):
"""Test that existing labels take precedence over injected labels"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a counter that has a 'model' label
source_registry = CollectorRegistry()
counter = Counter(
"test_counter",
"Test counter",
labelnames=["model"],
registry=source_registry,
)
counter.labels(model="original-model").inc()
# Try to inject a 'model' label with different value
labels_to_inject = {
"model": "injected-model", # This should NOT overwrite existing label
"dynamo_namespace": "prod",
}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
sample = metric_families[0].samples[0]
# Verify original label is preserved (not overwritten)
assert sample.labels["model"] == "original-model"
assert sample.labels["dynamo_namespace"] == "prod"
def test_empty_labels_raises_error(self):
"""Test that empty labels dict raises ValueError"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
source_registry = CollectorRegistry()
# Empty labels dict should raise ValueError
with pytest.raises(ValueError, match="labels_to_inject cannot be empty"):
LabelInjectingCollector(source_registry, {})
def test_reserved_label_le_raises_error(self):
"""Test that trying to inject reserved label 'le' raises ValueError"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
source_registry = CollectorRegistry()
# Trying to inject reserved 'le' label should raise ValueError
with pytest.raises(ValueError, match="Cannot inject reserved label names"):
LabelInjectingCollector(
source_registry,
{"le": "1.0", "dynamo_namespace": "prod"},
)
def test_reserved_label_quantile_raises_error(self):
"""Test that trying to inject reserved label 'quantile' raises ValueError"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
source_registry = CollectorRegistry()
# Trying to inject reserved 'quantile' label should raise ValueError
with pytest.raises(ValueError, match="Cannot inject reserved label names"):
LabelInjectingCollector(
source_registry,
{"quantile": "0.99", "dynamo_namespace": "prod"},
)
def test_multiple_metrics_all_get_labels(self):
"""Test that all metrics in registry get injected labels"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with multiple metrics
source_registry = CollectorRegistry()
counter = Counter("test_counter", "Test counter", registry=source_registry)
gauge = Gauge("test_gauge", "Test gauge", registry=source_registry)
histogram = Histogram(
"test_histogram", "Test histogram", registry=source_registry
)
counter.inc(5)
gauge.set(42)
histogram.observe(1.5)
# Create collector that injects labels
labels_to_inject = {
"dynamo_namespace": "prod",
"dynamo_component": "vllm-worker",
}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
assert len(metric_families) == 3
# Verify all metric families have injected labels in their samples
for mf in metric_families:
for sample in mf.samples:
assert sample.labels["dynamo_namespace"] == "prod"
assert sample.labels["dynamo_component"] == "vllm-worker"
def test_timestamp_preservation(self):
"""Test that timestamps are preserved"""
from dynamo.common.utils.label_injecting_collector import (
LabelInjectingCollector,
)
# Create source registry with a gauge
source_registry = CollectorRegistry()
gauge = Gauge("test_gauge", "Test gauge", registry=source_registry)
gauge.set(42)
# Create collector
labels_to_inject = {"dynamo_namespace": "prod"}
collector = LabelInjectingCollector(source_registry, labels_to_inject)
# Collect metrics
metric_families = list(collector.collect())
sample = metric_families[0].samples[0]
# Verify timestamp attribute exists (may be None)
assert sample.timestamp is None or isinstance(sample.timestamp, (float, int))
......@@ -10,6 +10,7 @@ import time
import sglang as sgl
import uvloop
from dynamo import prometheus_names
from dynamo.common.config_dump import dump_config
from dynamo.common.storage import get_fs
from dynamo.common.utils.endpoint_types import parse_endpoint_types
......@@ -530,11 +531,14 @@ async def init_multimodal_processor(runtime: DistributedRuntime, config: Config)
await encode_worker_client.wait_for_instances()
try:
await asyncio.gather(
_ = await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[("model", server_args.served_model_name)],
metrics_labels=[
(prometheus_names.labels.MODEL, server_args.served_model_name),
(prometheus_names.labels.MODEL_NAME, server_args.served_model_name),
],
),
register_llm_with_readiness_gate(
None, # engine
......@@ -581,7 +585,10 @@ async def init_multimodal_encode_worker(runtime: DistributedRuntime, config: Con
await generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[("model", server_args.served_model_name)],
metrics_labels=[
(prometheus_names.labels.MODEL, server_args.served_model_name),
(prometheus_names.labels.MODEL_NAME, server_args.served_model_name),
],
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
......
......@@ -283,7 +283,7 @@ class DynamoSglangPublisher:
def setup_prometheus_registry(
engine: sgl.Engine, generate_endpoint: Endpoint
engine: sgl.Engine, generate_endpoint: Endpoint, config: Config
) -> CollectorRegistry:
"""Set up Prometheus registry for SGLang metrics collection.
......@@ -304,6 +304,7 @@ def setup_prometheus_registry(
Args:
engine: The SGLang engine instance.
generate_endpoint: The Dynamo endpoint for generation requests.
config: SGLang configuration including dynamo_args with namespace/component/endpoint.
Returns:
Configured CollectorRegistry with multiprocess support.
......@@ -314,10 +315,15 @@ def setup_prometheus_registry(
multiprocess.MultiProcessCollector(registry)
# Register callback for SGLang metrics (sglang:* prefixed)
# Auto-label injection: hierarchy labels are added automatically
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=registry,
metric_prefix_filters=["sglang:"],
namespace_name=config.dynamo_args.namespace,
component_name=config.dynamo_args.component,
endpoint_name=config.dynamo_args.endpoint,
model_name=engine.server_args.served_model_name,
)
return registry
......@@ -344,7 +350,7 @@ async def setup_sgl_metrics(
# SGLang only calls set_prometheus_multiproc_dir() when enable_metrics=True,
# so MultiProcessCollector will crash without it.
if engine.server_args.enable_metrics:
setup_prometheus_registry(engine, generate_endpoint)
setup_prometheus_registry(engine, generate_endpoint, config)
# Always register the Dynamo component metrics callback (total_blocks,
# gpu_cache_usage, model_load_time). These use a dedicated registry that
......
......@@ -71,8 +71,7 @@ def test_prometheus_metrics_integration():
register_engine_metrics_callback(
endpoint=mock_endpoint,
registry=REGISTRY,
exclude_prefixes=["python_", "process_"],
add_prefix="trtllm_",
metric_prefix_filters=["trtllm_"],
)
print("✅ Prometheus metrics integration test passed")
......
......@@ -27,20 +27,20 @@ python_gc_objects_collected_total{generation="0"} 123.0
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 45.6
# HELP request_latency_seconds Request latency in seconds
# TYPE request_latency_seconds histogram
request_latency_seconds_bucket{le="0.1"} 10.0
request_latency_seconds_count 25.0
# HELP num_requests_running Number of requests currently running
# TYPE num_requests_running gauge
num_requests_running 3.0
# HELP tokens_per_second Tokens generated per second
# TYPE tokens_per_second gauge
tokens_per_second 245.7
# HELP trtllm_request_latency_seconds Request latency in seconds
# TYPE trtllm_request_latency_seconds histogram
trtllm_request_latency_seconds_bucket{le="0.1"} 10.0
trtllm_request_latency_seconds_count 25.0
# HELP trtllm_num_requests_running Number of requests currently running
# TYPE trtllm_num_requests_running gauge
trtllm_num_requests_running 3.0
# HELP trtllm_tokens_per_second Tokens generated per second
# TYPE trtllm_tokens_per_second gauge
trtllm_tokens_per_second 245.7
"""
def test_trtllm_use_case(self):
"""Test TensorRT-LLM use case: exclude python_/process_ and add trtllm_ prefix."""
"""Test TensorRT-LLM use case: filter to include only trtllm_* metrics (after traffic)."""
registry = Mock()
with patch(
......@@ -49,15 +49,14 @@ tokens_per_second 245.7
):
result = get_prometheus_expfmt(
registry,
exclude_prefixes=["python_", "process_"],
add_prefix="trtllm_",
metric_prefix_filters=["trtllm_"],
)
# Should not contain excluded metrics
# Should not contain excluded metrics (filtered out by metric_prefix_filters)
assert "python_gc_objects_collected_total" not in result
assert "process_cpu_seconds_total" not in result
# All remaining metrics should have trtllm_ prefix
# All remaining metrics should have trtllm_ prefix (already present from TRT-LLM engine)
assert "trtllm_request_latency_seconds" in result
assert "trtllm_num_requests_running" in result
assert "trtllm_tokens_per_second" in result
......@@ -98,7 +97,7 @@ tokens_per_second 245.7
):
result = get_prometheus_expfmt(
registry,
exclude_prefixes=["python_", "process_", "request_", "num_", "tokens_"],
exclude_prefixes=["python_", "process_", "trtllm_"],
)
# Should return empty string with newline or just newline
......@@ -124,7 +123,7 @@ trtllm_time_to_first_token_seconds_count 5.0
result = get_prometheus_expfmt(
registry,
exclude_prefixes=["python_", "process_"],
add_prefix="trtllm_",
metric_prefix_filters=["trtllm_"],
)
# Should not double-add prefix
......
......@@ -29,6 +29,7 @@ from torch.cuda import device_count
from transformers import AutoConfig
import dynamo.nixl_connect as nixl_connect
from dynamo import prometheus_names
from dynamo.common.config_dump import dump_config
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.prometheus import (
......@@ -392,13 +393,16 @@ async def init_llm_worker(
)
logging.info("TensorRT-LLM MetricsCollector initialized")
# Register callback to expose TRT-LLM metrics via Dynamo endpoint
# Note: latest TRT-LLM's MetricsCollector already adds the 'trtllm_' prefix to all metrics,
# so we filter by that prefix to include only TRT-LLM metrics.
# Register TRT-LLM metrics (TRT-LLM natively outputs trtllm_* metrics after traffic)
# Auto-label injection: hierarchy labels are added automatically
register_engine_metrics_callback(
endpoint=endpoint,
registry=REGISTRY,
metric_prefix_filters=["trtllm_"],
namespace_name=config.namespace,
component_name=config.component,
endpoint_name="generate",
model_name=model_name_for_metrics,
)
logging.info("TensorRT-LLM Prometheus metrics registered")
except Exception as e:
......@@ -456,7 +460,16 @@ async def init_llm_worker(
)
# Use model_path as fallback if served_model_name is not provided
model_name_for_metrics = config.served_model_name or config.model_path
metrics_labels = [("model", model_name_for_metrics)]
metrics_labels = [
(
prometheus_names.labels.MODEL,
model_name_for_metrics,
), # OpenAI standard
(
prometheus_names.labels.MODEL_NAME,
model_name_for_metrics,
), # Native engine compatibility
]
# Create worker-side publisher for consolidated events if consolidator is enabled
# This subscribes to consolidator's ZMQ output and publishes to NATS with worker_id
......
......@@ -15,6 +15,7 @@ from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.metrics.prometheus import setup_multiprocess_prometheus
from dynamo import prometheus_names
from dynamo.common.config_dump import dump_config
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.prometheus import (
......@@ -269,6 +270,11 @@ def setup_metrics_collection(config: Config, generate_endpoint, logger):
Solution: Try adding MultiProcessCollector to REGISTRY. If that fails, use
separate registry for multiprocess collection and register callbacks to both
registries to ensure all metrics (vllm, lmcache, dynamo_component) are collected.
Auto-label injection:
Hierarchy labels (dynamo_namespace, dynamo_component, dynamo_endpoint) are automatically
injected into engine metrics to align Python metrics with Rust auto-labels.
Additional labels can be provided via inject_labels parameter.
"""
if config.engine_args.disable_log_stats is False:
# Register the dedicated dynamo_component registry callback
......@@ -291,10 +297,11 @@ def setup_metrics_collection(config: Config, generate_endpoint, logger):
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=REGISTRY,
metric_prefix_filters=[
"vllm:",
"lmcache:",
],
metric_prefix_filters=["vllm:", "lmcache:"],
namespace_name=config.namespace,
component_name=config.component,
endpoint_name=config.endpoint,
model_name=config.model,
)
except ValueError as e:
# Conflict: metrics already in REGISTRY, MultiProcessCollector tries to add same metrics from .db files
......@@ -311,15 +318,20 @@ def setup_metrics_collection(config: Config, generate_endpoint, logger):
endpoint=generate_endpoint,
registry=REGISTRY,
metric_prefix_filters=["vllm:"],
namespace_name=config.namespace,
component_name=config.component,
endpoint_name=config.endpoint,
model_name=config.model,
)
# Multiproc registry has .db file metrics (lmcache, possibly vllm duplicates)
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=multiproc_registry,
metric_prefix_filters=[
"vllm:",
"lmcache:",
],
metric_prefix_filters=["vllm:", "lmcache:"],
namespace_name=config.namespace,
component_name=config.component,
endpoint_name=config.endpoint,
model_name=config.model,
)
else:
# No multiprocess mode
......@@ -327,6 +339,10 @@ def setup_metrics_collection(config: Config, generate_endpoint, logger):
endpoint=generate_endpoint,
registry=REGISTRY,
metric_prefix_filters=["vllm:", "lmcache:"],
namespace_name=config.namespace,
component_name=config.component,
endpoint_name=config.endpoint,
model_name=config.model,
)
......@@ -694,12 +710,24 @@ async def init_prefill(
handler.generate,
graceful_shutdown=True,
# In practice config.served_model_name is always set, but mypy needs the "or" here.
metrics_labels=[("model", config.served_model_name or config.model)],
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
health_check_payload=health_check_payload,
),
clear_endpoint.serve_endpoint(
handler.clear_kv_blocks,
metrics_labels=[("model", config.served_model_name)],
metrics_labels=[
(prometheus_names.labels.MODEL, config.served_model_name),
(prometheus_names.labels.MODEL_NAME, config.served_model_name),
],
),
)
logger.debug("serve_endpoint completed for prefill worker")
......@@ -856,24 +884,69 @@ async def init(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[("model", config.served_model_name or config.model)],
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
health_check_payload=health_check_payload,
),
clear_endpoint.serve_endpoint(
handler.clear_kv_blocks,
metrics_labels=[("model", config.served_model_name or config.model)],
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
),
load_lora_endpoint.serve_endpoint(
handler.load_lora,
metrics_labels=[("model", config.served_model_name or config.model)],
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
),
unload_lora_endpoint.serve_endpoint(
handler.unload_lora,
metrics_labels=[("model", config.served_model_name or config.model)],
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
),
list_loras_endpoint.serve_endpoint(
handler.list_loras,
metrics_labels=[("model", config.served_model_name or config.model)],
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
),
)
logger.debug("serve_endpoint completed for decode worker")
......@@ -961,7 +1034,11 @@ async def init_multimodal_processor(
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
handler.generate,
metrics_labels=[
(prometheus_names.labels.MODEL, config.model),
(prometheus_names.labels.MODEL_NAME, config.model),
],
),
)
except Exception as e:
......@@ -1001,7 +1078,11 @@ async def init_multimodal_encode_worker(
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
handler.generate,
metrics_labels=[
(prometheus_names.labels.MODEL, config.model),
(prometheus_names.labels.MODEL_NAME, config.model),
],
),
)
except Exception as e:
......@@ -1065,7 +1146,11 @@ async def init_vllm_native_encoder(
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
handler.generate,
metrics_labels=[
(prometheus_names.labels.MODEL, config.model),
(prometheus_names.labels.MODEL_NAME, config.model),
],
),
)
except Exception as e:
......@@ -1137,7 +1222,11 @@ async def init_ec_processor(
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
handler.generate,
metrics_labels=[
(prometheus_names.labels.MODEL, config.model),
(prometheus_names.labels.MODEL_NAME, config.model),
],
),
)
except Exception as e:
......@@ -1245,7 +1334,10 @@ async def init_multimodal_worker(
if kv_publisher:
handler.kv_publisher = kv_publisher
metrics_labels = [("model", config.model)]
metrics_labels = [
(prometheus_names.labels.MODEL, config.model),
(prometheus_names.labels.MODEL_NAME, config.model),
]
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
......@@ -1318,7 +1410,16 @@ async def init_omni(
await generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[("model", config.served_model_name or config.model)],
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
health_check_payload=health_check_payload,
)
except Exception as e:
......
......@@ -173,7 +173,7 @@ TensorRT-LLM provides extensive performance data beyond the basic Prometheus met
## Implementation Details
- **Prometheus Integration**: Uses the `MetricsCollector` class from `tensorrt_llm.metrics` (see [collector.py](https://github.com/NVIDIA/TensorRT-LLM/blob/main/tensorrt_llm/metrics/collector.py))
- **Dynamo Integration**: Uses `register_engine_metrics_callback()` function with `add_prefix="trtllm_"`
- **Dynamo Integration**: Uses `register_engine_metrics_callback()` function with `metric_prefix_filter=["trtllm_"]`
- **Engine Configuration**: `return_perf_metrics` set to `True` when `--publish-events-and-metrics` is enabled
- **Initialization**: Metrics appear after TensorRT-LLM engine initialization completes
- **Metadata**: `MetricsCollector` initialized with model metadata (model name, engine type)
......
......@@ -11,7 +11,7 @@ When running TensorRT-LLM through Dynamo, TensorRT-LLM's Prometheus metrics are
Additional performance metrics are available via non-Prometheus APIs (see [Non-Prometheus Performance Metrics](#non-prometheus-performance-metrics) below).
As of the date of this documentation, the included TensorRT-LLM version 1.1.0rc5 exposes **5 basic Prometheus metrics**. Note that the `trtllm_` prefix is added by Dynamo.
TensorRT-LLM natively exposes several Prometheus metrics with the `trtllm_` prefix. The specific metrics available depend on your TensorRT-LLM version.
**For Dynamo runtime metrics**, see the [Dynamo Metrics Guide](../../observability/metrics.md).
......@@ -114,7 +114,7 @@ TensorRT-LLM provides metrics in the following categories (all prefixed with `tr
## Available Metrics
The following metrics are exposed via Dynamo's `/metrics` endpoint (with the `trtllm_` prefix added by Dynamo) for TensorRT-LLM version 1.1.0rc5:
TensorRT-LLM exposes metrics via Dynamo's `/metrics` endpoint with the `trtllm_` prefix. Common metrics include:
- `trtllm_request_success_total` (Counter) — Count of successfully processed requests by finish reason
- Labels: `model_name`, `engine_type`, `finished_reason`
......@@ -127,7 +127,7 @@ The following metrics are exposed via Dynamo's `/metrics` endpoint (with the `tr
- `trtllm_request_queue_time_seconds` (Histogram) — Time a request spends waiting in the queue (seconds)
- Labels: `model_name`, `engine_type`
These metric names and availability are subject to change with TensorRT-LLM version updates.
**Note:** The specific metrics available depend on your TensorRT-LLM version. Always inspect your actual `/metrics` endpoint to see the current list of metrics for your version.
TensorRT-LLM provides Prometheus metrics through the `MetricsCollector` class (see [tensorrt_llm/metrics/collector.py](https://github.com/NVIDIA/TensorRT-LLM/blob/main/tensorrt_llm/metrics/collector.py)).
......@@ -168,14 +168,14 @@ TensorRT-LLM provides extensive performance data beyond the basic Prometheus met
}
```
**Note:** These structures are valid as of the date of this documentation but are subject to change with TensorRT-LLM version updates.
**Note:** These structures may vary depending on your TensorRT-LLM version. Refer to the [TensorRT-LLM source code](https://github.com/NVIDIA/TensorRT-LLM/blob/main/tensorrt_llm/executor/result.py) for the most up-to-date structure for your version.
## Implementation Details
- **Prometheus Integration**: Uses the `MetricsCollector` class from `tensorrt_llm.metrics` (see [collector.py](https://github.com/NVIDIA/TensorRT-LLM/blob/main/tensorrt_llm/metrics/collector.py))
- **Dynamo Integration**: Uses `register_engine_metrics_callback()` function with `add_prefix="trtllm_"`
- **Dynamo Integration**: Uses `register_engine_metrics_callback()` function to pass through TRT-LLM's native `trtllm_*` metrics
- **Engine Configuration**: `return_perf_metrics` set to `True` when `--publish-events-and-metrics` is enabled
- **Initialization**: Metrics appear after TensorRT-LLM engine initialization completes
- **Initialization**: Metrics appear after TensorRT-LLM engine initialization completes and after at least one request is processed
- **Metadata**: `MetricsCollector` initialized with model metadata (model name, engine type)
## Related Documentation
......
......@@ -164,8 +164,15 @@ class labels:
# Note: this is not an auto-inserted label like `dynamo_namespace`/`dynamo_component`.
# It is used by worker/load-style metrics that need to disambiguate per-worker series.
DP_RANK = "dp_rank"
# Label for model name
# Label for model name/path (OpenAI API standard, injected by Dynamo)
# This is the standard label name injected by all backends in metrics_labels=[("model", ...)].
# Ensures compatibility with OpenAI-compatible tooling.
MODEL = "model"
# Label for model name/path (alternative/native engine label, injected by Dynamo)
# Some engines natively use model_name, so we inject both model and model_name
# to ensure maximum compatibility with both OpenAI standard and engine-native tooling.
# When a metric already has a label, injection does not overwrite it (original is preserved).
MODEL_NAME = "model_name"
# Label for worker type (e.g., "aggregated", "prefill", "decode", "encoder", etc.)
WORKER_TYPE = "worker_type"
......
......@@ -34,10 +34,6 @@ use crate::protocols::annotated::Annotated;
use crate::stream;
use crate::stream::StreamExt;
// If set to true, then metrics will be labeled with the namespace, component, and endpoint labels.
// These labels are prefixed with "dynamo_" to avoid collisions with Kubernetes and other monitoring system labels.
pub const USE_AUTO_LABELS: bool = true;
// Prometheus imports
use prometheus::Encoder;
......@@ -224,43 +220,47 @@ pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
// Build updated_labels: auto-labels first, then `labels` + stored labels
let mut updated_labels: Vec<(String, String)> = Vec::new();
if USE_AUTO_LABELS {
// Validate that user-provided labels don't conflict with auto-generated labels
for (key, _) in labels {
if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
return Err(anyhow::anyhow!(
"Label '{}' is automatically added by auto_label feature and cannot be manually set",
key
));
}
// Auto-label injection: Always add dynamo_namespace, dynamo_component, dynamo_endpoint labels
// based on the hierarchy. Label constants defined in prometheus_names.rs labels module.
//
// Python counterpart: components/src/dynamo/common/utils/prometheus.py register_engine_metrics_callback()
// Validate that user-provided labels don't conflict with auto-generated labels
for (key, _) in labels {
if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
return Err(anyhow::anyhow!(
"Label '{}' is automatically added by auto-label injection and cannot be manually set",
key
));
}
}
// Add auto-generated labels with sanitized values
if hierarchy_names.len() > 1 {
let namespace = &hierarchy_names[1];
if !namespace.is_empty() {
let valid_namespace = sanitize_prometheus_label(namespace)?;
if !valid_namespace.is_empty() {
updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
}
// Add auto-generated labels with sanitized values
// Hierarchy: [drt, namespace, component, endpoint]
if hierarchy_names.len() > 1 {
let namespace = &hierarchy_names[1];
if !namespace.is_empty() {
let valid_namespace = sanitize_prometheus_label(namespace)?;
if !valid_namespace.is_empty() {
updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
}
}
if hierarchy_names.len() > 2 {
let component = &hierarchy_names[2];
if !component.is_empty() {
let valid_component = sanitize_prometheus_label(component)?;
if !valid_component.is_empty() {
updated_labels.push((labels::COMPONENT.to_string(), valid_component));
}
}
if hierarchy_names.len() > 2 {
let component = &hierarchy_names[2];
if !component.is_empty() {
let valid_component = sanitize_prometheus_label(component)?;
if !valid_component.is_empty() {
updated_labels.push((labels::COMPONENT.to_string(), valid_component));
}
}
if hierarchy_names.len() > 3 {
let endpoint = &hierarchy_names[3];
if !endpoint.is_empty() {
let valid_endpoint = sanitize_prometheus_label(endpoint)?;
if !valid_endpoint.is_empty() {
updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
}
}
if hierarchy_names.len() > 3 {
let endpoint = &hierarchy_names[3];
if !endpoint.is_empty() {
let valid_endpoint = sanitize_prometheus_label(endpoint)?;
if !valid_endpoint.is_empty() {
updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
}
}
}
......
......@@ -71,6 +71,12 @@ pub mod name_prefix {
}
/// Automatically inserted Prometheus label names used across the metrics system
///
/// These labels are auto-injected into metrics by the hierarchy system:
/// - Rust: lib/runtime/src/metrics.rs create_metric() function
/// - Python: components/src/dynamo/common/utils/prometheus.py register_engine_metrics_callback()
///
/// Python codegen: These constants are exported to lib/bindings/python/src/dynamo/prometheus_names.py
pub mod labels {
/// Label for component identification
pub const COMPONENT: &str = "dynamo_component";
......@@ -87,9 +93,17 @@ pub mod labels {
/// It is used by worker/load-style metrics that need to disambiguate per-worker series.
pub const DP_RANK: &str = "dp_rank";
/// Label for model name
/// Label for model name/path (OpenAI API standard, injected by Dynamo)
/// This is the standard label name injected by all backends in metrics_labels=[("model", ...)].
/// Ensures compatibility with OpenAI-compatible tooling.
pub const MODEL: &str = "model";
/// Label for model name/path (alternative/native engine label, injected by Dynamo)
/// Some engines natively use model_name, so we inject both model and model_name
/// to ensure maximum compatibility with both OpenAI standard and engine-native tooling.
/// When a metric already has a label, injection does not overwrite it (original is preserved).
pub const MODEL_NAME: &str = "model_name";
/// Label for worker type (e.g., "aggregated", "prefill", "decode", "encoder", etc.)
pub const WORKER_TYPE: &str = "worker_type";
}
......
......@@ -12,31 +12,11 @@ import argparse
import asyncio
import math
import os
# Mock dependencies before importing planner modules
import sys
# We'll import the actual Planner class to test its calculation logic
from unittest.mock import MagicMock, Mock, patch
from unittest.mock import Mock, patch
import pytest
# Create mock modules for dependencies that might not be available in test environment
mock_prometheus = MagicMock()
mock_prometheus.Gauge = MagicMock()
mock_prometheus.start_http_server = MagicMock()
mock_runtime = MagicMock()
mock_runtime.logging = MagicMock()
mock_runtime.logging.configure_dynamo_logging = MagicMock()
# Patch them into sys.modules before importing
sys.modules["prometheus_client"] = mock_prometheus
sys.modules["dynamo.runtime"] = mock_runtime
sys.modules["dynamo.runtime.logging"] = mock_runtime.logging
# Now import after mocking
from dynamo.planner.utils.planner_core import ( # noqa: E402
from dynamo.planner.utils.planner_core import (
DecodePlanner,
Metrics,
PlannerSharedState,
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Integration tests for Prometheus label injection via get_prometheus_expfmt.
Tests the complete flow of label injection through the exposition format generation:
get_prometheus_expfmt with inject_custom_labels -> verify labels in output text format.
"""
import pytest
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from dynamo import prometheus_names
from dynamo.common.utils.prometheus import get_prometheus_expfmt
pytestmark = [pytest.mark.unit, pytest.mark.pre_merge, pytest.mark.gpu_0]
class TestPrometheusExpositionFormatInjection:
"""Integration tests for label injection through exposition format generation"""
def test_inject_labels_into_counter_expfmt(self):
"""Test label injection produces correct exposition format for Counter"""
# Create registry with a counter
registry = CollectorRegistry()
counter = Counter("requests_total", "Total requests", registry=registry)
counter.inc(5)
# Get exposition format with label injection (using prometheus_names constants)
labels_to_inject = {
prometheus_names.labels.NAMESPACE: "prod",
prometheus_names.labels.COMPONENT: "vllm-worker",
prometheus_names.labels.ENDPOINT: "generate",
}
expfmt = get_prometheus_expfmt(registry, inject_custom_labels=labels_to_inject)
# Verify exposition format contains injected labels
assert f'{prometheus_names.labels.NAMESPACE}="prod"' in expfmt
assert f'{prometheus_names.labels.COMPONENT}="vllm-worker"' in expfmt
assert f'{prometheus_names.labels.ENDPOINT}="generate"' in expfmt
# Verify counter value is present
assert "requests_total_total" in expfmt or "requests_total{" in expfmt
# Verify HELP and TYPE comments are present
assert "# HELP requests_total" in expfmt
assert "# TYPE requests_total counter" in expfmt
def test_inject_labels_into_gauge_expfmt(self):
"""Test label injection produces correct exposition format for Gauge"""
# Create registry with a gauge
registry = CollectorRegistry()
gauge = Gauge("active_requests", "Active requests", registry=registry)
gauge.set(10)
# Get exposition format with label injection (using prometheus_names constants)
labels_to_inject = {
prometheus_names.labels.MODEL: "llama-3-70b",
"instance_id": "worker-0",
}
expfmt = get_prometheus_expfmt(registry, inject_custom_labels=labels_to_inject)
# Verify exposition format contains injected labels
assert f'{prometheus_names.labels.MODEL}="llama-3-70b"' in expfmt
assert 'instance_id="worker-0"' in expfmt
# Verify gauge value
assert "active_requests" in expfmt
assert "10" in expfmt or "10.0" in expfmt
def test_inject_labels_into_histogram_expfmt(self):
"""Test label injection preserves histogram structure and le label"""
# Create registry with a histogram
registry = CollectorRegistry()
histogram = Histogram(
"request_duration_seconds",
"Request duration",
registry=registry,
)
histogram.observe(0.5)
histogram.observe(1.5)
# Get exposition format with label injection (using prometheus_names constants)
labels_to_inject = {
prometheus_names.labels.NAMESPACE: "prod",
prometheus_names.labels.ENDPOINT: "generate",
}
expfmt = get_prometheus_expfmt(registry, inject_custom_labels=labels_to_inject)
# Verify injected labels are present
assert f'{prometheus_names.labels.NAMESPACE}="prod"' in expfmt
assert f'{prometheus_names.labels.ENDPOINT}="generate"' in expfmt
# Verify histogram structure (buckets with 'le' label)
assert "request_duration_seconds_bucket" in expfmt
assert "le=" in expfmt # Histogram buckets must have 'le' label
# Verify sum and count
assert "request_duration_seconds_sum" in expfmt
assert "request_duration_seconds_count" in expfmt
def test_inject_labels_with_prefix_filter(self):
"""Test label injection works with metric prefix filtering"""
# Create registry with multiple metrics
registry = CollectorRegistry()
vllm_counter = Counter("vllm:requests", "vLLM requests", registry=registry)
other_counter = Counter("python_gc_objects", "GC objects", registry=registry)
vllm_counter.inc(5)
other_counter.inc(100)
# Get exposition format with filtering and label injection
labels_to_inject = {
prometheus_names.labels.NAMESPACE: "prod",
prometheus_names.labels.MODEL: "llama-3-70b",
}
expfmt = get_prometheus_expfmt(
registry,
metric_prefix_filters=["vllm:"],
inject_custom_labels=labels_to_inject,
)
# Verify vllm metric is present with injected labels
assert "vllm:requests" in expfmt
assert f'{prometheus_names.labels.NAMESPACE}="prod"' in expfmt
assert f'{prometheus_names.labels.MODEL}="llama-3-70b"' in expfmt
# Verify other metric is filtered out
assert "python_gc_objects" not in expfmt
def test_inject_labels_with_exclude_prefix(self):
"""Test label injection works with exclude prefixes"""
# Create registry with multiple metrics
registry = CollectorRegistry()
app_counter = Counter("app_requests", "App requests", registry=registry)
python_counter = Counter("python_gc_objects", "GC objects", registry=registry)
app_counter.inc(5)
python_counter.inc(100)
# Get exposition format with exclude and label injection
labels_to_inject = {prometheus_names.labels.COMPONENT: "test-component"}
expfmt = get_prometheus_expfmt(
registry,
exclude_prefixes=["python_"],
inject_custom_labels=labels_to_inject,
)
# Verify app metric is present with injected label
assert "app_requests" in expfmt
assert f'{prometheus_names.labels.COMPONENT}="test-component"' in expfmt
# Verify python metric is excluded
assert "python_gc_objects" not in expfmt
def test_inject_labels_with_prefix_filter_trtllm(self):
"""Test label injection works with metric prefix filtering for trtllm"""
# Create registry with a counter that has trtllm_ prefix
registry = CollectorRegistry()
counter = Counter("trtllm_requests", "TensorRT-LLM Requests", registry=registry)
counter.inc(5)
# Get exposition format filtering for trtllm metrics and inject labels
labels_to_inject = {
prometheus_names.labels.NAMESPACE: "prod",
prometheus_names.labels.MODEL: "qwen-32b",
}
expfmt = get_prometheus_expfmt(
registry,
metric_prefix_filters=["trtllm_"],
inject_custom_labels=labels_to_inject,
)
# Verify metric is present with injected labels
assert "trtllm_requests" in expfmt
assert f'{prometheus_names.labels.NAMESPACE}="prod"' in expfmt
assert f'{prometheus_names.labels.MODEL}="qwen-32b"' in expfmt
def test_inject_labels_with_existing_labels(self):
"""Test label injection merges with existing metric labels"""
# Create registry with a counter that has labels
registry = CollectorRegistry()
counter = Counter(
"requests",
"Requests",
labelnames=["status", "method"],
registry=registry,
)
counter.labels(status="success", method="GET").inc(10)
# Get exposition format with label injection
labels_to_inject = {
prometheus_names.labels.NAMESPACE: "prod",
prometheus_names.labels.COMPONENT: "vllm-worker",
}
expfmt = get_prometheus_expfmt(registry, inject_custom_labels=labels_to_inject)
# Verify both existing and injected labels are present
assert 'status="success"' in expfmt
assert 'method="GET"' in expfmt
assert f'{prometheus_names.labels.NAMESPACE}="prod"' in expfmt
assert f'{prometheus_names.labels.COMPONENT}="vllm-worker"' in expfmt
def test_inject_multiple_labels(self):
"""Test injecting many labels at once"""
# Create registry with a gauge
registry = CollectorRegistry()
gauge = Gauge("memory_usage_bytes", "Memory usage", registry=registry)
gauge.set(1024)
# Get exposition format with many injected labels
labels_to_inject = {
prometheus_names.labels.NAMESPACE: "prod",
prometheus_names.labels.COMPONENT: "vllm-worker",
prometheus_names.labels.ENDPOINT: "generate",
prometheus_names.labels.MODEL: "llama-3-70b",
"instance_id": "worker-0",
"rank": "0",
"gpu_id": "0",
}
expfmt = get_prometheus_expfmt(registry, inject_custom_labels=labels_to_inject)
# Verify all labels are present
for label_name, label_value in labels_to_inject.items():
assert f'{label_name}="{label_value}"' in expfmt
def test_inject_labels_none_is_noop(self):
"""Test that inject_custom_labels=None doesn't modify output"""
# Create registry with a counter
registry = CollectorRegistry()
counter = Counter("requests", "Requests", registry=registry)
counter.inc(5)
# Get exposition format without label injection
expfmt_without = get_prometheus_expfmt(registry, inject_custom_labels=None)
# Get exposition format with empty dict (should raise error in collector)
# But inject_custom_labels=None should work fine
expfmt_with_none = get_prometheus_expfmt(registry, inject_custom_labels=None)
# Both should be identical (no label injection)
assert expfmt_without == expfmt_with_none
assert "requests" in expfmt_without
def test_inject_labels_align_with_rust_labels(self):
"""Test injecting labels that align with Rust auto-labels"""
# Create registry with vllm metrics
registry = CollectorRegistry()
counter = Counter("vllm:requests_total", "Total requests", registry=registry)
counter.inc(100)
# Inject labels that match Rust auto-labels
labels_to_inject = {
prometheus_names.labels.NAMESPACE: "prod-inference",
prometheus_names.labels.COMPONENT: "vllm-decode-worker",
prometheus_names.labels.ENDPOINT: "generate",
}
expfmt = get_prometheus_expfmt(
registry,
metric_prefix_filters=["vllm:"],
inject_custom_labels=labels_to_inject,
)
# Verify Rust-compatible labels are present
assert f'{prometheus_names.labels.NAMESPACE}="prod-inference"' in expfmt
assert f'{prometheus_names.labels.COMPONENT}="vllm-decode-worker"' in expfmt
assert f'{prometheus_names.labels.ENDPOINT}="generate"' in expfmt
# Verify metric is present
assert "vllm:requests" in expfmt
......@@ -12,7 +12,11 @@ from tests.utils.payloads import (
CompletionPayload,
CompletionPayloadWithLogprobs,
EmbeddingPayload,
LMCacheMetricsPayload,
MetricsPayload,
SGLangMetricsPayload,
TRTLLMMetricsPayload,
VLLMMetricsPayload,
)
# Common default text prompt used across tests
......@@ -178,15 +182,39 @@ def metric_payload_default(
backend: Optional[str] = None,
port: int = DefaultPort.SYSTEM1.value,
) -> MetricsPayload:
return MetricsPayload(
body={},
repeat_count=repeat_count,
expected_log=expected_log or [],
expected_response=[],
min_num_requests=min_num_requests,
backend=backend,
port=port,
)
"""Create a metrics payload for the specified backend.
Args:
min_num_requests: Minimum number of requests expected in metrics
repeat_count: Number of times to repeat the request
expected_log: Expected log messages
backend: Backend type ('vllm', 'sglang', 'trtllm', 'lmcache')
port: Port to use for metrics endpoint
Returns:
Backend-specific MetricsPayload subclass based on backend parameter
"""
common_args = {
"body": {},
"repeat_count": repeat_count,
"expected_log": expected_log or [],
"expected_response": [],
"min_num_requests": min_num_requests,
"port": port,
}
# Return backend-specific payload class
if backend == "vllm":
return VLLMMetricsPayload(**common_args)
elif backend == "sglang":
return SGLangMetricsPayload(**common_args)
elif backend == "trtllm":
return TRTLLMMetricsPayload(**common_args)
elif backend == "lmcache":
return LMCacheMetricsPayload(**common_args)
else:
# Default to base MetricsPayload for unknown backends
return MetricsPayload(**common_args)
def chat_payload(
......
......@@ -540,13 +540,16 @@ class MetricCheck:
@dataclass
class MetricsPayload(BasePayload):
"""Base class for Prometheus metrics validation payloads.
Validates common dynamo_component_* metrics shared across all backends.
Backend-specific subclasses handle engine-specific metrics.
"""
endpoint: str = "/metrics"
method: str = "GET"
port: int = DefaultPort.SYSTEM1.value
min_num_requests: int = 1
backend: Optional[
str
] = None # Backend identifier for metrics validation (e.g., 'vllm', 'sglang', 'trtllm')
def with_model(self, model):
# Metrics does not use model in request body
......@@ -556,16 +559,14 @@ class MetricsPayload(BasePayload):
response.raise_for_status()
return response.text
def validate(self, response: Any, content: str) -> None:
# Use backend from payload configuration
backend = self.backend
# Filter out _bucket metrics from content (histogram buckets inflate counts)
def _filter_bucket_metrics(self, content: str) -> str:
"""Filter out histogram bucket metrics to avoid count inflation"""
content_lines = content.split("\n")
filtered_lines = [line for line in content_lines if "_bucket{" not in line]
content = "\n".join(filtered_lines)
return "\n".join(filtered_lines)
# Build full metric names with prefix
def _get_common_metric_checks(self) -> list[MetricCheck]:
"""Get common dynamo_component_* metric checks shared across all backends"""
prefix = prometheus_names.name_prefix.COMPONENT
# Define metrics to check
......@@ -577,7 +578,7 @@ class MetricsPayload(BasePayload):
def metric_pattern(name):
return rf"{name}(?:\{{[^}}]*\}})?\s+([\d.eE+-]+)"
metrics_to_check = [
return [
MetricCheck(
# Check: Minimum count of unique dynamo_component_* metrics
name=f"{prefix}_*",
......@@ -625,61 +626,14 @@ class MetricsPayload(BasePayload):
),
]
# Add backend-specific metric checks
if backend == "vllm":
metrics_to_check.append(
MetricCheck(
# Check: Minimum count of unique vllm:* metrics
name="vllm:*",
pattern=lambda name: r"^vllm:\w+",
validator=lambda value: len(set(value))
>= 52, # 80% of typical ~65 vllm metrics (excluding _bucket) as of 2025-10-22 (but will grow)
error_msg=lambda name, value: f"Expected at least 52 unique vllm:* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique vllm:* metrics (minimum required: 52)",
multiline=True,
)
)
elif backend == "lmcache":
metrics_to_check.append(
MetricCheck(
# Check: Minimum count of unique lmcache:* metrics
name="lmcache:*",
pattern=lambda name: r"^lmcache:\w+",
validator=lambda value: len(set(value))
>= 1, # At least 1 lmcache metric
error_msg=lambda name, value: f"Expected at least 1 lmcache:* metric, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} lmcache:* metrics",
multiline=True,
)
)
elif backend == "sglang":
metrics_to_check.append(
MetricCheck(
# Check: Minimum count of unique sglang:* metrics
name="sglang:*",
pattern=lambda name: r"^sglang:\w+",
validator=lambda value: len(set(value))
>= 20, # 80% of typical ~25 sglang metrics (excluding _bucket) as of 2025-10-22 (but will grow)
error_msg=lambda name, value: f"Expected at least 20 unique sglang:* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique sglang:* metrics (minimum required: 20)",
multiline=True,
)
)
elif backend == "trtllm":
metrics_to_check.append(
MetricCheck(
# Check: Minimum count of unique trtllm_* metrics
name="trtllm_*",
pattern=lambda name: r"^trtllm_\w+",
validator=lambda value: len(set(value))
>= 4, # 80% of typical ~5 trtllm metrics (excluding _bucket) as of 2025-10-22 (but will grow)
error_msg=lambda name, value: f"Expected at least 4 unique trtllm_* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique trtllm_* metrics (minimum required: 4)",
multiline=True,
)
)
def _get_backend_specific_checks(self) -> list[MetricCheck]:
"""Get backend-specific metric checks. Override in subclasses."""
return []
# Check all metrics
def _validate_metric_checks(
self, metrics_to_check: list[MetricCheck], content: str
) -> None:
"""Run all metric checks and raise AssertionError if any fail"""
for metric in metrics_to_check:
# Special handling for multiline patterns (like counting unique metrics)
if metric.multiline:
......@@ -727,6 +681,163 @@ class MetricsPayload(BasePayload):
)
)
def validate(self, response: Any, content: str) -> None:
"""Validate Prometheus metrics output"""
content = self._filter_bucket_metrics(content)
# Collect all checks: common + backend-specific
metrics_to_check = self._get_common_metric_checks()
metrics_to_check.extend(self._get_backend_specific_checks())
# Run all validations
self._validate_metric_checks(metrics_to_check, content)
@dataclass
class VLLMMetricsPayload(MetricsPayload):
"""Metrics validation for vLLM backend with auto-label checks"""
def _get_backend_specific_checks(self) -> list[MetricCheck]:
"""vLLM-specific metric checks"""
checks = [
MetricCheck(
# Check: Minimum count of unique vllm:* metrics
name="vllm:*",
pattern=lambda name: r"^vllm:\w+",
validator=lambda value: len(set(value))
>= 56, # 80% of typical ~70 vllm metrics (excluding _bucket) as of 2026-02-05 (but will grow)
error_msg=lambda name, value: f"Expected at least 56 unique vllm:* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique vllm:* metrics (minimum required: 56)",
multiline=True,
)
]
# Check required labels: auto-injected (from prometheus_names.labels) + injected by backend
required_labels = [
prometheus_names.labels.NAMESPACE,
prometheus_names.labels.COMPONENT,
prometheus_names.labels.ENDPOINT,
prometheus_names.labels.MODEL, # OpenAI standard (injected by all backends)
prometheus_names.labels.MODEL_NAME, # Alternative label (injected for compatibility)
]
for label_name in required_labels:
checks.append(
MetricCheck(
name=f"vllm:* with {label_name}",
pattern=lambda name, lbl=label_name: rf'vllm:\w+\{{[^}}]*{lbl}="[^"]+"',
validator=lambda value: len(value) > 0,
error_msg=lambda name, value, lbl=label_name: f"vLLM metrics missing label: {lbl}",
success_msg=lambda name, value, lbl=label_name: f"SUCCESS: vLLM metrics include {lbl} label (found {len(value)} metrics)",
multiline=True,
)
)
return checks
@dataclass
class LMCacheMetricsPayload(MetricsPayload):
"""Metrics validation for lmcache"""
def _get_backend_specific_checks(self) -> list[MetricCheck]:
"""lmcache-specific metric checks"""
return [
MetricCheck(
# Check: Minimum count of unique lmcache:* metrics
name="lmcache:*",
pattern=lambda name: r"^lmcache:\w+",
validator=lambda value: len(set(value))
>= 26, # 80% of typical ~33 lmcache metrics (excluding _bucket) as of 2026-02-05 (but will grow)
error_msg=lambda name, value: f"Expected at least 26 unique lmcache:* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} lmcache:* metrics (minimum required: 26)",
multiline=True,
)
]
@dataclass
class SGLangMetricsPayload(MetricsPayload):
"""Metrics validation for SGLang backend with auto-label checks"""
def _get_backend_specific_checks(self) -> list[MetricCheck]:
"""SGLang-specific metric checks"""
checks = [
MetricCheck(
# Check: Minimum count of unique sglang:* metrics
name="sglang:*",
pattern=lambda name: r"^sglang:\w+",
validator=lambda value: len(set(value))
>= 20, # 80% of typical ~25 sglang metrics (excluding _bucket) as of 2025-10-22 (but will grow)
error_msg=lambda name, value: f"Expected at least 20 unique sglang:* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique sglang:* metrics (minimum required: 20)",
multiline=True,
)
]
# Check required labels: auto-injected (from prometheus_names.labels) + injected by backend
required_labels = [
prometheus_names.labels.NAMESPACE,
prometheus_names.labels.COMPONENT,
prometheus_names.labels.ENDPOINT,
prometheus_names.labels.MODEL, # OpenAI standard (injected by all backends)
prometheus_names.labels.MODEL_NAME, # Alternative label (injected for compatibility)
]
for label_name in required_labels:
checks.append(
MetricCheck(
name=f"sglang:* with {label_name}",
pattern=lambda name, lbl=label_name: rf'sglang:\w+\{{[^}}]*{lbl}="[^"]+"',
validator=lambda value: len(value) > 0,
error_msg=lambda name, value, lbl=label_name: f"sglang metrics missing label: {lbl}",
success_msg=lambda name, value, lbl=label_name: f"SUCCESS: sglang metrics include {lbl} label (found {len(value)} metrics)",
multiline=True,
)
)
return checks
@dataclass
class TRTLLMMetricsPayload(MetricsPayload):
"""Metrics validation for TensorRT-LLM backend"""
def _get_backend_specific_checks(self) -> list[MetricCheck]:
"""TRT-LLM-specific metric checks"""
checks = [
MetricCheck(
# Check: Minimum count of unique trtllm_* metrics
name="trtllm_*",
pattern=lambda name: r"^trtllm_\w+",
validator=lambda value: len(set(value))
>= 4, # 80% of typical ~5 trtllm metrics (excluding _bucket) as of 2025-10-22 (but will grow)
error_msg=lambda name, value: f"Expected at least 4 unique trtllm_* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique trtllm_* metrics (minimum required: 4)",
multiline=True,
)
]
# Check required labels: auto-injected (from prometheus_names.labels) + injected by backend
required_labels = [
prometheus_names.labels.NAMESPACE,
prometheus_names.labels.COMPONENT,
prometheus_names.labels.ENDPOINT,
prometheus_names.labels.MODEL, # OpenAI standard (injected by all backends)
prometheus_names.labels.MODEL_NAME, # Alternative label (injected for compatibility)
]
for label_name in required_labels:
checks.append(
MetricCheck(
name=f"trtllm_* with {label_name}",
pattern=lambda name, lbl=label_name: rf'trtllm_\w+\{{[^}}]*{lbl}="[^"]+"',
validator=lambda value: len(value) > 0,
error_msg=lambda name, value, lbl=label_name: f"TRT-LLM metrics missing label: {lbl}",
success_msg=lambda name, value, lbl=label_name: f"SUCCESS: TRT-LLM metrics include {lbl} label (found {len(value)} metrics)",
multiline=True,
)
)
return checks
def check_models_api(response):
"""Check if models API is working and returns models"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment