Unverified commit 439e977d, authored by jain-ria, committed by GitHub
Browse files

feat: vllm speculative decoding metrics (#1549)


Signed-off-by: jain-ria <riajain@NVIDIA.com>
Co-authored-by: Alec <35311602+alec-flowers@users.noreply.github.com>
parent a9c0e0c7
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
// limitations under the License. // limitations under the License.
use dynamo_llm::kv_router::{ use dynamo_llm::kv_router::{
protocols::ForwardPassMetrics, scheduler::KVHitRateEvent, KV_HIT_RATE_SUBJECT, protocols::{ForwardPassMetrics, KvStats, WorkerStats},
scheduler::KVHitRateEvent,
KV_HIT_RATE_SUBJECT,
}; };
use dynamo_runtime::{ use dynamo_runtime::{
component::{service::EndpointStats, Namespace}, component::{service::EndpointStats, Namespace},
...@@ -114,16 +116,28 @@ fn mock_stats_handler(_stats: EndpointStats) -> serde_json::Value { ...@@ -114,16 +116,28 @@ fn mock_stats_handler(_stats: EndpointStats) -> serde_json::Value {
let num_requests_waiting = rand::rng().random_range(0..=100); let num_requests_waiting = rand::rng().random_range(0..=100);
let gpu_cache_usage_perc = rand::rng().random_range(0.0..=1.0); let gpu_cache_usage_perc = rand::rng().random_range(0.0..=1.0);
let gpu_prefix_cache_hit_rate = rand::rng().random_range(0.0..=1.0); let gpu_prefix_cache_hit_rate = rand::rng().random_range(0.0..=1.0);
let stats = ForwardPassMetrics {
let worker_stats = WorkerStats {
data_parallel_rank: None, // Default for backwards compatibility data_parallel_rank: None, // Default for backwards compatibility
request_active_slots, request_active_slots,
request_total_slots, request_total_slots,
num_requests_waiting,
};
let kv_stats = KvStats {
kv_active_blocks, kv_active_blocks,
kv_total_blocks, kv_total_blocks,
num_requests_waiting,
gpu_cache_usage_perc, gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate, gpu_prefix_cache_hit_rate,
}; };
let spec_decode_stats = None;
let stats = ForwardPassMetrics {
worker_stats,
kv_stats,
spec_decode_stats,
};
tracing::info!("Stats: {stats:?}"); tracing::info!("Stats: {stats:?}");
serde_json::to_value(stats).unwrap() serde_json::to_value(stats).unwrap()
} }
......
...@@ -455,31 +455,31 @@ impl PrometheusMetrics { ...@@ -455,31 +455,31 @@ impl PrometheusMetrics {
&self.kv_blocks_active, &self.kv_blocks_active,
config, config,
&worker_id, &worker_id,
metrics.kv_active_blocks as f64, metrics.kv_stats.kv_active_blocks as f64,
); );
self.set_worker_gauge( self.set_worker_gauge(
&self.kv_blocks_total, &self.kv_blocks_total,
config, config,
&worker_id, &worker_id,
metrics.kv_total_blocks as f64, metrics.kv_stats.kv_total_blocks as f64,
); );
self.set_worker_gauge( self.set_worker_gauge(
&self.requests_active, &self.requests_active,
config, config,
&worker_id, &worker_id,
metrics.request_active_slots as f64, metrics.worker_stats.request_active_slots as f64,
); );
self.set_worker_gauge( self.set_worker_gauge(
&self.requests_total, &self.requests_total,
config, config,
&worker_id, &worker_id,
metrics.request_total_slots as f64, metrics.worker_stats.request_total_slots as f64,
); );
self.set_worker_gauge( self.set_worker_gauge(
&self.kv_hit_rate_percent, &self.kv_hit_rate_percent,
config, config,
&worker_id, &worker_id,
metrics.gpu_prefix_cache_hit_rate as f64, metrics.kv_stats.gpu_prefix_cache_hit_rate as f64,
); );
} }
......
...@@ -3392,6 +3392,12 @@ index cafd8150b..6a5e45b4e 100644 ...@@ -3392,6 +3392,12 @@ index cafd8150b..6a5e45b4e 100644
+ num_requests_waiting: int + num_requests_waiting: int
+ gpu_cache_usage_perc: float + gpu_cache_usage_perc: float
+ gpu_prefix_cache_hit_rate: float + gpu_prefix_cache_hit_rate: float
+ spec_decode_draft_acceptance_rate: Optional[float] = None
+ spec_decode_system_efficiency: Optional[float] = None
+ spec_decode_draft_tokens: Optional[int] = None
+ spec_decode_emitted_tokens: Optional[int] = None
+ spec_decode_accepted_tokens: Optional[int] = None
+ spec_decode_num_spec_tokens: Optional[int] = None
diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py
index f058b1329..2fdb5b8bf 100644 index f058b1329..2fdb5b8bf 100644
--- a/vllm/engine/multiprocessing/client.py --- a/vllm/engine/multiprocessing/client.py
......
...@@ -226,7 +226,13 @@ import logging ...@@ -226,7 +226,13 @@ import logging
import random import random
from pydantic import BaseModel from pydantic import BaseModel
from dynamo.llm import WorkerMetricsPublisher from dynamo.llm import (
WorkerMetricsPublisher,
ForwardPassMetrics,
KvStats,
SpecDecodeStats,
WorkerStats
)
from dynamo.sdk import endpoint, service, dynamo_context from dynamo.sdk import endpoint, service, dynamo_context
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -260,18 +266,29 @@ class YourWorker: ...@@ -260,18 +266,29 @@ class YourWorker:
self.gpu_cache_usage_perc = 0.0 self.gpu_cache_usage_perc = 0.0
self.gpu_prefix_cache_hit_rate = 0.0 self.gpu_prefix_cache_hit_rate = 0.0
# Publish some initial metrics to register worker_stats = WorkerStats(
# this worker as a candidate for KV Routing.
self.metrics_publisher.publish(
self.request_active_slots, self.request_active_slots,
self.request_total_slots, self.request_total_slots,
self.num_requests_waiting,
data_parallel_rank=None,
)
kv_stats = KvStats(
self.kv_active_blocks, self.kv_active_blocks,
self.kv_total_blocks, self.kv_total_blocks,
self.num_requests_waiting,
self.gpu_cache_usage_perc, self.gpu_cache_usage_perc,
self.gpu_prefix_cache_hit_rate, self.gpu_prefix_cache_hit_rate
) )
# Publish some initial metrics to register
# this worker as a candidate for KV Routing.
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.metrics_publisher.publish(metrics)
def publish_kv_metrics(self): def publish_kv_metrics(self):
# Populate the frequently changing metrics with random data for # Populate the frequently changing metrics with random data for
# demonstration. These values should be tracked by the implementation, # demonstration. These values should be tracked by the implementation,
...@@ -282,15 +299,26 @@ class YourWorker: ...@@ -282,15 +299,26 @@ class YourWorker:
self.gpu_prefix_cache_hit_rate = random.uniform(0, 1.0) self.gpu_prefix_cache_hit_rate = random.uniform(0, 1.0)
# Publish the metrics with the current state # Publish the metrics with the current state
self.metrics_publisher.publish( worker_stats = WorkerStats(
self.request_active_slots, self.request_active_slots,
self.request_total_slots, self.request_total_slots,
self.num_requests_waiting,
data_parallel_rank=None,
)
kv_stats = KvStats(
self.kv_active_blocks, self.kv_active_blocks,
self.kv_total_blocks, self.kv_total_blocks,
self.num_requests_waiting,
self.gpu_cache_usage_perc, self.gpu_cache_usage_perc,
self.gpu_prefix_cache_hit_rate, self.gpu_prefix_cache_hit_rate
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
) )
self.metrics_publisher.publish(metrics)
@endpoint() @endpoint()
async def generate(self, request: RequestType): async def generate(self, request: RequestType):
......
...@@ -31,7 +31,7 @@ from vllm.entrypoints.openai.api_server import ( ...@@ -31,7 +31,7 @@ from vllm.entrypoints.openai.api_server import (
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from vllm.sampling_params import RequestOutputKind from vllm.sampling_params import RequestOutputKind
from dynamo.llm import WorkerMetricsPublisher from dynamo.llm import ForwardPassMetrics, KvStats, WorkerMetricsPublisher, WorkerStats
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -102,17 +102,30 @@ class VllmWorker: ...@@ -102,17 +102,30 @@ class VllmWorker:
else: else:
raise RuntimeError("Failed to initialize engine client") raise RuntimeError("Failed to initialize engine client")
self.engine_client.set_metrics_publisher(self.metrics_publisher) self.engine_client.set_metrics_publisher(self.metrics_publisher)
# Initially send dummy metrics to kick start, # Initially send dummy metrics to kick start,
# vLLM will not update stat until forward pass is triggered # vLLM will not update stat until forward pass is triggered
self.metrics_publisher.publish( worker_stats = WorkerStats(
0, # request_active_slots 0, # request_active_slots
1024, # request_total_slots 1024, # request_total_slots
0, # num_requests_waiting
None, # data_parallel_rank
)
kv_stats = KvStats(
0, # kv_active_blocks 0, # kv_active_blocks
1024, # kv_total_blocks 1024, # kv_total_blocks
0, # num_requests_waiting
0.0, # gpu_cache_usage_perc 0.0, # gpu_cache_usage_perc
0.0, # gpu_prefix_cache_hit_rate 0.0, # gpu_prefix_cache_hit_rate
) )
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.metrics_publisher.publish(metrics)
task = asyncio.create_task(self.create_metrics_publisher_endpoint()) task = asyncio.create_task(self.create_metrics_publisher_endpoint())
task.add_done_callback( task.add_done_callback(
lambda _: logger.info("metrics publisher endpoint created") lambda _: logger.info("metrics publisher endpoint created")
......
...@@ -17,8 +17,11 @@ from utils.protocol import DisaggPreprocessedRequest ...@@ -17,8 +17,11 @@ from utils.protocol import DisaggPreprocessedRequest
from utils.sgl_utils import parse_sglang_args_inc from utils.sgl_utils import parse_sglang_args_inc
from dynamo.llm import ( from dynamo.llm import (
ForwardPassMetrics,
KvStats,
ModelType, ModelType,
WorkerMetricsPublisher, WorkerMetricsPublisher,
WorkerStats,
ZmqKvEventPublisher, ZmqKvEventPublisher,
ZmqKvEventPublisherConfig, ZmqKvEventPublisherConfig,
register_llm, register_llm,
...@@ -57,15 +60,26 @@ class RequestHandler: ...@@ -57,15 +60,26 @@ class RequestHandler:
def setup_metrics(self): def setup_metrics(self):
"""Set up metrics publisher - call this after handler creation""" """Set up metrics publisher - call this after handler creation"""
self.metrics_publisher.publish( worker_stats = WorkerStats(
request_active_slots=0, request_active_slots=0,
request_total_slots=1024, request_total_slots=1024,
num_requests_waiting=0,
data_parallel_rank=None,
)
kv_stats = KvStats(
kv_active_blocks=0, kv_active_blocks=0,
kv_total_blocks=1024, kv_total_blocks=1024,
num_requests_waiting=0,
gpu_cache_usage_perc=0.0, gpu_cache_usage_perc=0.0,
gpu_prefix_cache_hit_rate=0.0, gpu_prefix_cache_hit_rate=0.0,
) )
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.metrics_publisher.publish(metrics)
task = asyncio.create_task(self.create_metrics_publisher_endpoint()) task = asyncio.create_task(self.create_metrics_publisher_endpoint())
task.add_done_callback( task.add_done_callback(
lambda _: logging.debug("metrics publisher endpoint created") lambda _: logging.debug("metrics publisher endpoint created")
...@@ -82,16 +96,31 @@ class RequestHandler: ...@@ -82,16 +96,31 @@ class RequestHandler:
logging.warning( logging.warning(
"Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands." "Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands."
) )
self.metrics_publisher.publish(
request_active_slots=1, worker_stats = WorkerStats(
request_total_slots=100, request_active_slots=0,
request_total_slots=1024,
num_requests_waiting=0,
data_parallel_rank=None,
)
kv_stats = KvStats(
kv_active_blocks=random.randint(0, 500), kv_active_blocks=random.randint(0, 500),
kv_total_blocks=1000, kv_total_blocks=1000,
num_requests_waiting=0,
gpu_cache_usage_perc=random.uniform(0.1, 0.8), gpu_cache_usage_perc=random.uniform(0.1, 0.8),
gpu_prefix_cache_hit_rate=random.uniform(0.0, 0.5), gpu_prefix_cache_hit_rate=random.uniform(0.0, 0.5),
) )
# TODO: get spec_dec_stats from sglang once real engine metrics are available
spec_dec_stats = None
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_dec_stats,
)
self.metrics_publisher.publish(metrics)
def _get_bootstrap_info(self): def _get_bootstrap_info(self):
"""Bootstrap info from tokenizer manager""" """Bootstrap info from tokenizer manager"""
inner_tm = self.engine.tokenizer_manager inner_tm = self.engine.tokenizer_manager
......
...@@ -33,8 +33,12 @@ from vllm.v1.metrics.loggers import StatLoggerBase ...@@ -33,8 +33,12 @@ from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats from vllm.v1.metrics.stats import IterationStats, SchedulerStats
from dynamo.llm import ( from dynamo.llm import (
ForwardPassMetrics,
KvStats,
ModelType, ModelType,
SpecDecodeStats,
WorkerMetricsPublisher, WorkerMetricsPublisher,
WorkerStats,
ZmqKvEventPublisher, ZmqKvEventPublisher,
ZmqKvEventPublisherConfig, ZmqKvEventPublisherConfig,
register_llm, register_llm,
...@@ -85,18 +89,37 @@ class DynamoStatLoggerPublisher(StatLoggerBase): ...@@ -85,18 +89,37 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
/ scheduler_stats.prefix_cache_stats.queries / scheduler_stats.prefix_cache_stats.queries
) )
# TODO Manage DP Ranks in metrics aggregation. worker_stats = WorkerStats(
self.inner.publish(
request_active_slots=scheduler_stats.num_running_reqs, request_active_slots=scheduler_stats.num_running_reqs,
request_total_slots=0, # TODO - remove from metrics request_total_slots=0, # TODO - remove from metrics
num_requests_waiting=scheduler_stats.num_waiting_reqs,
data_parallel_rank=None,
)
kv_stats = KvStats(
kv_active_blocks=0, # TODO - need to calculate this kv_active_blocks=0, # TODO - need to calculate this
kv_total_blocks=0, # TODO - remove from metrics kv_total_blocks=0, # TODO - remove from metrics
num_requests_waiting=scheduler_stats.num_waiting_reqs, # used in current cost function
gpu_cache_usage_perc=scheduler_stats.gpu_cache_usage, # used in current cost function gpu_cache_usage_perc=scheduler_stats.gpu_cache_usage, # used in current cost function
gpu_prefix_cache_hit_rate=hit_rate, gpu_prefix_cache_hit_rate=hit_rate,
data_parallel_rank=self.dp_rank,
) )
spec_dec_stats = scheduler_stats.spec_decoding_stats
if spec_dec_stats:
spec_dec_stats = SpecDecodeStats(
num_spec_tokens=spec_dec_stats.num_spec_tokens,
num_drafts=spec_dec_stats.num_drafts,
num_draft_tokens=spec_dec_stats.num_draft_tokens,
num_accepted_tokens=spec_dec_stats.num_accepted_tokens,
num_accepted_tokens_per_pos=spec_dec_stats.num_accepted_tokens_per_pos,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_dec_stats,
)
self.inner.publish(metrics)
def log_engine_initialized(self) -> None: def log_engine_initialized(self) -> None:
pass pass
......
...@@ -96,6 +96,10 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { ...@@ -96,6 +96,10 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<http::HttpAsyncEngine>()?; m.add_class::<http::HttpAsyncEngine>()?;
m.add_class::<EtcdKvCache>()?; m.add_class::<EtcdKvCache>()?;
m.add_class::<ModelType>()?; m.add_class::<ModelType>()?;
m.add_class::<llm::kv::ForwardPassMetrics>()?;
m.add_class::<llm::kv::WorkerStats>()?;
m.add_class::<llm::kv::KvStats>()?;
m.add_class::<llm::kv::SpecDecodeStats>()?;
m.add_class::<RouterMode>()?; m.add_class::<RouterMode>()?;
engine::add_to_module(m)?; engine::add_to_module(m)?;
......
...@@ -19,6 +19,10 @@ use std::sync::atomic::AtomicU32; ...@@ -19,6 +19,10 @@ use std::sync::atomic::AtomicU32;
use super::*; use super::*;
use llm_rs::kv_router::indexer::compute_block_hash_for_seq; use llm_rs::kv_router::indexer::compute_block_hash_for_seq;
use llm_rs::kv_router::indexer::KvIndexerInterface; use llm_rs::kv_router::indexer::KvIndexerInterface;
use llm_rs::kv_router::protocols::ForwardPassMetrics as RsForwardPassMetrics;
use llm_rs::kv_router::protocols::KvStats as RsKvStats;
use llm_rs::kv_router::protocols::SpecDecodeStats as RsSpecDecodeStats;
use llm_rs::kv_router::protocols::WorkerStats as RsWorkerStats;
use rs::traits::events::EventSubscriber; use rs::traits::events::EventSubscriber;
use tracing; use tracing;
...@@ -113,34 +117,11 @@ impl WorkerMetricsPublisher { ...@@ -113,34 +117,11 @@ impl WorkerMetricsPublisher {
}) })
} }
#[allow(clippy::too_many_arguments)] #[pyo3(signature = (metrics))]
#[pyo3(signature = (request_active_slots, request_total_slots, kv_active_blocks, kv_total_blocks, num_requests_waiting, gpu_cache_usage_perc, gpu_prefix_cache_hit_rate, data_parallel_rank = 0))] fn publish(&self, _py: Python, metrics: &ForwardPassMetrics) -> PyResult<()> {
fn publish( // Create and publish the complete metrics
&self,
_py: Python,
request_active_slots: u64,
request_total_slots: u64,
kv_active_blocks: u64,
kv_total_blocks: u64,
num_requests_waiting: u64,
gpu_cache_usage_perc: f32,
gpu_prefix_cache_hit_rate: f32,
data_parallel_rank: u32,
) -> PyResult<()> {
self.inner self.inner
.publish( .publish(metrics.0.clone().into())
llm_rs::kv_router::protocols::ForwardPassMetrics {
data_parallel_rank: Some(data_parallel_rank),
request_active_slots,
request_total_slots,
kv_active_blocks,
kv_total_blocks,
num_requests_waiting,
gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate,
}
.into(),
)
.map_err(to_pyerr) .map_err(to_pyerr)
} }
} }
...@@ -634,19 +615,20 @@ impl KvMetricsAggregator { ...@@ -634,19 +615,20 @@ impl KvMetricsAggregator {
} }
fn get_metrics<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> { fn get_metrics<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
// TODO: update EndpointKvMetrics to match the new ForwardPassMetrics struct
let endpoints = self.inner.get_endpoints(); let endpoints = self.inner.get_endpoints();
let endpoint_kv_metrics = endpoints let endpoint_kv_metrics = endpoints
.endpoints .endpoints
.iter() .iter()
.map(|(worker_id, x)| EndpointKvMetrics { .map(|(worker_id, endpoint)| EndpointKvMetrics {
worker_id: *worker_id, worker_id: *worker_id,
request_active_slots: x.data.request_active_slots, request_active_slots: endpoint.data.worker_stats.request_active_slots,
request_total_slots: x.data.request_total_slots, request_total_slots: endpoint.data.worker_stats.request_total_slots,
kv_active_blocks: x.data.kv_active_blocks, kv_active_blocks: endpoint.data.kv_stats.kv_active_blocks,
kv_total_blocks: x.data.kv_total_blocks, kv_total_blocks: endpoint.data.kv_stats.kv_total_blocks,
num_requests_waiting: x.data.num_requests_waiting, num_requests_waiting: endpoint.data.worker_stats.num_requests_waiting,
gpu_cache_usage_perc: x.data.gpu_cache_usage_perc, gpu_cache_usage_perc: endpoint.data.kv_stats.gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate: x.data.gpu_prefix_cache_hit_rate, gpu_prefix_cache_hit_rate: endpoint.data.kv_stats.gpu_prefix_cache_hit_rate,
}) })
.collect(); .collect();
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
...@@ -777,3 +759,95 @@ impl KvRecorder { ...@@ -777,3 +759,95 @@ impl KvRecorder {
Ok(()) Ok(())
} }
} }
/// Python-visible newtype around the Rust protocol struct imported in this
/// file as `RsForwardPassMetrics`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as its single
/// field; the wrapped value is reachable as `.0`.
#[pyclass]
#[repr(transparent)]
pub struct ForwardPassMetrics(pub RsForwardPassMetrics);
/// Python-visible newtype around the Rust protocol struct imported in this
/// file as `RsWorkerStats`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as its single
/// field; the wrapped value is reachable as `.0`.
#[pyclass]
#[repr(transparent)]
pub struct WorkerStats(pub RsWorkerStats);
/// Python-visible newtype around the Rust protocol struct imported in this
/// file as `RsKvStats`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as its single
/// field; the wrapped value is reachable as `.0`.
#[pyclass]
#[repr(transparent)]
pub struct KvStats(pub RsKvStats);
/// Python-visible newtype around the Rust protocol struct imported in this
/// file as `RsSpecDecodeStats`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as its single
/// field; the wrapped value is reachable as `.0`.
#[pyclass]
#[repr(transparent)]
pub struct SpecDecodeStats(pub RsSpecDecodeStats);
// Python-facing methods of the `ForwardPassMetrics` wrapper.
#[pymethods]
impl ForwardPassMetrics {
    /// Assemble a metrics snapshot from its component stats.
    ///
    /// The Rust values held by `worker_stats` and `kv_stats` (and by
    /// `spec_decode_stats`, when given) are cloned into the new object, so the
    /// Python arguments are only borrowed. `spec_decode_stats` is `None` when
    /// the caller omits it.
    #[new]
    #[pyo3(signature = (worker_stats, kv_stats, spec_decode_stats = None))]
    fn new(
        worker_stats: &WorkerStats,
        kv_stats: &KvStats,
        spec_decode_stats: Option<&SpecDecodeStats>,
    ) -> Self {
        let spec_decode_stats = spec_decode_stats.map(|stats| stats.0.clone());
        let inner = RsForwardPassMetrics {
            worker_stats: worker_stats.0.clone(),
            kv_stats: kv_stats.0.clone(),
            spec_decode_stats,
        };
        Self(inner)
    }
}
// Python-facing methods of the `WorkerStats` wrapper.
#[pymethods]
impl WorkerStats {
    /// Build a `WorkerStats` from its individual fields.
    ///
    /// `data_parallel_rank` is the only optional argument; it is `None` when
    /// the caller leaves it out.
    #[new]
    #[pyo3(signature = (request_active_slots, request_total_slots, num_requests_waiting, data_parallel_rank=None))]
    fn new(
        request_active_slots: u64,
        request_total_slots: u64,
        num_requests_waiting: u64,
        data_parallel_rank: Option<u32>,
    ) -> Self {
        let inner = RsWorkerStats {
            request_active_slots,
            request_total_slots,
            num_requests_waiting,
            data_parallel_rank,
        };
        Self(inner)
    }
}
// Python-facing methods of the `KvStats` wrapper.
#[pymethods]
impl KvStats {
    /// Build a `KvStats` from its four fields; all of them are required.
    #[new]
    #[pyo3(signature = (kv_active_blocks, kv_total_blocks, gpu_cache_usage_perc, gpu_prefix_cache_hit_rate))]
    fn new(
        kv_active_blocks: u64,
        kv_total_blocks: u64,
        gpu_cache_usage_perc: f32,
        gpu_prefix_cache_hit_rate: f32,
    ) -> Self {
        let inner = RsKvStats {
            gpu_prefix_cache_hit_rate,
            gpu_cache_usage_perc,
            kv_total_blocks,
            kv_active_blocks,
        };
        Self(inner)
    }
}
// Python-facing methods of the `SpecDecodeStats` wrapper.
#[pymethods]
impl SpecDecodeStats {
    /// Build a `SpecDecodeStats` from its five fields.
    ///
    /// Every argument must be supplied by the caller (the signature declares
    /// no defaults), but each one accepts `None`.
    #[new]
    #[pyo3(signature = (num_spec_tokens, num_drafts, num_draft_tokens, num_accepted_tokens, num_accepted_tokens_per_pos))]
    fn new(
        num_spec_tokens: Option<u32>,
        num_drafts: Option<u32>,
        num_draft_tokens: Option<u32>,
        num_accepted_tokens: Option<u32>,
        num_accepted_tokens_per_pos: Option<Vec<u32>>,
    ) -> Self {
        let inner = RsSpecDecodeStats {
            num_accepted_tokens_per_pos,
            num_accepted_tokens,
            num_draft_tokens,
            num_drafts,
            num_spec_tokens,
        };
        Self(inner)
    }
}
...@@ -355,8 +355,86 @@ def compute_block_hash_for_seq_py(tokens: List[int], kv_block_size: int) -> List ...@@ -355,8 +355,86 @@ def compute_block_hash_for_seq_py(tokens: List[int], kv_block_size: int) -> List
Returns: Returns:
List of block hashes as integers List of block hashes as integers
""" """
...
class WorkerStats:
    """Request-slot and waiting-queue counters for one worker."""

    def __init__(
        self,
        request_active_slots: int,
        request_total_slots: int,
        num_requests_waiting: int,
        data_parallel_rank: Optional[int] = None,
    ) -> None:
        """Build a `WorkerStats` value.

        `data_parallel_rank` may be omitted, in which case it is `None`.
        """
        ...
class KvStats:
    """KV-cache block counts and GPU cache ratios for one worker."""

    def __init__(
        self,
        kv_active_blocks: int,
        kv_total_blocks: int,
        gpu_cache_usage_perc: float,
        gpu_prefix_cache_hit_rate: float,
    ) -> None:
        """Build a `KvStats` value; all four arguments are required."""
        ...
class SpecDecodeStats:
    """Speculative decoding stats."""

    def __init__(
        self,
        num_spec_tokens: Optional[int],
        num_drafts: Optional[int],
        num_draft_tokens: Optional[int],
        num_accepted_tokens: Optional[int],
        num_accepted_tokens_per_pos: Optional[List[int]],
    ) -> None:
        """Create a `SpecDecodeStats` object when running with speculative decoding.

        All five arguments must be passed (there are no defaults), but the
        native constructor accepts `None` for each of them, so they are typed
        as `Optional[...]` here.
        """
        ...
class ForwardPassMetrics:
    """
    Metrics bundle for a single forward pass: worker stats, KV stats and,
    optionally, speculative decoding stats.
    """

    def __init__(
        self,
        worker_stats: WorkerStats,
        kv_stats: KvStats,
        spec_decode_stats: Optional[SpecDecodeStats] = None,
    ) -> None:
        """Build a `ForwardPassMetrics` value.

        `spec_decode_stats` may be omitted, in which case it is `None`.
        """
        ...
class WorkerMetricsPublisher: class WorkerMetricsPublisher:
""" """
A metrics publisher will provide metrics to the router. A metrics publisher will provide metrics to the router.
...@@ -377,17 +455,10 @@ class WorkerMetricsPublisher: ...@@ -377,17 +455,10 @@ class WorkerMetricsPublisher:
def publish( def publish(
self, self,
request_active_slots: int, metrics: ForwardPassMetrics
request_total_slots: int,
kv_active_blocks: int,
kv_total_blocks: int,
num_requests_waiting: int,
gpu_cache_usage_perc: float,
gpu_prefix_cache_hit_rate: float,
data_parallel_rank: int = 0,
) -> None: ) -> None:
""" """
Update the KV metrics being reported. Update the metrics being reported.
""" """
... ...
......
...@@ -24,6 +24,7 @@ except ImportError: ...@@ -24,6 +24,7 @@ except ImportError:
from dynamo._core import ApproxKvIndexer as ApproxKvIndexer from dynamo._core import ApproxKvIndexer as ApproxKvIndexer
from dynamo._core import DisaggregatedRouter as DisaggregatedRouter from dynamo._core import DisaggregatedRouter as DisaggregatedRouter
from dynamo._core import ForwardPassMetrics as ForwardPassMetrics
from dynamo._core import HttpAsyncEngine as HttpAsyncEngine from dynamo._core import HttpAsyncEngine as HttpAsyncEngine
from dynamo._core import HttpError as HttpError from dynamo._core import HttpError as HttpError
from dynamo._core import HttpService as HttpService from dynamo._core import HttpService as HttpService
...@@ -32,10 +33,13 @@ from dynamo._core import KvIndexer as KvIndexer ...@@ -32,10 +33,13 @@ from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvMetricsAggregator as KvMetricsAggregator from dynamo._core import KvMetricsAggregator as KvMetricsAggregator
from dynamo._core import KvRecorder as KvRecorder from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouter as KvRouter from dynamo._core import KvRouter as KvRouter
from dynamo._core import KvStats as KvStats
from dynamo._core import ModelType as ModelType from dynamo._core import ModelType as ModelType
from dynamo._core import OverlapScores as OverlapScores from dynamo._core import OverlapScores as OverlapScores
from dynamo._core import RadixTree as RadixTree from dynamo._core import RadixTree as RadixTree
from dynamo._core import SpecDecodeStats as SpecDecodeStats
from dynamo._core import WorkerMetricsPublisher as WorkerMetricsPublisher from dynamo._core import WorkerMetricsPublisher as WorkerMetricsPublisher
from dynamo._core import WorkerStats as WorkerStats
from dynamo._core import ZmqKvEventListener as ZmqKvEventListener from dynamo._core import ZmqKvEventListener as ZmqKvEventListener
from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher
from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig
......
...@@ -11,7 +11,13 @@ from contextlib import asynccontextmanager ...@@ -11,7 +11,13 @@ from contextlib import asynccontextmanager
from queue import Queue from queue import Queue
from typing import Callable, Optional, Union from typing import Callable, Optional, Union
from dynamo.llm import KvEventPublisher, WorkerMetricsPublisher from dynamo.llm import (
ForwardPassMetrics,
KvEventPublisher,
KvStats,
WorkerMetricsPublisher,
WorkerStats,
)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
...@@ -157,16 +163,27 @@ class Publisher: ...@@ -157,16 +163,27 @@ class Publisher:
logging.error("KV metrics publisher not initialized!") logging.error("KV metrics publisher not initialized!")
return return
self.metrics_publisher.publish( worker_stats = WorkerStats(
request_active_slots, request_active_slots=request_active_slots,
request_total_slots, request_total_slots=request_total_slots,
kv_active_block, num_requests_waiting=num_requests_waiting,
kv_total_blocks, data_parallel_rank=None,
num_requests_waiting,
gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate,
) )
kv_stats = KvStats(
kv_active_blocks=kv_active_block,
kv_total_blocks=kv_total_blocks,
gpu_cache_usage_perc=gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate=gpu_prefix_cache_hit_rate,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.metrics_publisher.publish(metrics)
# Prepare threads for publishing stats but don't start them yet. # Prepare threads for publishing stats but don't start them yet.
# TRTLLM needs to start generating tokens first before stats # TRTLLM needs to start generating tokens first before stats
# can be retrieved. # can be retrieved.
...@@ -224,15 +241,29 @@ class Publisher: ...@@ -224,15 +241,29 @@ class Publisher:
f"Publishing stats: request_active_slots: {request_active_slots}, request_total_slots: {request_total_slots}, kv_active_block: {kv_active_block}, kv_total_blocks: {kv_total_blocks}, num_requests_waiting: {num_requests_waiting}, reused_blocks: {reused_blocks}, freeNumBlocks: {freeNumBlocks}, allocTotalBlocks: {allocTotalBlocks}, allocNewBlocks: {allocNewBlocks}, gpu_cache_usage_perc: {gpu_cache_usage_perc}, gpu_prefix_cache_hit_rate: {gpu_prefix_cache_hit_rate}" f"Publishing stats: request_active_slots: {request_active_slots}, request_total_slots: {request_total_slots}, kv_active_block: {kv_active_block}, kv_total_blocks: {kv_total_blocks}, num_requests_waiting: {num_requests_waiting}, reused_blocks: {reused_blocks}, freeNumBlocks: {freeNumBlocks}, allocTotalBlocks: {allocTotalBlocks}, allocNewBlocks: {allocNewBlocks}, gpu_cache_usage_perc: {gpu_cache_usage_perc}, gpu_prefix_cache_hit_rate: {gpu_prefix_cache_hit_rate}"
) )
self.metrics_publisher.publish( worker_stats = WorkerStats(
request_active_slots, request_active_slots=request_active_slots,
request_total_slots, request_total_slots=request_total_slots,
kv_active_block, num_requests_waiting=num_requests_waiting,
kv_total_blocks, data_parallel_rank=None,
num_requests_waiting, )
gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate, kv_stats = KvStats(
kv_active_blocks=kv_active_block,
kv_total_blocks=kv_total_blocks,
gpu_cache_usage_perc=gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate=gpu_prefix_cache_hit_rate,
)
# TODO: get spec_decode_stats from engine
spec_decode_stats = None
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_decode_stats,
) )
self.metrics_publisher.publish(metrics)
return True return True
......
...@@ -26,11 +26,14 @@ import pytest ...@@ -26,11 +26,14 @@ import pytest
from dynamo.llm import ( from dynamo.llm import (
ApproxKvIndexer, ApproxKvIndexer,
ForwardPassMetrics,
KvEventPublisher, KvEventPublisher,
KvIndexer, KvIndexer,
KvMetricsAggregator, KvMetricsAggregator,
KvStats,
RadixTree, RadixTree,
WorkerMetricsPublisher, WorkerMetricsPublisher,
WorkerStats,
) )
from dynamo.runtime import Component, DistributedRuntime from dynamo.runtime import Component, DistributedRuntime
...@@ -332,14 +335,31 @@ async def test_metrics_aggregator(distributed_runtime): ...@@ -332,14 +335,31 @@ async def test_metrics_aggregator(distributed_runtime):
async def metrics_publisher_task(kv_listener, expected_metrics): async def metrics_publisher_task(kv_listener, expected_metrics):
# Construct the structured ForwardPassMetrics payload expected by the
# current Rust bindings instead of passing the individual scalar values
# directly. The API for `WorkerMetricsPublisher.publish`
# changed from a list of positional scalars to a single
# `ForwardPassMetrics` object.
metrics_publisher = WorkerMetricsPublisher() metrics_publisher = WorkerMetricsPublisher()
metrics_publisher.publish(
worker_stats = WorkerStats(
expected_metrics["request_active_slots"], expected_metrics["request_active_slots"],
expected_metrics["request_total_slots"], expected_metrics["request_total_slots"],
expected_metrics["num_requests_waiting"],
None,
)
kv_stats = KvStats(
expected_metrics["kv_active_blocks"], expected_metrics["kv_active_blocks"],
expected_metrics["kv_total_blocks"], expected_metrics["kv_total_blocks"],
expected_metrics["num_requests_waiting"],
expected_metrics["gpu_cache_usage_perc"], expected_metrics["gpu_cache_usage_perc"],
expected_metrics["gpu_prefix_cache_hit_rate"], expected_metrics["gpu_prefix_cache_hit_rate"],
) )
metrics = ForwardPassMetrics(worker_stats, kv_stats, None)
# Publish and expose the metrics via the endpoint so that the aggregator
# test can discover them.
metrics_publisher.publish(metrics)
await metrics_publisher.create_endpoint(kv_listener) await metrics_publisher.create_endpoint(kv_listener)
...@@ -41,21 +41,39 @@ pub struct WorkerSelectionResult { ...@@ -41,21 +41,39 @@ pub struct WorkerSelectionResult {
#[derive(Debug, Clone, Serialize, Deserialize, Default)] #[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ForwardPassMetrics { pub struct ForwardPassMetrics {
pub worker_stats: WorkerStats,
pub kv_stats: KvStats,
pub spec_decode_stats: Option<SpecDecodeStats>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkerStats {
// https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models // https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models
// Data parallel ranks are semi-independent, so we need to track metrics at the DP level pub data_parallel_rank: Option<u32>,
pub data_parallel_rank: Option<u32>, // Optional for backwards compatibility
pub request_active_slots: u64, pub request_active_slots: u64,
pub request_total_slots: u64, pub request_total_slots: u64,
pub num_requests_waiting: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct KvStats {
pub kv_active_blocks: u64, pub kv_active_blocks: u64,
pub kv_total_blocks: u64, pub kv_total_blocks: u64,
// integer from 0 to large number
pub num_requests_waiting: u64,
// percentage represented as a float from 0 to 1 // percentage represented as a float from 0 to 1
pub gpu_cache_usage_perc: f32, pub gpu_cache_usage_perc: f32,
// percentage represented as a float from 0 to 1 // percentage represented as a float from 0 to 1
pub gpu_prefix_cache_hit_rate: f32, pub gpu_prefix_cache_hit_rate: f32,
} }
/// Speculative-decoding counters reported alongside a forward pass.
///
/// Carried in the optional `spec_decode_stats` field of [`ForwardPassMetrics`].
/// Every field is an `Option`, so a publisher may omit any individual counter;
/// the derived `Default` leaves all of them as `None`.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names only — confirm the exact semantics against the engine-side publisher.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SpecDecodeStats {
// Presumably the configured number of speculative tokens per draft — TODO confirm.
pub num_spec_tokens: Option<u32>,
// Presumably the number of drafts proposed — TODO confirm.
pub num_drafts: Option<u32>,
// Presumably the total number of draft tokens proposed — TODO confirm.
pub num_draft_tokens: Option<u32>,
// Presumably the total number of draft tokens accepted — TODO confirm.
pub num_accepted_tokens: Option<u32>,
// Presumably accepted-token counts indexed by draft position; the expected
// length (possibly `num_spec_tokens`) is not established here — TODO confirm.
pub num_accepted_tokens_per_pos: Option<Vec<u32>>,
}
/// A [`LocalBlockHash`] is a hash computed from the tokens_ids, extra_token_ids and the optional /// A [`LocalBlockHash`] is a hash computed from the tokens_ids, extra_token_ids and the optional
/// lora_id of a block. /// lora_id of a block.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
......
...@@ -23,7 +23,7 @@ use std::collections::HashMap; ...@@ -23,7 +23,7 @@ use std::collections::HashMap;
use super::protocols::WorkerSelectionResult; use super::protocols::WorkerSelectionResult;
use super::WorkerSelector; use super::WorkerSelector;
use crate::kv_router::indexer::OverlapScores; use crate::kv_router::indexer::OverlapScores;
pub use crate::kv_router::protocols::ForwardPassMetrics; use crate::kv_router::protocols::ForwardPassMetrics;
use crate::kv_router::scoring::ProcessedEndpoints; use crate::kv_router::scoring::ProcessedEndpoints;
use crate::kv_router::KvRouterConfig; use crate::kv_router::KvRouterConfig;
use crate::kv_router::KV_HIT_RATE_SUBJECT; use crate::kv_router::KV_HIT_RATE_SUBJECT;
...@@ -211,7 +211,8 @@ pub fn process_worker_selection( ...@@ -211,7 +211,8 @@ pub fn process_worker_selection(
// Update worker state predictively // Update worker state predictively
// Will be overwritten on next polling of metrics // Will be overwritten on next polling of metrics
worker.data.kv_active_blocks += selection
worker.data.kv_stats.kv_active_blocks += selection
.required_blocks .required_blocks
.saturating_sub(selection.overlap_blocks as u64); .saturating_sub(selection.overlap_blocks as u64);
...@@ -319,12 +320,12 @@ impl WorkerSelector for DefaultWorkerSelector { ...@@ -319,12 +320,12 @@ impl WorkerSelector for DefaultWorkerSelector {
request.overlap.scores.get(&worker_id).copied().unwrap_or(0) as f64; request.overlap.scores.get(&worker_id).copied().unwrap_or(0) as f64;
let new_blocks = request_blocks as f64 - overlap_blocks; let new_blocks = request_blocks as f64 - overlap_blocks;
let kv_total_blocks = ep.data.kv_total_blocks as f64; let kv_total_blocks = ep.data.kv_stats.kv_total_blocks as f64;
assert!(kv_total_blocks > 0.0); assert!(kv_total_blocks > 0.0);
let normalized_new_blocks = new_blocks / kv_total_blocks; let normalized_new_blocks = new_blocks / kv_total_blocks;
let gpu_cache_usage = (ep.data.kv_active_blocks as f64) / kv_total_blocks; let gpu_cache_usage = ep.data.kv_stats.gpu_cache_usage_perc as f64;
let num_requests_waiting = ep.data.num_requests_waiting as f64; let num_requests_waiting = ep.data.worker_stats.num_requests_waiting as f64;
// Calculate logit (lower is better) // Calculate logit (lower is better)
let logit = self.kv_router_config.overlap_score_weight * normalized_new_blocks let logit = self.kv_router_config.overlap_score_weight * normalized_new_blocks
...@@ -386,6 +387,7 @@ impl WorkerSelector for DefaultWorkerSelector { ...@@ -386,6 +387,7 @@ impl WorkerSelector for DefaultWorkerSelector {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::kv_router::protocols::{KvStats, WorkerStats};
#[test] #[test]
fn test_softmax_sample_single_key() { fn test_softmax_sample_single_key() {
...@@ -424,8 +426,14 @@ mod tests { ...@@ -424,8 +426,14 @@ mod tests {
name: format!("worker-{}", worker_id), name: format!("worker-{}", worker_id),
subject: format!("worker-subject-{:x}", worker_id), subject: format!("worker-subject-{:x}", worker_id),
data: ForwardPassMetrics { data: ForwardPassMetrics {
gpu_cache_usage_perc, kv_stats: KvStats {
num_requests_waiting, gpu_cache_usage_perc,
..Default::default()
},
worker_stats: WorkerStats {
num_requests_waiting,
..Default::default()
},
// Other fields can be default initialized for this test // Other fields can be default initialized for this test
..Default::default() ..Default::default()
}, },
......
...@@ -32,7 +32,7 @@ impl ProcessedEndpoints { ...@@ -32,7 +32,7 @@ impl ProcessedEndpoints {
// compute some basic statistics // compute some basic statistics
let load_values: Vec<f64> = endpoints let load_values: Vec<f64> = endpoints
.iter() .iter()
.map(|x| x.data.kv_active_blocks as f64) .map(|x| x.data.kv_stats.kv_active_blocks as f64)
.collect(); .collect();
let load_avg = load_values.iter().copied().sum::<f64>() / load_values.len() as f64; let load_avg = load_values.iter().copied().sum::<f64>() / load_values.len() as f64;
let variance = load_values let variance = load_values
......
...@@ -40,7 +40,7 @@ ...@@ -40,7 +40,7 @@
//! ## NOTE //! ## NOTE
//! The current prefill and decoding time simulations are not scientific at all and are WIP //! The current prefill and decoding time simulations are not scientific at all and are WIP
use crate::kv_router::protocols::{ForwardPassMetrics, KvCacheEventData}; use crate::kv_router::protocols::{ForwardPassMetrics, KvCacheEventData, KvStats, WorkerStats};
use crate::mocker::evictor::LRUEvictor; use crate::mocker::evictor::LRUEvictor;
use crate::mocker::kv_manager::KvManager; use crate::mocker::kv_manager::KvManager;
use crate::mocker::protocols::{ use crate::mocker::protocols::{
...@@ -476,16 +476,26 @@ impl Scheduler { ...@@ -476,16 +476,26 @@ impl Scheduler {
sum / hit_rates_guard.len() as f32 sum / hit_rates_guard.len() as f32
}; };
ForwardPassMetrics { let worker_stats = WorkerStats {
data_parallel_rank: self.dp_rank, data_parallel_rank: self.dp_rank,
request_active_slots, request_active_slots,
// vllm max_num_seqs for gpu >= 70 vram, otherwise 256, fallback is 128 request_total_slots: 1024, // vllm max_num_seqs for gpu >= 70 vram, otherwise 256, fallback is 128
request_total_slots: 1024, num_requests_waiting,
};
let kv_stats = KvStats {
kv_active_blocks: active_blocks_count, kv_active_blocks: active_blocks_count,
kv_total_blocks: total_capacity, kv_total_blocks: total_capacity,
num_requests_waiting,
gpu_cache_usage_perc, gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate, gpu_prefix_cache_hit_rate,
};
let spec_decode_stats = None;
ForwardPassMetrics {
worker_stats,
kv_stats,
spec_decode_stats,
} }
// Guards drop naturally here in reverse order (LIFO): hit_rates_guard, kv_manager, state // Guards drop naturally here in reverse order (LIFO): hit_rates_guard, kv_manager, state
} }
...@@ -754,20 +764,20 @@ mod tests { ...@@ -754,20 +764,20 @@ mod tests {
let metrics = scheduler.get_forward_pass_metrics().await; let metrics = scheduler.get_forward_pass_metrics().await;
assert_eq!( assert_eq!(
metrics.num_requests_waiting, 0, metrics.worker_stats.num_requests_waiting, 0,
"Expected no waiting requests, got {}", "Expected no waiting requests, got {}",
metrics.num_requests_waiting metrics.worker_stats.num_requests_waiting
); );
assert!( assert!(
metrics.gpu_prefix_cache_hit_rate > 0.8, metrics.kv_stats.gpu_prefix_cache_hit_rate > 0.8,
"Expected cache hit rate > 0.8, got {}", "Expected cache hit rate > 0.8, got {}",
metrics.gpu_prefix_cache_hit_rate metrics.kv_stats.gpu_prefix_cache_hit_rate
); );
println!( println!(
"Test passed! Cache hit rate: {:.3}", "Test passed! Cache hit rate: {:.3}",
metrics.gpu_prefix_cache_hit_rate metrics.kv_stats.gpu_prefix_cache_hit_rate
); );
println!("Received {received_tokens} tokens"); println!("Received {received_tokens} tokens");
} }
...@@ -823,16 +833,16 @@ mod tests { ...@@ -823,16 +833,16 @@ mod tests {
let metrics = scheduler.get_forward_pass_metrics().await; let metrics = scheduler.get_forward_pass_metrics().await;
assert_eq!( assert_eq!(
metrics.gpu_cache_usage_perc, metrics.kv_stats.gpu_cache_usage_perc,
0.0, 0.0,
"Expected GPU cache usage to be 0%, got {}%", "Expected GPU cache usage to be 0%, got {}%",
metrics.gpu_cache_usage_perc * 100.0 metrics.kv_stats.gpu_cache_usage_perc * 100.0
); );
assert_eq!( assert_eq!(
metrics.kv_active_blocks, 0, metrics.kv_stats.kv_active_blocks, 0,
"Expected 0 active blocks, got {}", "Expected 0 active blocks, got {}",
metrics.kv_active_blocks metrics.kv_stats.kv_active_blocks
); );
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment