Commit feb6d272 (unverified), authored by Yan Ru Pei, committed by GitHub
Browse files

chore: nuke ForwardPassMetrics (#5531)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 30c6228b
......@@ -16,11 +16,7 @@ if TYPE_CHECKING:
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import (
ForwardPassMetrics,
KvStats,
SpecDecodeStats,
WorkerMetricsPublisher,
WorkerStats,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
)
......@@ -80,14 +76,10 @@ class DynamoSglangPublisher:
self.generate_endpoint = generate_endpoint
self.component = component
self.metrics_publisher = WorkerMetricsPublisher()
self.metrics_publisher.create_endpoint(component)
# Endpoint creation is deferred to async context in setup_sgl_metrics
# Set default values (can be overridden later if needed)
self.request_total_slots = 1024
self.dp_rank = 0
# TODO: Get actual GPU blocks from SGLang engine instead of hardcoded value
# This hardcoded value causes dynamo_component_kvstats_total_blocks to be incorrect.
self.num_gpu_block = 1024
# ZMQ setup for receiving scheduler metrics
self._ctx = zmq.asyncio.Context() # type: ignore
......@@ -100,16 +92,12 @@ class DynamoSglangPublisher:
while True:
try:
kv_metrics = await self._sock.recv_pyobj() # type: ignore
self._record_values(
request_active_slots=kv_metrics.request_active_slots,
request_total_slots=kv_metrics.request_total_slots,
kv_active_blocks=kv_metrics.kv_active_blocks,
kv_total_blocks=kv_metrics.kv_total_blocks,
num_requests_waiting=kv_metrics.num_requests_waiting,
gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate,
data_parallel_rank=kv_metrics.data_parallel_rank,
dp_rank = (
kv_metrics.data_parallel_rank
if kv_metrics.data_parallel_rank is not None
else self.dp_rank
)
self.metrics_publisher.publish(dp_rank, kv_metrics.kv_active_blocks)
except Exception:
logging.exception(
"Failed to receive or publish SGLang scheduler metrics"
......@@ -117,26 +105,8 @@ class DynamoSglangPublisher:
def init_engine_metrics_publish(self) -> None:
"""Publish initial dummy metrics to bootstrap the metrics endpoint."""
worker_stats = WorkerStats(
request_active_slots=0,
request_total_slots=self.request_total_slots,
num_requests_waiting=0,
data_parallel_rank=self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=0,
# TODO: num_gpu_block to get actual GPU blocks from SGLang engine instead of hardcoded value
kv_total_blocks=self.num_gpu_block,
gpu_cache_usage_perc=0.0,
gpu_prefix_cache_hit_rate=0.0,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
logging.info("Sending dummy metrics to initialize")
self.metrics_publisher.publish(metrics)
self.metrics_publisher.publish(self.dp_rank, 0)
def init_kv_event_publish(self) -> Optional[ZmqKvEventPublisher]:
"""Initialize KV event publisher if configured.
......@@ -162,67 +132,6 @@ class DynamoSglangPublisher:
)
return self.kv_publisher
def _record(
self,
worker_stats: WorkerStats,
kv_stats: KvStats,
spec_decode_stats: Optional[SpecDecodeStats] = None,
) -> None:
"""Package and publish metrics.
Args:
worker_stats: Worker-level statistics.
kv_stats: KV cache statistics.
spec_decode_stats: Optional speculative decoding statistics.
"""
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_decode_stats,
)
self.metrics_publisher.publish(metrics)
def _record_values(
self,
request_active_slots: int,
request_total_slots: int,
kv_active_blocks: int,
kv_total_blocks: int,
num_requests_waiting: int,
gpu_cache_usage_perc: float,
gpu_prefix_cache_hit_rate: float,
data_parallel_rank: Optional[int] = None,
spec_decode_stats: Optional[SpecDecodeStats] = None,
) -> None:
"""Create stats objects from raw values and publish.
Args:
request_active_slots: Number of active request slots.
request_total_slots: Total number of request slots.
kv_active_blocks: Number of active KV cache blocks.
kv_total_blocks: Total number of KV cache blocks.
num_requests_waiting: Number of queued requests.
gpu_cache_usage_perc: GPU cache utilization percentage.
gpu_prefix_cache_hit_rate: Prefix cache hit rate.
data_parallel_rank: Optional data parallel rank.
spec_decode_stats: Optional speculative decoding statistics.
"""
worker_stats = WorkerStats(
request_active_slots=request_active_slots,
request_total_slots=request_total_slots,
num_requests_waiting=num_requests_waiting,
data_parallel_rank=data_parallel_rank
if data_parallel_rank is not None
else self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=kv_active_blocks,
kv_total_blocks=kv_total_blocks,
gpu_cache_usage_perc=gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate=gpu_prefix_cache_hit_rate,
)
self._record(worker_stats, kv_stats, spec_decode_stats)
def setup_prometheus_registry(
engine: sgl.Engine, generate_endpoint: Endpoint
......@@ -280,6 +189,10 @@ async def setup_sgl_metrics(
publisher = DynamoSglangPublisher(
engine, config, component, generate_endpoint, metrics_labels
)
# Create endpoint in async context (must await before publishing)
await publisher.metrics_publisher.create_endpoint(component)
logging.debug("SGLang metrics publisher endpoint created")
publisher.init_engine_metrics_publish()
publisher.init_kv_event_publish()
......
......@@ -33,13 +33,7 @@ from typing import Awaitable, Callable, Optional, Union
import msgpack
import zmq
from dynamo.llm import (
ForwardPassMetrics,
KvEventPublisher,
KvStats,
WorkerMetricsPublisher,
WorkerStats,
)
from dynamo.llm import KvEventPublisher, WorkerMetricsPublisher
logging.basicConfig(level=logging.DEBUG)
......@@ -360,43 +354,12 @@ class Publisher:
def _init_publish_metrics_thread(self):
# Need to publish stats once so that worker can be selected.
# Publishing some dummy values...
request_active_slots = 0
request_total_slots = 4
kv_active_block = 0
kv_total_blocks = 4
num_requests_waiting = 0
gpu_cache_usage_perc = 0.0
gpu_prefix_cache_hit_rate = 0.0
num_requests_waiting = 0
gpu_cache_usage_perc = 0.0
gpu_prefix_cache_hit_rate = 0.0
if self.metrics_publisher is None:
logging.error("KV metrics publisher not initialized!")
return
worker_stats = WorkerStats(
request_active_slots=request_active_slots,
request_total_slots=request_total_slots,
num_requests_waiting=num_requests_waiting,
data_parallel_rank=None,
)
kv_stats = KvStats(
kv_active_blocks=kv_active_block,
kv_total_blocks=kv_total_blocks,
gpu_cache_usage_perc=gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate=gpu_prefix_cache_hit_rate,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.metrics_publisher.publish(metrics)
# Publish initial metrics with 0 active blocks
self.metrics_publisher.publish(None, 0)
# Prepare threads for publishing stats but don't start them yet.
# TRTLLM needs to start generating tokens first before stats
......@@ -432,49 +395,12 @@ class Publisher:
stats = self.engine.llm.get_stats_async(timeout=5)
async for stat in stats:
request_active_slots = stat["numActiveRequests"]
request_total_slots = stat["maxNumActiveRequests"]
kv_active_block = stat["kvCacheStats"]["usedNumBlocks"]
kv_total_blocks = stat["kvCacheStats"]["maxNumBlocks"]
reused_blocks = stat["kvCacheStats"]["reusedBlocks"]
freeNumBlocks = stat["kvCacheStats"]["freeNumBlocks"]
allocTotalBlocks = stat["kvCacheStats"]["allocTotalBlocks"]
allocNewBlocks = stat["kvCacheStats"]["allocNewBlocks"]
# NOTE: num paused requests is always 0 when using guarantee no evict scheduler (default).
num_requests_waiting = (
stat["numQueuedRequests"]
+ stat["inflightBatchingStats"]["numPausedRequests"]
)
gpu_cache_usage_perc = allocTotalBlocks / kv_total_blocks
gpu_prefix_cache_hit_rate = stat["kvCacheStats"]["cacheHitRate"]
kv_active_blocks = stat["kvCacheStats"]["usedNumBlocks"]
logging.debug(
f"Publishing stats: request_active_slots: {request_active_slots}, request_total_slots: {request_total_slots}, kv_active_block: {kv_active_block}, kv_total_blocks: {kv_total_blocks}, num_requests_waiting: {num_requests_waiting}, reused_blocks: {reused_blocks}, freeNumBlocks: {freeNumBlocks}, allocTotalBlocks: {allocTotalBlocks}, allocNewBlocks: {allocNewBlocks}, gpu_cache_usage_perc: {gpu_cache_usage_perc}, gpu_prefix_cache_hit_rate: {gpu_prefix_cache_hit_rate}"
)
worker_stats = WorkerStats(
request_active_slots=request_active_slots,
request_total_slots=request_total_slots,
num_requests_waiting=num_requests_waiting,
data_parallel_rank=None,
)
logging.debug(f"Publishing stats: kv_active_blocks: {kv_active_blocks}")
kv_stats = KvStats(
kv_active_blocks=kv_active_block,
kv_total_blocks=kv_total_blocks,
gpu_cache_usage_perc=gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate=gpu_prefix_cache_hit_rate,
)
# TODO: get spec_decode_stats from engine
spec_decode_stats = None
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_decode_stats,
)
self.metrics_publisher.publish(metrics)
# TRT-LLM doesn't use data parallelism currently (dp_rank=None)
self.metrics_publisher.publish(None, kv_active_blocks)
return True
......
......@@ -526,7 +526,6 @@ async def init(runtime: DistributedRuntime, config: Config):
# TODO Hack to get data, move this to registering in TBD
factory.set_num_gpu_blocks_all(vllm_config.cache_config.num_gpu_blocks)
factory.set_request_total_slots_all(vllm_config.scheduler_config.max_num_seqs)
factory.init_publish()
handler = DecodeWorkerHandler(
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
from typing import List, Optional, Tuple
from vllm.config import VllmConfig
from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats
from dynamo.llm import (
ForwardPassMetrics,
KvStats,
SpecDecodeStats,
WorkerMetricsPublisher,
WorkerStats,
)
from dynamo.llm import WorkerMetricsPublisher
from dynamo.runtime import Component
......@@ -45,21 +41,25 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
metrics_labels: Optional[List[Tuple[str, str]]] = None,
) -> None:
self.inner = WorkerMetricsPublisher()
# Use labels directly for the new create_endpoint signature
metrics_labels = metrics_labels or []
self.inner.create_endpoint(component)
self._component = component
self.dp_rank = dp_rank
self.num_gpu_block = 1
self.request_total_slots = 1
# Schedule async endpoint creation
self._endpoint_task = asyncio.create_task(self._create_endpoint())
async def _create_endpoint(self) -> None:
"""Create the NATS endpoint asynchronously.

Awaits ``self.inner.create_endpoint(self._component)`` and emits a debug
log line once that call returns.

Raises:
Exception: Any error raised while creating the endpoint is logged with
its traceback and then re-raised unchanged.
"""
try:
await self.inner.create_endpoint(self._component)
logging.debug("vLLM metrics publisher endpoint created")
except Exception:
# Record the traceback, then re-raise so the failure is not swallowed.
logging.exception("Failed to create vLLM metrics publisher endpoint")
raise
# TODO: Remove this and pass as metadata through shared storage
def set_num_gpu_block(self, num_blocks):
self.num_gpu_block = num_blocks
# TODO: Remove this and pass as metadata through shared storage
def set_num_request_total_slots(self, request_total_slots):
self.request_total_slots = request_total_slots
def record(
self,
scheduler_stats: SchedulerStats,
......@@ -68,70 +68,11 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
*args,
**kwargs,
):
# request_total_slots and kv_total_blocks are properties of model + gpu
# we should only publish them once, not every metric update
# they should be part of some runtime metadata tied to MDC or put in shared storage ?
hit_rate = 0
if scheduler_stats.prefix_cache_stats.queries > 0:
hit_rate = (
scheduler_stats.prefix_cache_stats.hits
/ scheduler_stats.prefix_cache_stats.queries
)
worker_stats = WorkerStats(
request_active_slots=scheduler_stats.num_running_reqs,
request_total_slots=self.request_total_slots,
num_requests_waiting=scheduler_stats.num_waiting_reqs,
data_parallel_rank=self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=int(self.num_gpu_block * scheduler_stats.kv_cache_usage),
kv_total_blocks=self.num_gpu_block,
gpu_cache_usage_perc=scheduler_stats.kv_cache_usage,
gpu_prefix_cache_hit_rate=hit_rate, # TODO: This is a point in time update, not cumulative. Will be problematic on router side if we try to use it.
)
spec_dec_stats = scheduler_stats.spec_decoding_stats
if spec_dec_stats:
spec_dec_stats = SpecDecodeStats(
num_spec_tokens=spec_dec_stats.num_spec_tokens,
num_drafts=spec_dec_stats.num_drafts,
num_draft_tokens=spec_dec_stats.num_draft_tokens,
num_accepted_tokens=spec_dec_stats.num_accepted_tokens,
num_accepted_tokens_per_pos=spec_dec_stats.num_accepted_tokens_per_pos,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_dec_stats,
)
self.inner.publish(metrics)
active_decode_blocks = int(self.num_gpu_block * scheduler_stats.kv_cache_usage)
self.inner.publish(self.dp_rank, active_decode_blocks)
def init_publish(self):
worker_stats = WorkerStats(
request_active_slots=0,
request_total_slots=self.request_total_slots,
num_requests_waiting=0,
data_parallel_rank=self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=0,
kv_total_blocks=self.num_gpu_block,
gpu_cache_usage_perc=0,
gpu_prefix_cache_hit_rate=0,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.inner.publish(metrics)
self.inner.publish(self.dp_rank, 0)
def log_engine_initialized(self) -> None:
pass
......@@ -169,10 +110,6 @@ class StatLoggerFactory:
if self.created_logger:
self.created_logger.set_num_gpu_block(num_blocks)
def set_request_total_slots_all(self, request_total_slots):
if self.created_logger:
self.created_logger.set_num_request_total_slots(request_total_slots)
def init_publish(self):
if self.created_logger:
self.created_logger.init_publish()
......@@ -33,7 +33,7 @@ The main KV-aware routing arguments:
- `--no-assume-kv-reuse`: When tracking active blocks, disables the assumption of KV cache reuse. By default (`router_assume_kv_reuse=true`), the router computes actual block hashes for sequence tracking to deduplicate blocks and optimize load balancing. When disabled via this flag, the router generates random hashes for sequence blocks, treating each request's blocks as unique. This is useful in disaggregated setups where prefill transfers blocks to decode workers that may already have those blocks cached, but the engine cannot coordinate transfers to avoid duplication. Without this flag, the router's load balancing heuristics would undercount decode blocks when duplicates exist.
- `--active-decode-blocks-threshold`: Initial threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. When a worker's KV cache active blocks exceed this percentage of total blocks, it will be marked as busy and excluded from routing. If not set, blocks-based busy detection is disabled. This feature works with all routing modes (`--router-mode kv|round-robin|random`) as long as backend engines emit `ForwardPassMetrics`. The threshold can be dynamically updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)).
- `--active-decode-blocks-threshold`: Initial threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. When a worker's KV cache active blocks exceed this fraction of total blocks, it will be marked as busy and excluded from routing. If not set, blocks-based busy detection is disabled. This feature works with all routing modes (`--router-mode kv|round-robin|random`) as long as backend engines publish load metrics. The threshold can be dynamically updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)).
- `--active-prefill-tokens-threshold`: Literal token count threshold for determining when a worker is considered busy based on prefill token utilization. When active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled.
......
......@@ -13,19 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
from typing import List, Optional, Tuple
from vllm.config import VllmConfig
from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats
from dynamo.llm import (
ForwardPassMetrics,
KvStats,
SpecDecodeStats,
WorkerMetricsPublisher,
WorkerStats,
)
from dynamo.llm import WorkerMetricsPublisher
from dynamo.runtime import Component
......@@ -56,19 +52,25 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
dp_rank: int,
) -> None:
self.inner = WorkerMetricsPublisher()
self.inner.create_endpoint(component)
self._component = component
self.dp_rank = dp_rank
self.num_gpu_block = 1
self.request_total_slots = 1
# Schedule async endpoint creation
self._endpoint_task = asyncio.create_task(self._create_endpoint())
async def _create_endpoint(self) -> None:
"""Create the NATS endpoint asynchronously.

Awaits ``self.inner.create_endpoint(self._component)`` and emits a debug
log line once that call returns.

Raises:
Exception: Any error raised while creating the endpoint is logged with
its traceback and then re-raised unchanged.
"""
try:
await self.inner.create_endpoint(self._component)
logging.debug("Multimodal metrics publisher endpoint created")
except Exception:
# Record the traceback, then re-raise so the failure is not swallowed.
logging.exception("Failed to create multimodal metrics publisher endpoint")
raise
# TODO: Remove this and pass as metadata through etcd
def set_num_gpu_block(self, num_blocks):
self.num_gpu_block = num_blocks
# TODO: Remove this and pass as metadata through etcd
def set_num_request_total_slots(self, request_total_slots):
self.request_total_slots = request_total_slots
def record(
self,
scheduler_stats: SchedulerStats,
......@@ -77,70 +79,11 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
*args,
**kwargs,
):
# request_total_slots and kv_total_blocks are properties of model + gpu
# we should only publish them once, not every metric update
# they should be part of some runtime metadata tied to MDC or put in etcd ?
hit_rate = 0
if scheduler_stats.prefix_cache_stats.queries > 0:
hit_rate = (
scheduler_stats.prefix_cache_stats.hits
/ scheduler_stats.prefix_cache_stats.queries
)
worker_stats = WorkerStats(
request_active_slots=scheduler_stats.num_running_reqs,
request_total_slots=self.request_total_slots,
num_requests_waiting=scheduler_stats.num_waiting_reqs,
data_parallel_rank=self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=int(self.num_gpu_block * scheduler_stats.kv_cache_usage),
kv_total_blocks=self.num_gpu_block,
gpu_cache_usage_perc=scheduler_stats.kv_cache_usage,
gpu_prefix_cache_hit_rate=hit_rate, # TODO: This is a point in time update, not cumulative. Will be problematic on router side if we try to use it.
)
spec_dec_stats = scheduler_stats.spec_decoding_stats
if spec_dec_stats:
spec_dec_stats = SpecDecodeStats(
num_spec_tokens=spec_dec_stats.num_spec_tokens,
num_drafts=spec_dec_stats.num_drafts,
num_draft_tokens=spec_dec_stats.num_draft_tokens,
num_accepted_tokens=spec_dec_stats.num_accepted_tokens,
num_accepted_tokens_per_pos=spec_dec_stats.num_accepted_tokens_per_pos,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_dec_stats,
)
self.inner.publish(metrics)
active_decode_blocks = int(self.num_gpu_block * scheduler_stats.kv_cache_usage)
self.inner.publish(self.dp_rank, active_decode_blocks)
def init_publish(self):
worker_stats = WorkerStats(
request_active_slots=0,
request_total_slots=self.request_total_slots,
num_requests_waiting=0,
data_parallel_rank=self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=0,
kv_total_blocks=self.num_gpu_block,
gpu_cache_usage_perc=0,
gpu_prefix_cache_hit_rate=0,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.inner.publish(metrics)
self.inner.publish(self.dp_rank, 0)
def log_engine_initialized(self) -> None:
pass
......@@ -176,10 +119,6 @@ class StatLoggerFactory:
if self.created_logger:
self.created_logger.set_num_gpu_block(num_blocks)
def set_request_total_slots_all(self, request_total_slots):
if self.created_logger:
self.created_logger.set_num_request_total_slots(request_total_slots)
def init_publish(self):
if self.created_logger:
self.created_logger.init_publish()
......@@ -154,9 +154,6 @@ class VllmBaseWorker:
self.stats_logger.set_num_gpu_blocks_all(
vllm_config.cache_config.num_gpu_blocks
)
self.stats_logger.set_request_total_slots_all(
vllm_config.scheduler_config.max_num_seqs
)
self.stats_logger.init_publish()
# TODO: We start off with a valid endpoint, then we increment it by dp_rank
......
......@@ -17,12 +17,12 @@ Usage (both patterns supported):
# Pattern 1: Import module
from dynamo import prometheus_names
print(prometheus_names.frontend_service.REQUESTS_TOTAL) # "requests_total"
print(prometheus_names.kvstats.ACTIVE_BLOCKS) # "kvstats_active_blocks"
print(prometheus_names.work_handler.ERRORS_TOTAL) # "errors_total"
# Pattern 2: Import specific classes
from dynamo.prometheus_names import frontend_service, kvstats
from dynamo.prometheus_names import frontend_service, work_handler
print(frontend_service.REQUESTS_TOTAL) # "requests_total"
print(kvstats.ACTIVE_BLOCKS) # "kvstats_active_blocks"
print(work_handler.ERRORS_TOTAL) # "errors_total"
"""
from __future__ import annotations
......
......@@ -176,10 +176,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<context::Context>()?;
m.add_class::<ModelType>()?;
m.add_class::<ModelInput>()?;
m.add_class::<llm::kv::ForwardPassMetrics>()?;
m.add_class::<llm::kv::WorkerStats>()?;
m.add_class::<llm::kv::KvStats>()?;
m.add_class::<llm::kv::SpecDecodeStats>()?;
m.add_class::<llm::kv::KvPushRouter>()?;
m.add_class::<llm::kv::KvPushRouterStream>()?;
m.add_class::<RouterMode>()?;
......
......@@ -11,10 +11,6 @@ use tokio_stream::StreamExt;
use super::*;
use crate::Component;
use llm_rs::kv_router::indexer::KvIndexerInterface;
use llm_rs::kv_router::protocols::ForwardPassMetrics as RsForwardPassMetrics;
use llm_rs::kv_router::protocols::KvStats as RsKvStats;
use llm_rs::kv_router::protocols::SpecDecodeStats as RsSpecDecodeStats;
use llm_rs::kv_router::protocols::WorkerStats as RsWorkerStats;
use llm_rs::kv_router::protocols::compute_block_hash_for_seq;
use rs::pipeline::{AsyncEngine, SingleIn};
use rs::traits::events::EventSubscriber;
......@@ -84,11 +80,6 @@ impl WorkerMetricsPublisher {
let rs_publisher = self.inner.clone();
let rs_component = component.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
// Register Prometheus metrics first
rs_publisher
.register_prometheus_metrics(&rs_component)
.map_err(to_pyerr)?;
rs_publisher
.create_endpoint(rs_component)
.await
......@@ -97,11 +88,15 @@ impl WorkerMetricsPublisher {
})
}
#[pyo3(signature = (metrics))]
fn publish(&self, _py: Python, metrics: &ForwardPassMetrics) -> PyResult<()> {
// Create and publish the complete metrics
/// Publish worker metrics for load monitoring.
///
/// # Arguments
/// * `dp_rank` - Data parallel rank of the worker (None defaults to 0)
/// * `active_decode_blocks` - Number of active KV cache blocks
#[pyo3(signature = (dp_rank, active_decode_blocks))]
fn publish(&self, dp_rank: Option<u32>, active_decode_blocks: u64) -> PyResult<()> {
self.inner
.publish(metrics.0.clone().into())
.publish(dp_rank, active_decode_blocks)
.map_err(to_pyerr)
}
}
......@@ -969,98 +964,6 @@ impl KvRecorder {
}
}
#[pyclass]
#[repr(transparent)]
pub struct ForwardPassMetrics(pub RsForwardPassMetrics);
#[pyclass]
#[repr(transparent)]
pub struct WorkerStats(pub RsWorkerStats);
#[pyclass]
#[repr(transparent)]
pub struct KvStats(pub RsKvStats);
#[pyclass]
#[repr(transparent)]
pub struct SpecDecodeStats(pub RsSpecDecodeStats);
#[pymethods]
impl ForwardPassMetrics {
#[new]
#[pyo3(signature = (worker_stats, kv_stats, spec_decode_stats = None))]
fn new(
worker_stats: &WorkerStats,
kv_stats: &KvStats,
spec_decode_stats: Option<&SpecDecodeStats>,
) -> Self {
Self(RsForwardPassMetrics {
worker_stats: worker_stats.0.clone(),
kv_stats: kv_stats.0.clone(),
spec_decode_stats: spec_decode_stats.map(|s| s.0.clone()),
})
}
}
#[pymethods]
impl WorkerStats {
#[new]
#[pyo3(signature = (request_active_slots, request_total_slots, num_requests_waiting, data_parallel_rank=None))]
fn new(
request_active_slots: u64,
request_total_slots: u64,
num_requests_waiting: u64,
data_parallel_rank: Option<DpRank>,
) -> Self {
Self(RsWorkerStats {
data_parallel_rank,
request_active_slots,
request_total_slots,
num_requests_waiting,
})
}
}
#[pymethods]
impl KvStats {
#[new]
#[pyo3(signature = (kv_active_blocks, kv_total_blocks, gpu_cache_usage_perc, gpu_prefix_cache_hit_rate))]
fn new(
kv_active_blocks: u64,
kv_total_blocks: u64,
gpu_cache_usage_perc: f32,
gpu_prefix_cache_hit_rate: f32,
) -> Self {
Self(RsKvStats {
kv_active_blocks,
kv_total_blocks,
gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate,
})
}
}
#[pymethods]
impl SpecDecodeStats {
#[new]
#[pyo3(signature = (num_spec_tokens, num_drafts, num_draft_tokens, num_accepted_tokens, num_accepted_tokens_per_pos))]
fn new(
num_spec_tokens: Option<u32>,
num_drafts: Option<u32>,
num_draft_tokens: Option<u32>,
num_accepted_tokens: Option<u32>,
num_accepted_tokens_per_pos: Option<Vec<u32>>,
) -> Self {
Self(RsSpecDecodeStats {
num_spec_tokens,
num_drafts,
num_draft_tokens,
num_accepted_tokens,
num_accepted_tokens_per_pos,
})
}
}
/// Helper function to create a KV router from an endpoint using the ModelManager
/// to ensure proper etcd registration
async fn create_kv_router_from_endpoint(
......
......@@ -397,86 +397,9 @@ class Context:
"""
...
class WorkerStats:
"""
Worker stats.
"""
...
def __init__(
self,
request_active_slots: int,
request_total_slots: int,
num_requests_waiting: int,
data_parallel_rank: Optional[int] = None,
) -> None:
"""
Create a `WorkerStats` object.
"""
...
class KvStats:
"""
KV stats.
"""
...
def __init__(
self,
kv_active_blocks: int,
kv_total_blocks: int,
gpu_cache_usage_perc: float,
gpu_prefix_cache_hit_rate: float,
) -> None:
"""
Create a `KvStats` object.
"""
...
class SpecDecodeStats:
"""
Speculative decoding stats.
"""
...
def __init__(
self,
num_spec_tokens: int,
num_drafts: int,
num_draft_tokens: int,
num_accepted_tokens: int,
num_accepted_tokens_per_pos: List[int],
) -> None:
"""
Create a `SpecDecodeStats` object when running with speculative decoding.
"""
...
class ForwardPassMetrics:
"""
A collection of metrics for a forward pass.
Includes worker stats, KV stats, and speculative decoding stats.
"""
...
def __init__(
self,
worker_stats: WorkerStats,
kv_stats: KvStats,
spec_decode_stats: Optional[SpecDecodeStats] = None,
) -> None:
"""
Create a `ForwardPassMetrics` object
"""
...
class WorkerMetricsPublisher:
"""
A metrics publisher will provide metrics to the router.
A metrics publisher will provide metrics to the router for load monitoring.
"""
...
......@@ -486,8 +409,10 @@ class WorkerMetricsPublisher:
Create a `WorkerMetricsPublisher` object
"""
def create_endpoint(self, component: Component) -> None:
async def create_endpoint(self, component: Component) -> None:
"""
Create the NATS endpoint for metrics publishing. Must be awaited.
Only service created through this method will interact with KV router of the same component.
Args:
......@@ -496,10 +421,15 @@ class WorkerMetricsPublisher:
def publish(
self,
metrics: ForwardPassMetrics
dp_rank: Optional[int],
active_decode_blocks: int,
) -> None:
"""
Update the metrics being reported.
Publish worker metrics for load monitoring.
Args:
dp_rank: Data parallel rank of the worker (None defaults to 0)
active_decode_blocks: Number of active KV cache blocks
"""
...
......
......@@ -8,7 +8,6 @@ import logging
from dynamo._core import ApproxKvIndexer as ApproxKvIndexer
from dynamo._core import EngineType
from dynamo._core import EntrypointArgs as EntrypointArgs
from dynamo._core import ForwardPassMetrics as ForwardPassMetrics
from dynamo._core import HttpAsyncEngine as HttpAsyncEngine
from dynamo._core import HttpService as HttpService
from dynamo._core import KserveGrpcService as KserveGrpcService
......@@ -17,7 +16,6 @@ from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvPushRouter as KvPushRouter
from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouterConfig as KvRouterConfig
from dynamo._core import KvStats as KvStats
from dynamo._core import LoRADownloader as LoRADownloader
from dynamo._core import MediaDecoder as MediaDecoder
from dynamo._core import MediaFetcher as MediaFetcher
......@@ -30,9 +28,7 @@ from dynamo._core import PythonAsyncEngine as PythonAsyncEngine
from dynamo._core import RadixTree as RadixTree
from dynamo._core import RouterConfig as RouterConfig
from dynamo._core import RouterMode as RouterMode
from dynamo._core import SpecDecodeStats as SpecDecodeStats
from dynamo._core import WorkerMetricsPublisher as WorkerMetricsPublisher
from dynamo._core import WorkerStats as WorkerStats
from dynamo._core import ZmqKvEventListener as ZmqKvEventListener
from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher
from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig
......
......@@ -17,12 +17,12 @@ Usage (both patterns supported):
# Pattern 1: Import module
from dynamo import prometheus_names
print(prometheus_names.frontend_service.REQUESTS_TOTAL) # "requests_total"
print(prometheus_names.kvstats.ACTIVE_BLOCKS) # "kvstats_active_blocks"
print(prometheus_names.work_handler.ERRORS_TOTAL) # "errors_total"
# Pattern 2: Import specific classes
from dynamo.prometheus_names import frontend_service, kvstats
from dynamo.prometheus_names import frontend_service, work_handler
print(frontend_service.REQUESTS_TOTAL) # "requests_total"
print(kvstats.ACTIVE_BLOCKS) # "kvstats_active_blocks"
print(work_handler.ERRORS_TOTAL) # "errors_total"
"""
from __future__ import annotations
......@@ -103,6 +103,20 @@ class kvbm:
HOST_CACHE_HIT_RATE = "host_cache_hit_rate"
# Disk cache hit rate (0.0-1.0) from the sliding window
DISK_CACHE_HIT_RATE = "disk_cache_hit_rate"
# Object storage cache hit rate (0.0-1.0) from the sliding window
OBJECT_CACHE_HIT_RATE = "object_cache_hit_rate"
# Number of blocks offloaded from device to object storage
OFFLOAD_BLOCKS_D2O = "offload_blocks_d2o"
# Number of blocks onboarded from object storage to device
ONBOARD_BLOCKS_O2D = "onboard_blocks_o2d"
# Bytes transferred to object storage (offload)
OFFLOAD_BYTES_OBJECT = "offload_bytes_object"
# Bytes transferred from object storage (onboard)
ONBOARD_BYTES_OBJECT = "onboard_bytes_object"
# Number of failed object storage read operations (blocks)
OBJECT_READ_FAILURES = "object_read_failures"
# Number of failed object storage write operations (blocks)
OBJECT_WRITE_FAILURES = "object_write_failures"
class kvrouter:
......@@ -110,21 +124,6 @@ class kvrouter:
KV_CACHE_EVENTS_APPLIED = "kv_cache_events_applied"
class kvstats:
"""KvStats metrics from LLM workers"""
# Prefix for all KvStats metrics
PREFIX = ""
# Number of active KV cache blocks currently in use
ACTIVE_BLOCKS = "kvstats_active_blocks"
# Total number of KV cache blocks available
TOTAL_BLOCKS = "kvstats_total_blocks"
# GPU cache usage as a percentage (0.0-1.0)
GPU_CACHE_USAGE_PERCENT = "kvstats_gpu_cache_usage_percent"
# GPU prefix cache hit rate as a percentage (0.0-1.0)
GPU_PREFIX_CACHE_HIT_RATE = "kvstats_gpu_prefix_cache_hit_rate"
class labels:
"""Automatically inserted Prometheus label names used across the metrics system"""
......
......@@ -155,68 +155,6 @@ pub struct WorkerSelectionResult {
pub overlap_blocks: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct ForwardPassMetrics {
pub worker_stats: WorkerStats,
pub kv_stats: KvStats,
pub spec_decode_stats: Option<SpecDecodeStats>,
}
/// Per-worker request statistics carried inside `ForwardPassMetrics`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct WorkerStats {
/// Data-parallel rank of the reporting worker, when one is known.
/// Background on data-parallel attention:
// https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models
pub data_parallel_rank: Option<DpRank>,
/// Request slots currently occupied.
pub request_active_slots: u64,
/// Total request slots available.
/// NOTE(review): presumably the engine's concurrency cap; confirm with the producer.
pub request_total_slots: u64,
/// Requests that are queued and waiting.
pub num_requests_waiting: u64,
}
/// KV cache statistics carried inside `ForwardPassMetrics`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct KvStats {
/// Number of KV cache blocks currently in use.
pub kv_active_blocks: u64,
/// Total number of KV cache blocks available.
pub kv_total_blocks: u64,
/// GPU cache usage.
// percentage represented as a float from 0 to 1
pub gpu_cache_usage_perc: f32,
/// GPU prefix cache hit rate.
// percentage represented as a float from 0 to 1
pub gpu_prefix_cache_hit_rate: f32,
}
/// Minimal load record that carries only the active KV block count.
///
/// It is the payload of `LoadMetrics::PredictiveLoadMetrics`.
/// NOTE(review): how the "predicted" value is produced is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct PredictiveLoadMetrics {
/// Number of active KV cache blocks.
pub kv_active_blocks: u64,
}
/// Load information in one of two shapes: a full `ForwardPassMetrics` record
/// or a `PredictiveLoadMetrics` record holding only the active block count.
///
/// `rename_all = "snake_case"` makes serde use snake_case variant names
/// (`engine_load_metrics`, `predictive_load_metrics`) on the wire.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum LoadMetrics {
/// Full metrics record.
EngineLoadMetrics(ForwardPassMetrics),
/// Active-block-count-only record.
PredictiveLoadMetrics(PredictiveLoadMetrics),
}
impl LoadMetrics {
/// Returns the active KV block count regardless of which variant is held.
///
/// For `EngineLoadMetrics` the value is read from the nested `kv_stats`;
/// for `PredictiveLoadMetrics` it is the struct's only field.
pub fn kv_active_blocks(&self) -> u64 {
match self {
LoadMetrics::EngineLoadMetrics(metrics) => metrics.kv_stats.kv_active_blocks,
LoadMetrics::PredictiveLoadMetrics(metrics) => metrics.kv_active_blocks,
}
}
}
/// The default is the `PredictiveLoadMetrics` variant wrapping that struct's
/// own default, i.e. an active block count of 0.
impl Default for LoadMetrics {
fn default() -> Self {
LoadMetrics::PredictiveLoadMetrics(PredictiveLoadMetrics::default())
}
}
/// Optional spec-decode counters attached to `ForwardPassMetrics`.
///
/// Every field is an `Option`, so a producer can report any subset; the
/// derived `Default` leaves them all `None`.
/// NOTE(review): the field descriptions below are inferred from the names
/// only; confirm them against the engine that fills this struct.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct SpecDecodeStats {
/// Presumably the configured number of speculative tokens; TODO confirm.
pub num_spec_tokens: Option<u32>,
/// Presumably the number of drafts issued; TODO confirm.
pub num_drafts: Option<u32>,
/// Presumably the number of draft tokens proposed; TODO confirm.
pub num_draft_tokens: Option<u32>,
/// Presumably the number of draft tokens accepted; TODO confirm.
pub num_accepted_tokens: Option<u32>,
/// Presumably accepted-token counts indexed by draft position; TODO confirm.
pub num_accepted_tokens_per_pos: Option<Vec<u32>>,
}
/// Active load metrics for a worker, used for busy detection.
///
/// Published by workers (with only `active_decode_blocks`) and by the scheduler
......
......@@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use anyhow::Result;
......@@ -15,7 +15,6 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};
use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats};
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
use dynamo_runtime::{
component::{Component, Namespace},
......@@ -824,106 +823,42 @@ impl<'de> Visitor<'de> for RawKvEventVisitor {
// Metrics Publishers ------------------------------------------------------
// -------------------------------------------------------------------------
pub struct WorkerMetricsPublisher {
tx: tokio::sync::watch::Sender<Arc<ForwardPassMetrics>>,
rx: tokio::sync::watch::Receiver<Arc<ForwardPassMetrics>>,
/// Prometheus gauges for KvStats metrics
/// We use OnceLock for efficient one-time initialization and lock-free reads
/// The gauges are set once during register_prometheus_metrics and then only read
prometheus_gauges: OnceLock<KvStatsPrometheusGauges>,
}
struct KvStatsPrometheusGauges {
kv_active_blocks_gauge: prometheus::Gauge,
kv_total_blocks_gauge: prometheus::Gauge,
gpu_cache_usage_gauge: prometheus::Gauge,
gpu_prefix_cache_hit_rate_gauge: prometheus::Gauge,
/// Metrics data passed through the channel for NATS publishing
///
/// This is the value type of the publisher's `tokio::sync::watch` channel.
/// The derived `Default` (rank 0, zero blocks) seeds the channel when the
/// publisher is constructed.
#[derive(Debug, Clone, Default)]
struct WorkerMetrics {
/// Data-parallel rank the measurement belongs to.
dp_rank: DpRank,
/// Number of active KV cache blocks reported by the worker.
active_decode_blocks: u64,
}
impl KvStatsPrometheusGauges {
/// Create a new KvStatsPrometheusGauges instance with all metrics registered
fn new(component: &Component) -> Result<Self> {
let kv_active_blocks_gauge = component.metrics().create_gauge(
kvstats::ACTIVE_BLOCKS,
"Number of active KV cache blocks currently in use",
&[],
)?;
let kv_total_blocks_gauge = component.metrics().create_gauge(
kvstats::TOTAL_BLOCKS,
"Total number of KV cache blocks available",
&[],
)?;
let gpu_cache_usage_gauge = component.metrics().create_gauge(
kvstats::GPU_CACHE_USAGE_PERCENT,
"GPU cache usage as a percentage (0.0-1.0)",
&[],
)?;
let gpu_prefix_cache_hit_rate_gauge = component.metrics().create_gauge(
kvstats::GPU_PREFIX_CACHE_HIT_RATE,
"GPU prefix cache hit rate as a percentage (0.0-1.0)",
&[],
)?;
tracing::info!("Registered KvStats Prometheus metrics");
Ok(KvStatsPrometheusGauges {
kv_active_blocks_gauge,
kv_total_blocks_gauge,
gpu_cache_usage_gauge,
gpu_prefix_cache_hit_rate_gauge,
})
}
/// Update all gauges with values from KvStats
fn update_from_kvstats(&self, kv_stats: &KvStats) {
self.kv_active_blocks_gauge
.set(kv_stats.kv_active_blocks as f64);
self.kv_total_blocks_gauge
.set(kv_stats.kv_total_blocks as f64);
self.gpu_cache_usage_gauge
.set(kv_stats.gpu_cache_usage_perc as f64);
self.gpu_prefix_cache_hit_rate_gauge
.set(kv_stats.gpu_prefix_cache_hit_rate as f64);
}
/// Publishes worker load metrics through a `tokio::sync::watch` channel.
///
/// Both ends of the channel are kept: `publish` writes to `tx`, and the
/// background NATS publishing task clones `rx` to observe updates.
pub struct WorkerMetricsPublisher {
/// Sending half; receives every value handed to `publish`.
tx: tokio::sync::watch::Sender<WorkerMetrics>,
/// Receiving half; cloned for the background publishing task.
rx: tokio::sync::watch::Receiver<WorkerMetrics>,
}
impl WorkerMetricsPublisher {
pub fn new() -> Result<Self> {
let (tx, rx) = tokio::sync::watch::channel(Arc::new(ForwardPassMetrics::default()));
Ok(WorkerMetricsPublisher {
tx,
rx,
prometheus_gauges: OnceLock::new(),
})
}
pub fn publish(
&self,
metrics: Arc<ForwardPassMetrics>,
) -> Result<(), tokio::sync::watch::error::SendError<Arc<ForwardPassMetrics>>> {
tracing::trace!("Publish metrics: {metrics:?}");
// Update Prometheus gauges - OnceLock provides lock-free reads after initialization
// This is the hot path - we only read the Arc, no locking overhead
if let Some(gauges) = self.prometheus_gauges.get() {
gauges.update_from_kvstats(&metrics.kv_stats);
let (tx, rx) = tokio::sync::watch::channel(WorkerMetrics::default());
Ok(WorkerMetricsPublisher { tx, rx })
}
self.tx.send(metrics)
}
/// Register KvStats Prometheus metrics with the component's registry
pub fn register_prometheus_metrics(&self, component: &Component) -> Result<()> {
// Use get_or_init for thread-safe one-time initialization
// This will only initialize once, subsequent calls will return immediately
self.prometheus_gauges.get_or_init(|| {
KvStatsPrometheusGauges::new(component).expect("Failed to create Prometheus gauges")
});
Ok(())
/// Publish worker metrics for load monitoring.
///
/// The values are packed into a `WorkerMetrics` and sent on the watch
/// channel; this function does not itself talk to NATS.
///
/// # Arguments
/// * `dp_rank` - Data parallel rank of the worker (None defaults to 0)
/// * `active_decode_blocks` - Number of active KV cache blocks
///
/// # Errors
/// Returns an `anyhow` error with the message "metrics channel closed" when
/// the watch `send` fails (tokio reports this when no receivers remain).
pub fn publish(&self, dp_rank: Option<DpRank>, active_decode_blocks: u64) -> Result<()> {
let metrics = WorkerMetrics {
// A missing rank is treated as rank 0.
dp_rank: dp_rank.unwrap_or(0),
active_decode_blocks,
};
tracing::trace!(
"Publish metrics: dp_rank={}, active_decode_blocks={}",
metrics.dp_rank,
metrics.active_decode_blocks
);
// The SendError (which carries the unsent value) is discarded and
// replaced by a plain message so the signature can use anyhow's Result.
self.tx
.send(metrics)
.map_err(|_| anyhow::anyhow!("metrics channel closed"))
}
pub async fn create_endpoint(&self, component: Component) -> Result<()> {
......@@ -934,16 +869,15 @@ impl WorkerMetricsPublisher {
/// Starts a background task to publish metrics over NATS
///
/// This task monitors metric changes (specifically kv_active_blocks and num_requests_waiting)
/// This task monitors metric changes (specifically active_decode_blocks)
/// and publishes stable metrics to NATS after they've been unchanged for 1ms.
fn start_nats_metrics_publishing(&self, namespace: Namespace, worker_id: u64) {
let nats_rx = self.rx.clone();
tokio::spawn(async move {
let mut rx = nats_rx;
let mut last_kv_active_blocks: Option<u64> = Some(0);
let mut last_num_requests_waiting: Option<u64> = Some(0);
let mut pending_publish: Option<Arc<ForwardPassMetrics>> = None;
let mut last_active_decode_blocks: Option<u64> = Some(0);
let mut pending_publish: Option<WorkerMetrics> = None;
let mut publish_timer =
Box::pin(tokio::time::sleep(tokio::time::Duration::from_secs(0)));
publish_timer.as_mut().reset(tokio::time::Instant::now()); // Complete immediately
......@@ -961,25 +895,16 @@ impl WorkerMetricsPublisher {
let metrics = rx.borrow_and_update().clone();
// Extract the values we care about
let current_kv_active_blocks = metrics.kv_stats.kv_active_blocks;
let current_num_requests_waiting =
metrics.worker_stats.num_requests_waiting;
// Check if these specific metrics have changed
let has_changed = match (last_kv_active_blocks, last_num_requests_waiting) {
(Some(last_kv), Some(last_requests)) => {
last_kv != current_kv_active_blocks
|| last_requests != current_num_requests_waiting
}
_ => true, // First time, consider it changed
// Check if active_decode_blocks has changed
let has_changed = match last_active_decode_blocks {
Some(last) => last != metrics.active_decode_blocks,
None => true, // First time, consider it changed
};
// If load metrics changed, schedule a publish
if has_changed {
pending_publish = Some(metrics.clone());
last_kv_active_blocks = Some(current_kv_active_blocks);
last_num_requests_waiting = Some(current_num_requests_waiting);
last_active_decode_blocks = Some(metrics.active_decode_blocks);
// Start the 1ms timer
publish_timer.as_mut().reset(
......@@ -990,11 +915,10 @@ impl WorkerMetricsPublisher {
// Timer expired - publish if we have pending metrics
_ = &mut publish_timer => {
if let Some(metrics) = pending_publish.take() {
// Create ActiveLoad with only active_decode_blocks (worker doesn't know prefill tokens)
let active_load = ActiveLoad {
worker_id,
dp_rank: metrics.worker_stats.data_parallel_rank.unwrap_or(0),
active_decode_blocks: Some(metrics.kv_stats.kv_active_blocks),
dp_rank: metrics.dp_rank,
active_decode_blocks: Some(metrics.active_decode_blocks),
active_prefill_tokens: None,
};
......@@ -1876,7 +1800,7 @@ mod test_exponential_backoff {
#[cfg(all(test, feature = "integration"))]
mod test_integration_publisher {
use super::*;
use crate::kv_router::protocols::{ActiveLoad, ForwardPassMetrics, KvStats, WorkerStats};
use crate::kv_router::protocols::ActiveLoad;
use dynamo_runtime::distributed_test_utils::create_test_drt_async;
use dynamo_runtime::traits::events::EventSubscriber;
use futures::StreamExt;
......@@ -1907,23 +1831,7 @@ mod test_integration_publisher {
// Test 1: Publish 10 different metrics with 0.5ms intervals
// Only the last one should be published after 1ms of stability
for i in 0..10 {
let metrics = Arc::new(ForwardPassMetrics {
kv_stats: KvStats {
kv_active_blocks: (i * 100) as u64, // Changing load metric
kv_total_blocks: 1000,
gpu_cache_usage_perc: 0.5,
gpu_prefix_cache_hit_rate: 0.8,
},
worker_stats: WorkerStats {
num_requests_waiting: (i * 10) as u64, // Changing load metric
data_parallel_rank: None,
request_active_slots: 50,
request_total_slots: 100,
},
spec_decode_stats: None,
});
publisher.publish(metrics).unwrap();
publisher.publish(None, (i * 100) as u64).unwrap();
tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
}
......@@ -1946,25 +1854,9 @@ mod test_integration_publisher {
tokio::time::timeout(tokio::time::Duration::from_millis(50), subscriber.next()).await;
assert!(no_msg.is_err(), "Expected no more messages, but found one");
// Test 2: Publish 10 more metrics where everything changes EXCEPT the load metrics
for i in 0..10 {
let metrics = Arc::new(ForwardPassMetrics {
kv_stats: KvStats {
kv_active_blocks: 900, // Keep same as last published
kv_total_blocks: 1000 + (i * 100) as u64, // Change other metrics
gpu_cache_usage_perc: 0.3 + (i as f32 * 0.05), // Change other metrics
gpu_prefix_cache_hit_rate: 0.7 + (i as f32 * 0.01), // Change other metrics
},
worker_stats: WorkerStats {
num_requests_waiting: 90, // Keep same as last published
data_parallel_rank: None,
request_active_slots: 40 + (i * 5) as u64, // Change other metrics
request_total_slots: 100 + (i * 10) as u64, // Change other metrics
},
spec_decode_stats: None,
});
publisher.publish(metrics).unwrap();
// Test 2: Publish 10 more metrics with same active_decode_blocks - should not trigger publish
for _ in 0..10 {
publisher.publish(None, 900).unwrap(); // Keep same as last published
tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
}
......@@ -1983,85 +1875,4 @@ mod test_integration_publisher {
Ok(())
}
#[tokio::test]
#[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
async fn test_kvstats_prometheus_gauge_updates() {
// Test that publish() updates Prometheus gauges correctly using real Component
let publisher = WorkerMetricsPublisher::new().unwrap();
// Create a real DRT and component for integration testing
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns2002".to_string()).unwrap();
let component = namespace.component("comp2002".to_string()).unwrap();
// Register Prometheus metrics using the real constructor
publisher.register_prometheus_metrics(&component).unwrap();
// Get references to the gauges for testing
let gauges = publisher.prometheus_gauges.get().unwrap();
let active_blocks_gauge = gauges.kv_active_blocks_gauge.clone();
let total_blocks_gauge = gauges.kv_total_blocks_gauge.clone();
let cache_usage_gauge = gauges.gpu_cache_usage_gauge.clone();
let hit_rate_gauge = gauges.gpu_prefix_cache_hit_rate_gauge.clone();
// Create test metrics with specific values
let test_metrics = Arc::new(ForwardPassMetrics {
worker_stats: WorkerStats {
data_parallel_rank: None,
request_active_slots: 5,
request_total_slots: 100,
num_requests_waiting: 2,
},
kv_stats: KvStats {
kv_active_blocks: 42,
kv_total_blocks: 12894,
gpu_cache_usage_perc: 0.5,
gpu_prefix_cache_hit_rate: 0.75,
},
spec_decode_stats: None,
});
// Test 1: Initial gauge values should be 0
assert_eq!(active_blocks_gauge.get(), 0.0);
assert_eq!(total_blocks_gauge.get(), 0.0);
assert_eq!(cache_usage_gauge.get(), 0.0);
assert_eq!(hit_rate_gauge.get(), 0.0);
// Test 2: publish() should update all gauges with correct values
let result = publisher.publish(test_metrics);
assert!(result.is_ok());
// Test 3: Verify gauges were updated correctly
assert_eq!(active_blocks_gauge.get(), 42.0);
assert_eq!(total_blocks_gauge.get(), 12894.0);
assert_eq!(cache_usage_gauge.get(), 0.5);
assert_eq!(hit_rate_gauge.get(), 0.75);
// Test 4: Verify metrics are properly registered in the component's registry
// Component implements MetricsRegistry trait which provides prometheus_expfmt()
let prometheus_output = component.metrics().prometheus_expfmt().unwrap();
// Verify metric names are present
assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS));
assert!(prometheus_output.contains(kvstats::TOTAL_BLOCKS));
assert!(prometheus_output.contains(kvstats::GPU_CACHE_USAGE_PERCENT));
assert!(prometheus_output.contains(kvstats::GPU_PREFIX_CACHE_HIT_RATE));
// Test 5: Verify the prometheus output contains the actual values
// Print the output to debug format issues
println!("Prometheus output:\n{}", prometheus_output);
// Check for metric values - the format includes labels so we need to be more flexible
assert!(prometheus_output.contains("kvstats_active_blocks"));
assert!(prometheus_output.contains("42")); // The value should be there
assert!(prometheus_output.contains("kvstats_total_blocks"));
assert!(prometheus_output.contains("12894")); // The value should be there
assert!(prometheus_output.contains("kvstats_gpu_cache_usage_percent"));
assert!(prometheus_output.contains("kvstats_gpu_prefix_cache_hit_rate"));
println!(
"✅ KvStatsPrometheusGauges constructor and publish() work correctly with real Component"
);
}
}
......@@ -207,10 +207,9 @@ impl MockVllmEngine {
}
tracing::debug!("Starting metrics background tasks");
for (dp_rank, scheduler) in schedulers.iter().enumerate() {
for scheduler in schedulers.iter() {
let mut metrics_rx = scheduler.metrics_receiver();
let publisher = metrics_publisher.clone();
let dp_rank = dp_rank as u32;
let cancel_token = cancel_token.clone();
tokio::spawn(async move {
......@@ -221,15 +220,15 @@ impl MockVllmEngine {
// Get the latest metrics
let metrics = metrics_rx.borrow().clone();
// Publish metrics
if let Err(e) = publisher.publish(Arc::new(metrics)) {
tracing::warn!("Failed to publish metrics for DP rank {dp_rank}: {e}");
// Publish metrics using flat API
if let Err(e) = publisher.publish(Some(metrics.dp_rank), metrics.active_decode_blocks) {
tracing::warn!("Failed to publish metrics for DP rank {}: {e}", metrics.dp_rank);
} else {
tracing::trace!("Published metrics for DP rank {}", dp_rank);
tracing::trace!("Published metrics for DP rank {}", metrics.dp_rank);
}
}
_ = cancel_token.cancelled() => {
tracing::debug!("Metrics publishing cancelled for DP rank {dp_rank}");
tracing::debug!("Metrics publishing cancelled");
break;
}
}
......
......@@ -28,7 +28,7 @@
//! ## NOTE
//! The current prefill and decoding time simulations are not scientific at all and are WIP
use crate::kv_router::protocols::{ForwardPassMetrics, KvStats, WorkerStats};
use crate::kv_router::protocols::DpRank;
use crate::mocker::evictor::LRUEvictor;
use crate::mocker::kv_manager::KvManager;
use crate::mocker::perf_model::PerfModel;
......@@ -44,6 +44,13 @@ use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
/// Simple metrics struct for mocker's internal use
///
/// This is the value type of the scheduler's `tokio::sync::watch` metrics
/// channel; one value is sent per simulated forward pass.
#[derive(Clone, Default, Debug)]
pub struct MockerMetrics {
/// Data-parallel rank of the scheduler that produced this value.
pub dp_rank: DpRank,
/// Active KV block count, taken from `kv_manager.num_active_blocks()`.
pub active_decode_blocks: u64,
}
/// Enum representing either a direct request or an active sequence
pub enum Request {
Direct(DirectRequest),
......@@ -238,7 +245,7 @@ impl SchedulerState {
#[derive(Clone)]
pub struct Scheduler {
request_tx: mpsc::UnboundedSender<DirectRequest>,
metrics_rx: tokio::sync::watch::Receiver<ForwardPassMetrics>,
metrics_rx: tokio::sync::watch::Receiver<MockerMetrics>,
}
impl Scheduler {
......@@ -259,10 +266,12 @@ impl Scheduler {
// Create channel for request handling
let (request_tx, mut request_rx) = mpsc::unbounded_channel::<DirectRequest>();
let mut initial_metrics = ForwardPassMetrics::default();
initial_metrics.worker_stats.data_parallel_rank = Some(dp_rank);
let initial_metrics = MockerMetrics {
dp_rank,
active_decode_blocks: 0,
};
let (metrics_tx, metrics_rx) =
tokio::sync::watch::channel::<ForwardPassMetrics>(initial_metrics);
tokio::sync::watch::channel::<MockerMetrics>(initial_metrics);
let cancel_token_clone = cancellation_token.unwrap_or_default().clone();
......@@ -311,12 +320,10 @@ impl Scheduler {
let total_time = prefill_time + decode_time;
// 4. Send metrics once per forward pass (after all prefill and decode processing)
let _ = metrics_tx.send(get_fwd_pass_metrics(
&state,
&kv_manager,
&hit_rates,
let _ = metrics_tx.send(MockerMetrics {
dp_rank,
));
active_decode_blocks: kv_manager.num_active_blocks() as u64,
});
// 5. Sleep to maintain target iteration timing
let target_duration =
......@@ -345,7 +352,7 @@ impl Scheduler {
}
/// Get a watch receiver for forward pass metrics
pub fn metrics_receiver(&self) -> tokio::sync::watch::Receiver<ForwardPassMetrics> {
pub fn metrics_receiver(&self) -> tokio::sync::watch::Receiver<MockerMetrics> {
self.metrics_rx.clone()
}
}
......@@ -492,52 +499,6 @@ fn simulate_decode(
total_time
}
/// Calculate forward pass metrics from current state
fn get_fwd_pass_metrics(
state: &SchedulerState,
kv_manager: &KvManager,
hit_rates: &RunningMean<f32>,
dp_rank: u32,
) -> ForwardPassMetrics {
// Get state metrics
let request_active_slots = state.decode.len() as u64;
let num_requests_waiting = state.waiting.len() as u64;
// Get KV manager metrics
let active_blocks_count = kv_manager.num_active_blocks() as u64;
let total_capacity = kv_manager.max_capacity() as u64;
let gpu_cache_usage_perc = if total_capacity > 0 {
active_blocks_count as f32 / total_capacity as f32
} else {
0.0
};
// Get hit rate metrics - O(1) access
let gpu_prefix_cache_hit_rate = hit_rates.mean();
let worker_stats = WorkerStats {
data_parallel_rank: Some(dp_rank),
request_active_slots,
request_total_slots: 1024, // vllm max_num_seqs for gpu >= 70 vram, otherwise 256, fallback is 128
num_requests_waiting,
};
let kv_stats = KvStats {
kv_active_blocks: active_blocks_count,
kv_total_blocks: total_capacity,
gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate,
};
let spec_decode_stats = None;
ForwardPassMetrics {
worker_stats,
kv_stats,
spec_decode_stats,
}
}
/// Attempts to schedule waiting requests from the state queue.
/// Returns the number of requests successfully scheduled.
fn try_schedule(
......@@ -656,27 +617,12 @@ mod tests {
use std::time::Duration;
use tokio::time::interval;
/// Helper function to verify that the scheduler is idle (no active or waiting requests/resources)
fn assert_scheduler_idle(metrics: &ForwardPassMetrics) {
assert_eq!(
metrics.worker_stats.request_active_slots, 0,
"Expected 0 active slots, got {}",
metrics.worker_stats.request_active_slots
);
assert_eq!(
metrics.worker_stats.num_requests_waiting, 0,
"Expected 0 waiting requests, got {}",
metrics.worker_stats.num_requests_waiting
);
/// Helper function to verify that the scheduler is idle (no active KV blocks)
fn assert_scheduler_idle(metrics: &MockerMetrics) {
assert_eq!(
metrics.kv_stats.kv_active_blocks, 0,
metrics.active_decode_blocks, 0,
"Expected 0 active blocks, got {}",
metrics.kv_stats.kv_active_blocks
);
assert_eq!(
metrics.kv_stats.gpu_cache_usage_perc, 0.0,
"Expected 0% GPU cache usage, got {}",
metrics.kv_stats.gpu_cache_usage_perc
metrics.active_decode_blocks
);
}
......@@ -893,21 +839,11 @@ mod tests {
// Wait a bit for final metrics update
tokio::time::sleep(Duration::from_millis(100)).await;
// Verify forward pass metrics
// Verify forward pass metrics - scheduler should be idle after completing all requests
let metrics = metrics_rx.borrow().clone();
assert_scheduler_idle(&metrics);
assert!(
metrics.kv_stats.gpu_prefix_cache_hit_rate > 0.8,
"Expected cache hit rate > 0.8, got {}",
metrics.kv_stats.gpu_prefix_cache_hit_rate
);
println!(
"Test passed! Cache hit rate: {:.3}",
metrics.kv_stats.gpu_prefix_cache_hit_rate
);
println!("Received {received_tokens} tokens");
println!("Test passed! Received {received_tokens} tokens");
}
#[tokio::test]
......
......@@ -302,39 +302,6 @@ pub mod kvbm {
pub const OBJECT_WRITE_FAILURES: &str = "object_write_failures";
}
/// KvStats metrics from LLM workers
///
/// Every constant is built at compile time by prepending the literal
/// `"kvstats_"` to a suffix.
pub mod kvstats {
/// Macro to generate KvStats metric names with the prefix
///
/// Expands to `concat!("kvstats_", $name)`, so the argument must be a
/// literal and the result is a `&'static str`.
macro_rules! kvstats_name {
($name:expr) => {
concat!("kvstats_", $name)
};
}
/// Prefix for all KvStats metrics
///
/// Evaluates to `"kvstats_"` (the macro applied to an empty suffix).
pub const PREFIX: &str = kvstats_name!("");
/// Number of active KV cache blocks currently in use
pub const ACTIVE_BLOCKS: &str = kvstats_name!("active_blocks");
/// Total number of KV cache blocks available
pub const TOTAL_BLOCKS: &str = kvstats_name!("total_blocks");
/// GPU cache usage as a fraction in the range 0.0-1.0
pub const GPU_CACHE_USAGE_PERCENT: &str = kvstats_name!("gpu_cache_usage_percent");
/// GPU prefix cache hit rate as a fraction in the range 0.0-1.0
pub const GPU_PREFIX_CACHE_HIT_RATE: &str = kvstats_name!("gpu_prefix_cache_hit_rate");
}
/// All KvStats Prometheus metric names as an array for iteration/validation
///
/// Lists every metric-name constant declared in `kvstats`; `PREFIX` is
/// omitted because it is not itself a metric name.
pub const KVSTATS_METRICS: &[&str] = &[
kvstats::ACTIVE_BLOCKS,
kvstats::TOTAL_BLOCKS,
kvstats::GPU_CACHE_USAGE_PERCENT,
kvstats::GPU_PREFIX_CACHE_HIT_RATE,
];
// KvRouter (including KvInexer) Prometheus metric names
pub mod kvrouter {
/// Number of KV cache events applied to the index (including status)
......
......@@ -582,9 +582,9 @@ class MetricsPayload(BasePayload):
name=f"{prefix}_*",
pattern=lambda name: rf"^{prefix}_\w+",
validator=lambda value: len(set(value))
>= 11, # 80% of typical ~17 metrics (excluding _bucket) as of 2025-12-02
error_msg=lambda name, value: f"Expected at least 11 unique {prefix}_* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique {prefix}_* metrics (minimum required: 11)",
>= 7, # 80% of typical ~13 metrics (excluding _bucket and removed kvstats metrics)
error_msg=lambda name, value: f"Expected at least 7 unique {prefix}_* metrics, but found only {len(set(value))}",
success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique {prefix}_* metrics (minimum required: 7)",
multiline=True,
),
MetricCheck(
......@@ -601,14 +601,6 @@ class MetricsPayload(BasePayload):
error_msg=lambda name, value: f"{name} should be > 0, but got {value}",
success_msg=lambda name, value: f"SUCCESS: Found {name} = {value}s",
),
MetricCheck(
name=f"{prefix}_{prometheus_names.kvstats.TOTAL_BLOCKS}",
pattern=metric_pattern,
validator=lambda value: int(float(value))
>= 0, # Allow 0 for SGLang (hardcoded issue in components/src/dynamo/sglang/publisher.py:70)
error_msg=lambda name, value: f"{name} should be >= 0, but got {value}",
success_msg=lambda name, value: f"SUCCESS: Found {name} = {value}",
),
]
# Add backend-specific metric checks
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment