Unverified Commit 027d2653 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: expose Python Prometheus metric via DynamoComponentMetrics (#5817)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 56a1b6e3
......@@ -17,8 +17,9 @@ from functools import lru_cache
from typing import TYPE_CHECKING, Optional, Pattern
from dynamo._core import Endpoint
from dynamo.prometheus_names import kvstats, labels, model_info, name_prefix
# Import CollectorRegistry only for type hints to avoid importing prometheus_client at module load time.
# Import CollectorRegistry and Gauge only for type hints to avoid importing prometheus_client at module load time.
# prometheus_client must be imported AFTER set_prometheus_multiproc_dir() is called.
# See main.py worker() function for detailed explanation.
if TYPE_CHECKING:
......@@ -66,12 +67,13 @@ def register_engine_metrics_callback(
def get_expfmt() -> str:
"""Callback to return engine Prometheus metrics in exposition format"""
return get_prometheus_expfmt(
result = get_prometheus_expfmt(
registry,
metric_prefix_filters=metric_prefix_filters,
exclude_prefixes=exclude_prefixes,
add_prefix=add_prefix,
)
return result
endpoint.metrics.register_prometheus_expfmt_callback(get_expfmt)
......@@ -237,3 +239,65 @@ def get_prometheus_expfmt(
except Exception as e:
logging.error(f"Error getting metrics: {e}")
return ""
class LLMBackendMetrics:
"""Prometheus metrics for LLM backends with `dynamo_component_` prefix.
Usage:
metrics = LLMBackendMetrics(registry, model_name="Qwen/Qwen3-0.6B", component_name="backend")
metrics.set_total_blocks("0", 1000)
metrics.set_gpu_cache_usage("0", 0.75)
metrics.set_model_load_time(5.2)
"""
def __init__(self, registry=None, model_name: str = "", component_name: str = ""):
"""Create all Dynamo component gauges."""
from prometheus_client import Gauge
self.total_blocks = Gauge(
f"{name_prefix.COMPONENT}_{kvstats.TOTAL_BLOCKS}",
"Total number of KV cache blocks available on the worker.",
labelnames=[labels.MODEL, labels.COMPONENT, labels.DP_RANK],
registry=registry,
multiprocess_mode="max",
)
self.gpu_cache_usage_percent = Gauge(
f"{name_prefix.COMPONENT}_{kvstats.GPU_CACHE_USAGE_PERCENT}",
"GPU cache usage as a percentage (0.0-1.0).",
labelnames=[labels.MODEL, labels.COMPONENT, labels.DP_RANK],
registry=registry,
multiprocess_mode="max",
)
self.model_load_time = Gauge(
f"{name_prefix.COMPONENT}_{model_info.LOAD_TIME_SECONDS}",
"Model load time in seconds.",
labelnames=[labels.MODEL, labels.COMPONENT],
registry=registry,
multiprocess_mode="max",
)
self.model_name = model_name
self.component_name = component_name
def set_total_blocks(self, dp_rank: str, value: int) -> None:
self.total_blocks.labels(
**{
labels.MODEL: self.model_name,
labels.COMPONENT: self.component_name,
labels.DP_RANK: dp_rank,
}
).set(value)
def set_gpu_cache_usage(self, dp_rank: str, value: float) -> None:
self.gpu_cache_usage_percent.labels(
**{
labels.MODEL: self.model_name,
labels.COMPONENT: self.component_name,
labels.DP_RANK: dp_rank,
}
).set(value)
def set_model_load_time(self, value: float) -> None:
self.model_load_time.labels(
**{labels.MODEL: self.model_name, labels.COMPONENT: self.component_name}
).set(value)
......@@ -5,6 +5,7 @@ import asyncio
import logging
import os
import sys
import time
import sglang as sgl
import uvloop
......@@ -22,11 +23,7 @@ from dynamo.sglang.health_check import (
SglangHealthCheckPayload,
SglangPrefillHealthCheckPayload,
)
from dynamo.sglang.publisher import (
DynamoSglangPublisher,
setup_prometheus_registry,
setup_sgl_metrics,
)
from dynamo.sglang.publisher import DynamoSglangPublisher, setup_sgl_metrics
from dynamo.sglang.register import (
register_image_diffusion_model,
register_llm_with_readiness_gate,
......@@ -129,7 +126,10 @@ async def init(runtime: DistributedRuntime, config: Config):
if server_args.node_rank >= 1:
os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"
# Time model loading
start_time = time.time()
engine = sgl.Engine(server_args=server_args)
load_time = time.time() - start_time
component = runtime.namespace(dynamo_args.namespace).component(
dynamo_args.component
......@@ -143,9 +143,9 @@ async def init(runtime: DistributedRuntime, config: Config):
engine, config, component, generate_endpoint
)
# Register Prometheus metrics callback if enabled
if engine.server_args.enable_metrics:
setup_prometheus_registry(engine, generate_endpoint)
# Record model load time immediately after publisher setup (which creates the gauges)
publisher.component_gauges.set_model_load_time(load_time)
logging.debug(f"SGLang model load time: {load_time:.2f}s")
# Handle non-leader nodes (multi-node parallelism)
# Non-leader nodes run schedulers and publish KV events, but don't serve requests
......@@ -230,10 +230,6 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
engine, config, component, generate_endpoint
)
# Register Prometheus metrics callback if enabled
if engine.server_args.enable_metrics:
setup_prometheus_registry(engine, generate_endpoint)
# Handle non-leader nodes (multi-node parallelism)
# Non-leader nodes run schedulers and publish KV events, but don't serve requests
if server_args.node_rank >= 1:
......@@ -317,10 +313,6 @@ async def init_diffusion(runtime: DistributedRuntime, config: Config):
engine, config, component, generate_endpoint
)
# Register Prometheus metrics callback if enabled
if engine.server_args.enable_metrics:
setup_prometheus_registry(engine, generate_endpoint)
# Handle non-leader nodes (multi-node parallelism)
# Non-leader nodes run schedulers and publish KV events, but don't serve requests
if server_args.node_rank >= 1:
......@@ -391,10 +383,6 @@ async def init_embedding(runtime: DistributedRuntime, config: Config):
engine, config, component, generate_endpoint
)
# Register Prometheus metrics callback if enabled
if engine.server_args.enable_metrics:
setup_prometheus_registry(engine, generate_endpoint)
# Readiness gate: requests wait until model is registered
ready_event = asyncio.Event()
......
......@@ -9,13 +9,17 @@ from typing import TYPE_CHECKING, List, Optional, Tuple
import sglang as sgl
import zmq
import zmq.asyncio
from prometheus_client import CollectorRegistry
from sglang.srt.disaggregation.kv_events import ZmqEventPublisher
from sglang.srt.utils import get_local_ip_auto, get_zmq_socket, maybe_wrap_ipv6_address
if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
from sglang.srt.managers.scheduler_metrics_mixin import KvMetrics
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.common.utils.prometheus import (
LLMBackendMetrics,
register_engine_metrics_callback,
)
from dynamo.llm import (
KvEventPublisher,
WorkerMetricsPublisher,
......@@ -24,6 +28,10 @@ from dynamo.llm import (
from dynamo.runtime import Component, Endpoint
from dynamo.sglang.args import Config
# Create a dedicated registry for dynamo_component metrics
# This ensures these metrics are isolated and can be exposed via their own callback
DYNAMO_COMPONENT_REGISTRY = CollectorRegistry()
def format_zmq_endpoint(endpoint_template: str, ip_address: str) -> str:
"""Format ZMQ endpoint by replacing wildcard with IP address.
......@@ -65,6 +73,7 @@ class DynamoSglangPublisher:
config: Config,
component: Component,
generate_endpoint: Endpoint,
component_gauges: LLMBackendMetrics,
metrics_labels: Optional[List[Tuple[str, str]]] = None,
) -> None:
"""Initialize the SGLang publisher for metrics and KV events.
......@@ -75,6 +84,7 @@ class DynamoSglangPublisher:
component: The Dynamo runtime component.
generate_endpoint: The Dynamo endpoint for generation requests.
metrics_labels: Optional list of label key-value pairs for metrics.
component_gauges: LLM backend metrics instance (created via LLMBackendMetrics()).
"""
self.engine = engine
self.server_args = config.server_args
......@@ -82,6 +92,7 @@ class DynamoSglangPublisher:
self.generate_endpoint = generate_endpoint
self.component = component
self.metrics_publisher = WorkerMetricsPublisher()
self.component_gauges = component_gauges
# Endpoint creation is deferred to async context in setup_sgl_metrics
# Set default values (can be overridden later if needed)
......@@ -125,13 +136,26 @@ class DynamoSglangPublisher:
while self._running:
try:
kv_metrics = await self._sock.recv_pyobj() # type: ignore
# Receive KvMetrics object from SGLang scheduler via ZMQ
# KvMetrics class: sglang/srt/managers/scheduler_metrics_mixin.py lines 45-54
# Sent from: sglang/srt/managers/scheduler_metrics_mixin.py lines 482-499 (_emit_kv_metrics)
kv_metrics: KvMetrics = await self._sock.recv_pyobj()
dp_rank = (
kv_metrics.data_parallel_rank
if kv_metrics.data_parallel_rank is not None
else self.dp_rank
)
self.metrics_publisher.publish(dp_rank, kv_metrics.kv_active_blocks)
active_decode_blocks = kv_metrics.kv_active_blocks
self.metrics_publisher.publish(dp_rank, active_decode_blocks)
dp_rank_str = str(dp_rank)
# Publish total blocks (always available in KvMetrics)
self.component_gauges.set_total_blocks(
dp_rank_str, kv_metrics.kv_total_blocks
)
# Publish GPU cache usage percentage (always available in KvMetrics)
self.component_gauges.set_gpu_cache_usage(
dp_rank_str, kv_metrics.gpu_cache_usage_perc
)
except Exception:
if self._running:
logging.exception(
......@@ -168,6 +192,9 @@ class DynamoSglangPublisher:
"""Publish initial dummy metrics to bootstrap the metrics endpoint."""
logging.info("Sending dummy metrics to initialize")
self.metrics_publisher.publish(self.dp_rank, 0)
dp_rank_str = str(self.dp_rank)
self.component_gauges.set_total_blocks(dp_rank_str, 0)
self.component_gauges.set_gpu_cache_usage(dp_rank_str, 0.0)
def init_kv_event_publish(self) -> List[KvEventPublisher]:
"""Initialize KV event publisher(s) if configured.
......@@ -190,6 +217,10 @@ class DynamoSglangPublisher:
if self.server_args.kv_events_config:
kv_events = json.loads(self.server_args.kv_events_config)
base_ep = kv_events.get("endpoint")
if not base_ep:
raise ValueError(
"sglang kv_events_config is set but missing 'endpoint'"
)
local_ip = get_local_ip_auto()
# Determine DP attention configuration
......@@ -253,7 +284,7 @@ class DynamoSglangPublisher:
def setup_prometheus_registry(
engine: sgl.Engine, generate_endpoint: Endpoint
) -> "CollectorRegistry":
) -> CollectorRegistry:
"""Set up Prometheus registry for SGLang metrics collection.
SGLang uses multiprocess architecture where metrics are stored in shared memory.
......@@ -261,6 +292,10 @@ def setup_prometheus_registry(
registry collects sglang:* metrics which are exposed via the metrics server endpoint
(set DYN_SYSTEM_PORT to a positive value to enable, e.g., DYN_SYSTEM_PORT=8081).
IMPORTANT: This function requires PROMETHEUS_MULTIPROC_DIR to be set, which only
happens when SGLang is started with --enable-metrics. Callers must guard this call
with an enable_metrics check.
IMPORTANT: prometheus_client must be imported AFTER sgl.Engine() has called
set_prometheus_multiproc_dir(). Importing at module level causes prometheus_client
to initialize in single-process mode before PROMETHEUS_MULTIPROC_DIR is set,
......@@ -277,11 +312,14 @@ def setup_prometheus_registry(
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
# Register callback for SGLang metrics (sglang:* prefixed)
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=registry,
metric_prefix_filters=["sglang:"],
)
return registry
......@@ -302,9 +340,35 @@ async def setup_sgl_metrics(
Returns:
Tuple of (publisher instance, running asyncio task, metrics labels).
"""
# Register SGLang multiprocess metrics only when --enable-metrics was passed.
# SGLang only calls set_prometheus_multiproc_dir() when enable_metrics=True,
# so MultiProcessCollector will crash without it.
if engine.server_args.enable_metrics:
setup_prometheus_registry(engine, generate_endpoint)
# Always register the Dynamo component metrics callback (total_blocks,
# gpu_cache_usage, model_load_time). These use a dedicated registry that
# doesn't need MultiProcessCollector or PROMETHEUS_MULTIPROC_DIR.
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=DYNAMO_COMPONENT_REGISTRY,
)
# Create all Dynamo component gauges using the dedicated registry
component_gauges = LLMBackendMetrics(
registry=DYNAMO_COMPONENT_REGISTRY,
model_name=engine.server_args.served_model_name,
component_name=config.dynamo_args.component,
)
metrics_labels = [("model", engine.server_args.served_model_name)]
publisher = DynamoSglangPublisher(
engine, config, component, generate_endpoint, metrics_labels
engine,
config,
component,
generate_endpoint,
component_gauges=component_gauges,
metrics_labels=metrics_labels,
)
# Create endpoint in async context (must await before publishing)
await publisher.metrics_publisher.create_endpoint(component)
......
......@@ -3,6 +3,7 @@
import enum
import logging
import time
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional
......@@ -139,10 +140,28 @@ class TensorRTLLMEngine:
async def get_llm_engine(
engine_args,
disaggregation_mode: Optional[DisaggregationMode] = None,
component_gauges=None,
) -> AsyncGenerator[TensorRTLLMEngine, None]:
"""Get TensorRT-LLM engine instance with load time tracking.
Args:
engine_args: Engine configuration arguments.
disaggregation_mode: Optional disaggregation mode configuration.
component_gauges: Optional LLMBackendGauges instance for recording load time.
"""
# Time engine initialization
start_time = time.time()
engine = TensorRTLLMEngine(engine_args, disaggregation_mode)
try:
await engine.initialize()
load_time = time.time() - start_time
logger.debug(f"TensorRT-LLM engine initialized in {load_time:.2f}s")
# Record model load time immediately after measurement
if component_gauges:
component_gauges.set_model_load_time(load_time)
yield engine
except Exception as e:
logging.error(f"Error in engine context: {e}")
......
......@@ -37,7 +37,10 @@ from transformers import AutoConfig
import dynamo.nixl_connect as nixl_connect
from dynamo.common.config_dump import dump_config
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.common.utils.prometheus import (
LLMBackendMetrics,
register_engine_metrics_callback,
)
from dynamo.common.utils.runtime import create_runtime, parse_endpoint
from dynamo.llm import (
KvEventPublisher,
......@@ -52,7 +55,7 @@ from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.trtllm.engine import Backend, TensorRTLLMEngine, get_llm_engine
from dynamo.trtllm.health_check import TrtllmHealthCheckPayload
from dynamo.trtllm.multimodal_processor import MultimodalRequestProcessor
from dynamo.trtllm.publisher import get_publisher
from dynamo.trtllm.publisher import DYNAMO_COMPONENT_REGISTRY, get_publisher
from dynamo.trtllm.request_handlers.handler_base import DisaggregationMode
from dynamo.trtllm.request_handlers.handlers import (
RequestHandlerConfig,
......@@ -337,7 +340,22 @@ async def init(
config.dump_config_to, {"engine_args": engine_args, "dynamo_args": config}
)
async with get_llm_engine(engine_args, config.disaggregation_mode) as engine:
# Prepare model name for metrics
model_name_for_metrics = config.served_model_name or config.model_path
# Construct Prometheus gauges directly; passed through to the engine and publisher
# via explicit parameters (no module-level global).
component_gauges = LLMBackendMetrics(
registry=DYNAMO_COMPONENT_REGISTRY,
model_name=model_name_for_metrics,
component_name=config.component,
)
async with get_llm_engine(
engine_args,
config.disaggregation_mode,
component_gauges=component_gauges,
) as engine:
endpoint = component.endpoint(config.endpoint)
# should ideally call get_engine_runtime_config
......@@ -394,12 +412,12 @@ async def init(
logging.info("TensorRT-LLM MetricsCollector initialized")
# Register callback to expose TRT-LLM metrics via Dynamo endpoint
# Filter out python_/process_ metrics and add trtllm_ prefix to remaining metrics
# Note: latest TRT-LLM's MetricsCollector already adds the 'trtllm_' prefix to all metrics,
# so we filter by that prefix to include only TRT-LLM metrics.
register_engine_metrics_callback(
endpoint=endpoint,
registry=REGISTRY,
exclude_prefixes=["python_", "process_"],
add_prefix="trtllm_",
metric_prefix_filters=["trtllm_"],
)
logging.info("TensorRT-LLM Prometheus metrics registered")
except Exception as e:
......@@ -407,6 +425,13 @@ async def init(
f"Failed to initialize TensorRT-LLM Prometheus metrics: {e}"
)
# Register callback for Dynamo component metrics using dedicated registry
register_engine_metrics_callback(
endpoint=endpoint,
registry=DYNAMO_COMPONENT_REGISTRY,
)
logging.debug("DYNAMO_COMPONENT_REGISTRY callback registered successfully")
# publisher will be set later if publishing is enabled.
handler_config = RequestHandlerConfig(
component=component,
......@@ -478,6 +503,7 @@ async def init(
int(endpoint.connection_id()),
config.kv_block_size,
metrics_labels,
component_gauges=component_gauges,
zmq_endpoint=trtllm_zmq_bind_endpoint,
enable_local_indexer=config.enable_local_indexer,
) as publisher:
......
......@@ -32,11 +32,18 @@ from typing import Awaitable, Callable, Dict, Optional, Union
import msgpack
import zmq
from prometheus_client import CollectorRegistry
from dynamo.common.utils.prometheus import LLMBackendMetrics
from dynamo.llm import KvEventPublisher, WorkerMetricsPublisher
logging.basicConfig(level=logging.DEBUG)
# Create a dedicated registry for dynamo_component metrics
# This ensures these metrics are isolated and can be exposed via their own callback
DYNAMO_COMPONENT_REGISTRY = CollectorRegistry()
# Use non-blocking RPC calls; control overhead with backoff sleeps.
_STATS_TIMEOUT_SEC = 0.01
_KV_EVENTS_TIMEOUT_SEC = 0.0
......@@ -290,6 +297,7 @@ class Publisher:
worker_id,
kv_block_size,
metrics_labels,
component_gauges: LLMBackendMetrics,
zmq_endpoint: Optional[str] = None,
enable_local_indexer: bool = False,
):
......@@ -300,6 +308,7 @@ class Publisher:
self.kv_block_size = kv_block_size
self.max_window_size = None
self.metrics_labels = metrics_labels
self.component_gauges = component_gauges
self.enable_local_indexer = enable_local_indexer
self.attention_dp_size = engine.get_attention_dp_size()
......@@ -391,7 +400,10 @@ class Publisher:
return
# Publish initial metrics with 0 active blocks
# TRT-LLM doesn't use data parallelism currently (dp_rank="0")
self.metrics_publisher.publish(None, 0)
self.component_gauges.set_total_blocks("0", 0)
self.component_gauges.set_gpu_cache_usage("0", 0.0)
# Prepare threads for publishing stats but don't start them yet.
# TRTLLM needs to start generating tokens first before stats
......@@ -453,10 +465,20 @@ class Publisher:
def handle_stat(stat):
kv_active_blocks = stat["kvCacheStats"]["usedNumBlocks"]
kv_total_blocks = stat["kvCacheStats"]["maxNumBlocks"]
logging.debug(f"Publishing stats: kv_active_blocks: {kv_active_blocks}")
# TRT-LLM doesn't use data parallelism currently (dp_rank=None)
# TRT-LLM doesn't use data parallelism currently (dp_rank=None for NATS, "0" for Prometheus)
self.metrics_publisher.publish(None, kv_active_blocks)
# Publish Prometheus metrics
self.component_gauges.set_total_blocks("0", kv_total_blocks)
# Calculate and publish GPU cache usage percentage
gpu_cache_usage = (
kv_active_blocks / kv_total_blocks if kv_total_blocks > 0 else 0.0
)
self.component_gauges.set_gpu_cache_usage("0", gpu_cache_usage)
await self._polling_loop(
lambda: self.engine.llm.get_stats_async(timeout=_STATS_TIMEOUT_SEC),
handle_stat,
......@@ -716,6 +738,7 @@ async def get_publisher(
worker_id,
kv_block_size,
metrics_labels,
component_gauges: LLMBackendMetrics,
zmq_endpoint: Optional[str] = None,
enable_local_indexer: bool = False,
):
......@@ -726,6 +749,7 @@ async def get_publisher(
worker_id,
kv_block_size,
metrics_labels,
component_gauges=component_gauges,
zmq_endpoint=zmq_endpoint,
enable_local_indexer=enable_local_indexer,
)
......
......@@ -5,6 +5,7 @@ import asyncio
import logging
import os
import tempfile
import time
from typing import Optional
import uvloop
......@@ -16,7 +17,10 @@ from vllm.v1.metrics.prometheus import setup_multiprocess_prometheus
from dynamo.common.config_dump import dump_config
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.common.utils.prometheus import (
LLMBackendMetrics,
register_engine_metrics_callback,
)
from dynamo.common.utils.runtime import create_runtime
from dynamo.llm import (
KvEventPublisher,
......@@ -57,7 +61,7 @@ from .health_check import (
VllmOmniHealthCheckPayload,
VllmPrefillHealthCheckPayload,
)
from .publisher import StatLoggerFactory
from .publisher import DYNAMO_COMPONENT_REGISTRY, StatLoggerFactory
configure_dynamo_logging()
logger = logging.getLogger(__name__)
......@@ -267,6 +271,17 @@ def setup_metrics_collection(config: Config, generate_endpoint, logger):
registries to ensure all metrics (vllm, lmcache, dynamo_component) are collected.
"""
if config.engine_args.disable_log_stats is False:
# Register the dedicated dynamo_component registry callback
# IMPORTANT: We do NOT use MultiProcessCollector for DYNAMO_COMPONENT_REGISTRY
# because our gauges use in-memory values which work fine for single-process
# and multi-process (each process has its own gauge with dp_rank label).
# Using MultiProcessCollector would read from .db files which causes stale
# values to accumulate across test runs.
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=DYNAMO_COMPONENT_REGISTRY,
)
if os.environ.get("PROMETHEUS_MULTIPROC_DIR"):
try:
# MultiProcessCollector reads metrics from .db files in PROMETHEUS_MULTIPROC_DIR
......@@ -276,7 +291,10 @@ def setup_metrics_collection(config: Config, generate_endpoint, logger):
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=REGISTRY,
metric_prefix_filters=["vllm:", "lmcache:"],
metric_prefix_filters=[
"vllm:",
"lmcache:",
],
)
except ValueError as e:
# Conflict: metrics already in REGISTRY, MultiProcessCollector tries to add same metrics from .db files
......@@ -288,17 +306,20 @@ def setup_metrics_collection(config: Config, generate_endpoint, logger):
multiprocess.MultiProcessCollector(multiproc_registry)
# Register both registries to collect all metrics
# Global REGISTRY has in-memory metrics (vllm, dynamo_component)
# Global REGISTRY has in-memory metrics (vllm)
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=REGISTRY,
metric_prefix_filters=["vllm:", "dynamo_component:"],
metric_prefix_filters=["vllm:"],
)
# Multiproc registry has .db file metrics (lmcache, possibly vllm duplicates)
register_engine_metrics_callback(
endpoint=generate_endpoint,
registry=multiproc_registry,
metric_prefix_filters=["vllm:", "lmcache:"],
metric_prefix_filters=[
"vllm:",
"lmcache:",
],
)
else:
# No multiprocess mode
......@@ -404,6 +425,19 @@ def setup_vllm_engine(config, stat_logger=None):
f"Prometheus multiproc dir set to: {os.environ.get('PROMETHEUS_MULTIPROC_DIR')}"
)
# Construct Prometheus gauges AFTER setup_multiprocess_prometheus() so Gauge objects
# see the correct ValueClass (multiprocess vs in-memory).
component_gauges = LLMBackendMetrics(
registry=DYNAMO_COMPONENT_REGISTRY,
model_name=config.served_model_name or "",
component_name=config.component or "",
)
# If a StatLoggerFactory was provided, give it the gauges so the loggers
# it creates can publish Prometheus metrics.
if stat_logger is not None:
stat_logger.component_gauges = component_gauges
os.environ["VLLM_NO_USAGE_STATS"] = "1" # Avoid internal HTTP requests
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
......@@ -448,6 +482,8 @@ def setup_vllm_engine(config, stat_logger=None):
if stat_logger:
factory.append(stat_logger)
# Time engine initialization
start_time = time.time()
engine_client = AsyncLLM.from_vllm_config(
vllm_config=vllm_config,
usage_context=usage_context,
......@@ -455,10 +491,20 @@ def setup_vllm_engine(config, stat_logger=None):
enable_log_requests=engine_args.enable_log_requests,
disable_log_stats=engine_args.disable_log_stats,
)
load_time = time.time() - start_time
# Record model load time
component_gauges.set_model_load_time(load_time)
logger.info(f"VllmWorker for {config.served_model_name} has been initialized")
return engine_client, vllm_config, default_sampling_params, prometheus_temp_dir
return (
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
component_gauges,
)
async def register_vllm_model(
......@@ -557,6 +603,7 @@ async def init_prefill(
vllm_config,
default_sampling_params,
prometheus_temp_dir,
_component_gauges,
) = pre_created_engine
else:
(
......@@ -564,6 +611,7 @@ async def init_prefill(
vllm_config,
default_sampling_params,
prometheus_temp_dir,
_component_gauges,
) = setup_vllm_engine(config)
handler = PrefillWorkerHandler(
......@@ -681,11 +729,7 @@ async def init(
unload_lora_endpoint = component.endpoint("unload_lora")
list_loras_endpoint = component.endpoint("list_loras")
factory = StatLoggerFactory(
component,
config.engine_args.data_parallel_rank or 0,
metrics_labels=[("model", config.served_model_name or config.model)],
)
model_name = config.served_model_name or config.model
# Use pre-created engine if provided (checkpoint mode), otherwise create new
if pre_created_engine is not None:
......@@ -694,13 +738,30 @@ async def init(
vllm_config,
default_sampling_params,
prometheus_temp_dir,
component_gauges,
) = pre_created_engine
# Factory is created after unpack so component_gauges is available
factory = StatLoggerFactory(
component,
component_gauges=component_gauges,
dp_rank=config.engine_args.data_parallel_rank or 0,
metrics_labels=[("model", model_name)],
)
else:
# Factory is created without component_gauges; setup_vllm_engine() will
# create the gauges after setup_multiprocess_prometheus() and set them
# on the factory before vLLM calls create_stat_logger().
factory = StatLoggerFactory(
component,
dp_rank=config.engine_args.data_parallel_rank or 0,
metrics_labels=[("model", model_name)],
)
(
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
component_gauges,
) = setup_vllm_engine(config, factory)
# TODO Hack to get data, move this to registering in TBD
......@@ -983,6 +1044,7 @@ async def init_vllm_native_encoder(
vllm_config,
default_sampling_params,
prometheus_temp_dir,
_component_gauges,
) = setup_vllm_engine(config)
# Initialize vLLM Native Encoder Worker Handler
......@@ -1134,6 +1196,7 @@ async def init_multimodal_worker(
vllm_config,
default_sampling_params,
prometheus_temp_dir,
_component_gauges,
) = pre_created_engine
else:
(
......@@ -1141,6 +1204,7 @@ async def init_multimodal_worker(
vllm_config,
default_sampling_params,
prometheus_temp_dir,
_component_gauges,
) = setup_vllm_engine(config)
# Set up decode worker client for disaggregated mode
......
......@@ -5,13 +5,19 @@ import asyncio
import logging
from typing import List, Optional, Tuple
from prometheus_client import CollectorRegistry
from vllm.config import VllmConfig
from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats
from dynamo.common.utils.prometheus import LLMBackendMetrics
from dynamo.llm import WorkerMetricsPublisher
from dynamo.runtime import Component
# Create a dedicated registry for dynamo_component metrics
# This ensures these metrics are isolated and can be exposed via their own callback
DYNAMO_COMPONENT_REGISTRY = CollectorRegistry()
class NullStatLogger(StatLoggerBase):
def __init__(self):
......@@ -38,11 +44,13 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
self,
component: Component,
dp_rank: int,
component_gauges: LLMBackendMetrics,
metrics_labels: Optional[List[Tuple[str, str]]] = None,
) -> None:
self.inner = WorkerMetricsPublisher()
self._component = component
self.dp_rank = dp_rank
self.component_gauges = component_gauges
self.num_gpu_block = 1
# Schedule async endpoint creation
self._endpoint_task = asyncio.create_task(self._create_endpoint())
......@@ -71,8 +79,22 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
active_decode_blocks = int(self.num_gpu_block * scheduler_stats.kv_cache_usage)
self.inner.publish(self.dp_rank, active_decode_blocks)
dp_rank_str = str(self.dp_rank)
self.component_gauges.set_total_blocks(dp_rank_str, self.num_gpu_block)
# Set GPU cache usage percentage directly from scheduler_stats
# Note: vLLM's scheduler_stats.kv_cache_usage returns very small values
# (e.g., 0.0000834 for ~0.08% usage), which Prometheus outputs in scientific
# notation (8.34e-05). This is the correct value and will be properly parsed.
self.component_gauges.set_gpu_cache_usage(
dp_rank_str, scheduler_stats.kv_cache_usage
)
def init_publish(self):
self.inner.publish(self.dp_rank, 0)
dp_rank_str = str(self.dp_rank)
self.component_gauges.set_total_blocks(dp_rank_str, 0)
self.component_gauges.set_gpu_cache_usage(dp_rank_str, 0.0)
def log_engine_initialized(self) -> None:
pass
......@@ -84,10 +106,12 @@ class StatLoggerFactory:
def __init__(
self,
component: Component,
component_gauges: Optional[LLMBackendMetrics] = None,
dp_rank: int = 0,
metrics_labels: Optional[List[Tuple[str, str]]] = None,
) -> None:
self.component = component
self.component_gauges = component_gauges
self.created_logger: Optional[DynamoStatLoggerPublisher] = None
self.dp_rank = dp_rank
self.metrics_labels = metrics_labels or []
......@@ -95,8 +119,16 @@ class StatLoggerFactory:
def create_stat_logger(self, dp_rank: int) -> StatLoggerBase:
if self.dp_rank != dp_rank:
return NullStatLogger()
# component_gauges must be set by setup_vllm_engine() before vLLM
# calls create_stat_logger() during engine initialization.
assert (
self.component_gauges is not None
), "component_gauges must be set before creating stat loggers"
logger = DynamoStatLoggerPublisher(
self.component, dp_rank, metrics_labels=self.metrics_labels
self.component,
dp_rank,
component_gauges=self.component_gauges,
metrics_labels=self.metrics_labels,
)
self.created_logger = logger
......
......@@ -140,6 +140,13 @@ class kvrouter:
KV_CACHE_EVENTS_APPLIED = "kv_cache_events_applied"
class kvstats:
# Total number of KV cache blocks available on the worker
TOTAL_BLOCKS = "total_blocks"
# GPU cache usage as a percentage (0.0-1.0)
GPU_CACHE_USAGE_PERCENT = "gpu_cache_usage_percent"
class labels:
"""Automatically inserted Prometheus label names used across the metrics system"""
......@@ -149,6 +156,19 @@ class labels:
NAMESPACE = "dynamo_namespace"
# Label for endpoint identification
ENDPOINT = "dynamo_endpoint"
# Label for worker data-parallel rank.
# Note: this is not an auto-inserted label like `dynamo_namespace`/`dynamo_component`.
# It is used by worker/load-style metrics that need to disambiguate per-worker series.
DP_RANK = "dp_rank"
# Label for model name
MODEL = "model"
# Label for worker type (e.g., "aggregated", "prefill", "decode", "encoder", etc.)
WORKER_TYPE = "worker_type"
class model_info:
# Model load time in seconds
LOAD_TIME_SECONDS = "model_load_time_seconds"
class name_prefix:
......
......@@ -47,11 +47,11 @@
//! - ✅ `dynamo_component_errors_total` - Total error counter (not `total_errors`)
//! - ✅ `dynamo_component_memory_usage_bytes` - Memory usage gauge
//! - ✅ `dynamo_frontend_inflight_requests` - Current inflight requests gauge
//! - ✅ `nats_client_connection_duration_ms` - Connection time in milliseconds
//! - ✅ `dynamo_component_cpu_usage_percent` - CPU usage percentage
//! - ✅ `dynamo_frontend_tokens_per_second` - Token generation rate
//! - ✅ `nats_client_current_connections` - Current active connections gauge
//! - ✅ `nats_client_in_messages` - Total messages received counter
//! - ✅ `dynamo_messaging_client_connection_duration_ms` - Connection time in milliseconds
//! - ✅ `dynamo_messaging_client_current_connections` - Current active connections gauge
//! - ✅ `dynamo_messaging_client_in_messages_total` - Total messages received counter
//!
//! ## Key Differences: Prometheus Metric Names vs Prometheus Label Names
//!
......@@ -80,6 +80,18 @@ pub mod labels {
/// Label for endpoint identification
pub const ENDPOINT: &str = "dynamo_endpoint";
/// Label for worker data-parallel rank.
///
/// Note: this is not an auto-inserted label like `dynamo_namespace`/`dynamo_component`.
/// It is used by worker/load-style metrics that need to disambiguate per-worker series.
pub const DP_RANK: &str = "dp_rank";
/// Label for model name
pub const MODEL: &str = "model";
/// Label for worker type (e.g., "aggregated", "prefill", "decode", "encoder", etc.)
pub const WORKER_TYPE: &str = "worker_type";
}
/// Frontend service metrics (LLM HTTP service)
......@@ -331,6 +343,21 @@ pub mod kvrouter {
pub const KV_CACHE_EVENTS_APPLIED: &str = "kv_cache_events_applied";
}
// KV cache statistics metrics
pub mod kvstats {
/// Total number of KV cache blocks available on the worker
pub const TOTAL_BLOCKS: &str = "total_blocks";
/// GPU cache usage as a percentage (0.0-1.0)
pub const GPU_CACHE_USAGE_PERCENT: &str = "gpu_cache_usage_percent";
}
// Model information metrics
pub mod model_info {
/// Model load time in seconds
pub const LOAD_TIME_SECONDS: &str = "model_load_time_seconds";
}
// Shared regex patterns for Prometheus sanitization
static METRIC_INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());
......
......@@ -573,8 +573,9 @@ class MetricsPayload(BasePayload):
# Examples:
# - dynamo_component_requests_total{model="Qwen/Qwen3-0.6B"} 6
# - dynamo_component_uptime_seconds 150.390999059
# Note: Supports scientific notation (e.g., 8.34e-05)
def metric_pattern(name):
return rf"{name}(?:\{{[^}}]*\}})?\s+([\d.]+)"
return rf"{name}(?:\{{[^}}]*\}})?\s+([\d.eE+-]+)"
metrics_to_check = [
MetricCheck(
......@@ -601,6 +602,27 @@ class MetricsPayload(BasePayload):
error_msg=lambda name, value: f"{name} should be > 0, but got {value}",
success_msg=lambda name, value: f"SUCCESS: Found {name} = {value}s",
),
MetricCheck(
name=f"{prefix}_{prometheus_names.kvstats.TOTAL_BLOCKS}",
pattern=metric_pattern,
validator=lambda value: float(value) >= 0,
error_msg=lambda name, value: f"{name} should be >= 0, but got {value}",
success_msg=lambda name, value: f"SUCCESS: Found {name} = {value}",
),
MetricCheck(
name=f"{prefix}_{prometheus_names.kvstats.GPU_CACHE_USAGE_PERCENT}",
pattern=metric_pattern,
validator=lambda value: 0.0 <= float(value) <= 1.0,
error_msg=lambda name, value: f"{name} should be between 0.0 and 1.0, but got {value}",
success_msg=lambda name, value: f"SUCCESS: Found {name} = {value}",
),
MetricCheck(
name=f"{prefix}_{prometheus_names.model_info.LOAD_TIME_SECONDS}",
pattern=metric_pattern,
validator=lambda value: float(value) > 0,
error_msg=lambda name, value: f"{name} should be > 0, but got {value}",
success_msg=lambda name, value: f"SUCCESS: Found {name} = {float(value):.2f}s",
),
]
# Add backend-specific metric checks
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment