Unverified Commit 15539fd0 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: add Prometheus metrics integration for KvStats (#2704)


Signed-off-by: default avatarKeiven C <213854356+keivenchang@users.noreply.github.com>
parent 37adc0a8
......@@ -85,6 +85,11 @@ impl WorkerMetricsPublisher {
None
};
// Register Prometheus metrics first
rs_publisher
.register_prometheus_metrics(&rs_component)
.map_err(to_pyerr)?;
rs_publisher
.create_endpoint(rs_component, metrics_labels_ref.as_deref())
.await
......
......@@ -35,7 +35,7 @@ testing-nixl = ["dep:nixl-sys"]
testing-etcd = []
block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:ndarray", "dep:nix"]
cuda = ["dep:cudarc"]
integration = []
integration = ["dynamo-runtime/integration"]
# NOTE: This feature will be enabled once ModelExpress packages are published
# model-express = ["dep:model_express_client", "dep:model_express_common"]
......
......@@ -20,6 +20,7 @@ use crate::kv_router::{
scoring::LoadEvent,
};
use async_trait::async_trait;
use dynamo_runtime::metrics::{MetricsRegistry, prometheus_names::kvstats};
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
use dynamo_runtime::{
Error, Result,
......@@ -31,7 +32,7 @@ use dynamo_runtime::{
protocols::annotated::Annotated,
};
use futures::stream;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
......@@ -482,12 +483,77 @@ enum RawKvEvent {
pub struct WorkerMetricsPublisher {
tx: tokio::sync::watch::Sender<Arc<ForwardPassMetrics>>,
rx: tokio::sync::watch::Receiver<Arc<ForwardPassMetrics>>,
/// Prometheus gauges for KvStats metrics
/// We use OnceLock for efficient one-time initialization and lock-free reads
/// The gauges are set once during register_prometheus_metrics and then only read
prometheus_gauges: OnceLock<KvStatsPrometheusGauges>,
}
struct KvStatsPrometheusGauges {
kv_active_blocks_gauge: prometheus::Gauge,
kv_total_blocks_gauge: prometheus::Gauge,
gpu_cache_usage_gauge: prometheus::Gauge,
gpu_prefix_cache_hit_rate_gauge: prometheus::Gauge,
}
impl KvStatsPrometheusGauges {
/// Create a new KvStatsPrometheusGauges instance with all metrics registered
fn new(component: &Component) -> Result<Self> {
let kv_active_blocks_gauge = component.create_gauge(
kvstats::ACTIVE_BLOCKS,
"Number of active KV cache blocks currently in use",
&[],
)?;
let kv_total_blocks_gauge = component.create_gauge(
kvstats::TOTAL_BLOCKS,
"Total number of KV cache blocks available",
&[],
)?;
let gpu_cache_usage_gauge = component.create_gauge(
kvstats::GPU_CACHE_USAGE_PERCENT,
"GPU cache usage as a percentage (0.0-1.0)",
&[],
)?;
let gpu_prefix_cache_hit_rate_gauge = component.create_gauge(
kvstats::GPU_PREFIX_CACHE_HIT_RATE,
"GPU prefix cache hit rate as a percentage (0.0-1.0)",
&[],
)?;
tracing::info!("Registered KvStats Prometheus metrics");
Ok(KvStatsPrometheusGauges {
kv_active_blocks_gauge,
kv_total_blocks_gauge,
gpu_cache_usage_gauge,
gpu_prefix_cache_hit_rate_gauge,
})
}
/// Update all gauges with values from KvStats
fn update_from_kvstats(&self, kv_stats: &KvStats) {
self.kv_active_blocks_gauge
.set(kv_stats.kv_active_blocks as f64);
self.kv_total_blocks_gauge
.set(kv_stats.kv_total_blocks as f64);
self.gpu_cache_usage_gauge
.set(kv_stats.gpu_cache_usage_perc as f64);
self.gpu_prefix_cache_hit_rate_gauge
.set(kv_stats.gpu_prefix_cache_hit_rate as f64);
}
}
impl WorkerMetricsPublisher {
pub fn new() -> Result<Self> {
let (tx, rx) = tokio::sync::watch::channel(Arc::new(ForwardPassMetrics::default()));
Ok(WorkerMetricsPublisher { tx, rx })
Ok(WorkerMetricsPublisher {
tx,
rx,
prometheus_gauges: OnceLock::new(),
})
}
pub fn publish(
......@@ -495,9 +561,27 @@ impl WorkerMetricsPublisher {
metrics: Arc<ForwardPassMetrics>,
) -> Result<(), tokio::sync::watch::error::SendError<Arc<ForwardPassMetrics>>> {
tracing::trace!("Publish metrics: {metrics:?}");
// Update Prometheus gauges - OnceLock provides lock-free reads after initialization
// This is the hot path - we only read the Arc, no locking overhead
if let Some(gauges) = self.prometheus_gauges.get() {
gauges.update_from_kvstats(&metrics.kv_stats);
}
self.tx.send(metrics)
}
/// Register KvStats Prometheus metrics with the component's registry
pub fn register_prometheus_metrics(&self, component: &Component) -> Result<()> {
// Use get_or_init for thread-safe one-time initialization
// This will only initialize once, subsequent calls will return immediately
self.prometheus_gauges.get_or_init(|| {
KvStatsPrometheusGauges::new(component).expect("Failed to create Prometheus gauges")
});
Ok(())
}
pub async fn create_endpoint(
&self,
component: Component,
......@@ -981,21 +1065,20 @@ mod test_exponential_backoff {
}
}
#[cfg(test)]
mod test_worker_metrics_publisher {
#[cfg(all(test, feature = "integration"))]
mod test_integration_publisher {
use super::*;
use crate::kv_router::protocols::{ForwardPassMetrics, KvStats, WorkerStats};
use dynamo_runtime::traits::events::EventSubscriber; // Add this import
use dynamo_runtime::{DistributedRuntime, Runtime};
use dynamo_runtime::distributed_test_utils::create_test_drt_async;
use dynamo_runtime::traits::events::EventSubscriber;
use futures::StreamExt;
#[tokio::test]
#[ignore] // Mark as ignored as requested
#[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
async fn test_metrics_publishing_behavior() -> Result<()> {
// Set up runtime and namespace
let rt = Runtime::from_current().unwrap();
let drt = DistributedRuntime::from_settings(rt.clone()).await?;
let namespace = drt.namespace("test".to_string())?;
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns2001".to_string())?;
// Create a subscriber for the metrics events using subscribe_with_type
let mut subscriber = namespace
......@@ -1088,8 +1171,92 @@ mod test_worker_metrics_publisher {
"Expected no messages when load metrics don't change"
);
rt.shutdown();
drt.shutdown();
Ok(())
}
#[tokio::test]
#[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
async fn test_kvstats_prometheus_gauge_updates() {
use crate::kv_router::publisher::kvstats;
use dynamo_runtime::metrics::MetricsRegistry;
// Test that publish() updates Prometheus gauges correctly using real Component
let publisher = WorkerMetricsPublisher::new().unwrap();
// Create a real DRT and component for integration testing
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns2002".to_string()).unwrap();
let component = namespace.component("comp2002".to_string()).unwrap();
// Register Prometheus metrics using the real constructor
publisher.register_prometheus_metrics(&component).unwrap();
// Get references to the gauges for testing
let gauges = publisher.prometheus_gauges.get().unwrap();
let active_blocks_gauge = gauges.kv_active_blocks_gauge.clone();
let total_blocks_gauge = gauges.kv_total_blocks_gauge.clone();
let cache_usage_gauge = gauges.gpu_cache_usage_gauge.clone();
let hit_rate_gauge = gauges.gpu_prefix_cache_hit_rate_gauge.clone();
// Create test metrics with specific values
let test_metrics = Arc::new(ForwardPassMetrics {
worker_stats: WorkerStats {
data_parallel_rank: None,
request_active_slots: 5,
request_total_slots: 100,
num_requests_waiting: 2,
},
kv_stats: KvStats {
kv_active_blocks: 42,
kv_total_blocks: 12894,
gpu_cache_usage_perc: 0.5,
gpu_prefix_cache_hit_rate: 0.75,
},
spec_decode_stats: None,
});
// Test 1: Initial gauge values should be 0
assert_eq!(active_blocks_gauge.get(), 0.0);
assert_eq!(total_blocks_gauge.get(), 0.0);
assert_eq!(cache_usage_gauge.get(), 0.0);
assert_eq!(hit_rate_gauge.get(), 0.0);
// Test 2: publish() should update all gauges with correct values
let result = publisher.publish(test_metrics);
assert!(result.is_ok());
// Test 3: Verify gauges were updated correctly
assert_eq!(active_blocks_gauge.get(), 42.0);
assert_eq!(total_blocks_gauge.get(), 12894.0);
assert_eq!(cache_usage_gauge.get(), 0.5);
assert_eq!(hit_rate_gauge.get(), 0.75);
// Test 4: Verify metrics are properly registered in the component's registry
// Component implements MetricsRegistry trait which provides prometheus_metrics_fmt()
let prometheus_output = component.prometheus_metrics_fmt().unwrap();
// Verify metric names are present
assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS));
assert!(prometheus_output.contains(kvstats::TOTAL_BLOCKS));
assert!(prometheus_output.contains(kvstats::GPU_CACHE_USAGE_PERCENT));
assert!(prometheus_output.contains(kvstats::GPU_PREFIX_CACHE_HIT_RATE));
// Test 5: Verify the prometheus output contains the actual values
// Print the output to debug format issues
println!("Prometheus output:\n{}", prometheus_output);
// Check for metric values - the format includes labels so we need to be more flexible
assert!(prometheus_output.contains("kvstats_active_blocks"));
assert!(prometheus_output.contains("42")); // The value should be there
assert!(prometheus_output.contains("kvstats_total_blocks"));
assert!(prometheus_output.contains("12894")); // The value should be there
assert!(prometheus_output.contains("kvstats_gpu_cache_usage_percent"));
assert!(prometheus_output.contains("kvstats_gpu_prefix_cache_hit_rate"));
println!(
"✅ KvStatsPrometheusGauges constructor and publish() work correctly with real Component"
);
}
}
......@@ -787,7 +787,7 @@ mod tests {
// Manual debug ticker that prints forward pass metrics
_ = debug_interval.tick() => {
let _metrics = metrics_rx.borrow().clone();
println!("Forward Pass Metrics: {_metrics:#?}");
tracing::debug!("Forward Pass Metrics: {_metrics:#?}");
}
Some(_) = output_rx.recv() => {
......@@ -891,7 +891,7 @@ mod tests {
// Manual debug ticker that prints forward pass metrics
_ = debug_interval.tick() => {
let _metrics = metrics_rx.borrow().clone();
println!("Forward Pass Metrics: {_metrics:#?}");
tracing::debug!("Forward Pass Metrics: {_metrics:#?}");
}
Some(_signal) = output_rx.recv() => {
......
......@@ -345,12 +345,11 @@ impl DistributedConfig {
}
}
#[cfg(test)]
pub mod distributed_test_utils {
//! Common test helper functions for DistributedRuntime tests
// TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.
/// Helper function to create a DRT instance for tests
/// Helper function to create a DRT instance for integration-only tests.
/// Uses from_current to leverage existing tokio runtime
/// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery
#[cfg(feature = "integration")]
......@@ -362,8 +361,7 @@ pub mod distributed_test_utils {
}
}
#[cfg(feature = "integration")]
#[cfg(test)]
#[cfg(all(test, feature = "integration"))]
mod tests {
use super::distributed_test_utils::create_test_drt_async;
......
......@@ -56,6 +56,7 @@ pub mod utils;
pub mod worker;
pub mod distributed;
pub use distributed::distributed_test_utils;
pub use futures::stream;
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;
......
......@@ -246,6 +246,39 @@ pub mod kvbm_connector {
pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker";
}
/// KvStats metrics from LLM workers
pub mod kvstats {
/// Macro to generate KvStats metric names with the prefix
macro_rules! kvstats_name {
($name:expr) => {
concat!("kvstats_", $name)
};
}
/// Prefix for all KvStats metrics
pub const PREFIX: &str = kvstats_name!("");
/// Number of active KV cache blocks currently in use
pub const ACTIVE_BLOCKS: &str = kvstats_name!("active_blocks");
/// Total number of KV cache blocks available
pub const TOTAL_BLOCKS: &str = kvstats_name!("total_blocks");
/// GPU cache usage as a percentage (0.0-1.0)
pub const GPU_CACHE_USAGE_PERCENT: &str = kvstats_name!("gpu_cache_usage_percent");
/// GPU prefix cache hit rate as a percentage (0.0-1.0)
pub const GPU_PREFIX_CACHE_HIT_RATE: &str = kvstats_name!("gpu_prefix_cache_hit_rate");
}
/// All KvStats Prometheus metric names as an array for iteration/validation
pub const KVSTATS_METRICS: &[&str] = &[
kvstats::ACTIVE_BLOCKS,
kvstats::TOTAL_BLOCKS,
kvstats::GPU_CACHE_USAGE_PERCENT,
kvstats::GPU_PREFIX_CACHE_HIT_RATE,
];
// Shared regex patterns for Prometheus sanitization
static METRIC_INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment