Unverified Commit b2605a8e authored by Kris Hung's avatar Kris Hung Committed by GitHub
Browse files

feat: Report KVBM cache hit rate (#4333)


Signed-off-by: default avatarkrishung5 <krish@nvidia.com>
parent b0605788
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
}, },
"id": 7, "id": 7,
"panels": [], "panels": [],
"title": "General", "title": "Cache Hit",
"type": "row" "type": "row"
}, },
{ {
...@@ -69,6 +69,7 @@ ...@@ -69,6 +69,7 @@
"type": "linear" "type": "linear"
}, },
"showPoints": "auto", "showPoints": "auto",
"showValues": false,
"spanNulls": false, "spanNulls": false,
"stacking": { "stacking": {
"group": "A", "group": "A",
...@@ -83,7 +84,8 @@ ...@@ -83,7 +84,8 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green" "color": "green",
"value": 0
}, },
{ {
"color": "red", "color": "red",
...@@ -100,6 +102,195 @@ ...@@ -100,6 +102,195 @@
"x": 0, "x": 0,
"y": 1 "y": 1
}, },
"id": 13,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.2.0",
"targets": [
{
"editorMode": "builder",
"expr": "kvbm_host_cache_hit_rate",
"interval": "",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Host Cache Hit Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"showValues": false,
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 1
},
"id": 14,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.2.0",
"targets": [
{
"editorMode": "builder",
"expr": "kvbm_disk_cache_hit_rate",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Disk Cache Hit Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"showValues": false,
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 9
},
"id": 10, "id": 10,
"options": { "options": {
"legend": { "legend": {
...@@ -114,7 +305,7 @@ ...@@ -114,7 +305,7 @@
"sort": "none" "sort": "none"
} }
}, },
"pluginVersion": "12.0.1", "pluginVersion": "12.2.0",
"targets": [ "targets": [
{ {
"disableTextWrap": false, "disableTextWrap": false,
...@@ -137,7 +328,7 @@ ...@@ -137,7 +328,7 @@
"h": 1, "h": 1,
"w": 24, "w": 24,
"x": 0, "x": 0,
"y": 9 "y": 17
}, },
"id": 5, "id": 5,
"panels": [], "panels": [],
...@@ -178,6 +369,7 @@ ...@@ -178,6 +369,7 @@
"type": "linear" "type": "linear"
}, },
"showPoints": "auto", "showPoints": "auto",
"showValues": false,
"spanNulls": false, "spanNulls": false,
"stacking": { "stacking": {
"group": "A", "group": "A",
...@@ -192,7 +384,8 @@ ...@@ -192,7 +384,8 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green" "color": "green",
"value": 0
}, },
{ {
"color": "red", "color": "red",
...@@ -207,7 +400,7 @@ ...@@ -207,7 +400,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 0, "x": 0,
"y": 10 "y": 18
}, },
"id": 3, "id": 3,
"options": { "options": {
...@@ -223,7 +416,7 @@ ...@@ -223,7 +416,7 @@
"sort": "none" "sort": "none"
} }
}, },
"pluginVersion": "12.0.1", "pluginVersion": "12.2.0",
"targets": [ "targets": [
{ {
"disableTextWrap": false, "disableTextWrap": false,
...@@ -274,6 +467,7 @@ ...@@ -274,6 +467,7 @@
"type": "linear" "type": "linear"
}, },
"showPoints": "auto", "showPoints": "auto",
"showValues": false,
"spanNulls": false, "spanNulls": false,
"stacking": { "stacking": {
"group": "A", "group": "A",
...@@ -288,7 +482,8 @@ ...@@ -288,7 +482,8 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green" "color": "green",
"value": 0
}, },
{ {
"color": "red", "color": "red",
...@@ -303,7 +498,7 @@ ...@@ -303,7 +498,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 12, "x": 12,
"y": 10 "y": 18
}, },
"id": 11, "id": 11,
"options": { "options": {
...@@ -319,7 +514,7 @@ ...@@ -319,7 +514,7 @@
"sort": "none" "sort": "none"
} }
}, },
"pluginVersion": "12.0.1", "pluginVersion": "12.2.0",
"targets": [ "targets": [
{ {
"disableTextWrap": false, "disableTextWrap": false,
...@@ -370,6 +565,7 @@ ...@@ -370,6 +565,7 @@
"type": "linear" "type": "linear"
}, },
"showPoints": "auto", "showPoints": "auto",
"showValues": false,
"spanNulls": false, "spanNulls": false,
"stacking": { "stacking": {
"group": "A", "group": "A",
...@@ -384,7 +580,8 @@ ...@@ -384,7 +580,8 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green" "color": "green",
"value": 0
}, },
{ {
"color": "red", "color": "red",
...@@ -399,7 +596,7 @@ ...@@ -399,7 +596,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 0, "x": 0,
"y": 18 "y": 26
}, },
"id": 12, "id": 12,
"options": { "options": {
...@@ -415,7 +612,7 @@ ...@@ -415,7 +612,7 @@
"sort": "none" "sort": "none"
} }
}, },
"pluginVersion": "12.0.1", "pluginVersion": "12.2.0",
"targets": [ "targets": [
{ {
"disableTextWrap": false, "disableTextWrap": false,
...@@ -438,7 +635,7 @@ ...@@ -438,7 +635,7 @@
"h": 1, "h": 1,
"w": 24, "w": 24,
"x": 0, "x": 0,
"y": 26 "y": 34
}, },
"id": 6, "id": 6,
"panels": [], "panels": [],
...@@ -479,6 +676,7 @@ ...@@ -479,6 +676,7 @@
"type": "linear" "type": "linear"
}, },
"showPoints": "auto", "showPoints": "auto",
"showValues": false,
"spanNulls": false, "spanNulls": false,
"stacking": { "stacking": {
"group": "A", "group": "A",
...@@ -493,7 +691,8 @@ ...@@ -493,7 +691,8 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green" "color": "green",
"value": 0
}, },
{ {
"color": "red", "color": "red",
...@@ -508,7 +707,7 @@ ...@@ -508,7 +707,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 0, "x": 0,
"y": 27 "y": 35
}, },
"id": 4, "id": 4,
"options": { "options": {
...@@ -524,7 +723,7 @@ ...@@ -524,7 +723,7 @@
"sort": "none" "sort": "none"
} }
}, },
"pluginVersion": "12.0.1", "pluginVersion": "12.2.0",
"targets": [ "targets": [
{ {
"disableTextWrap": false, "disableTextWrap": false,
...@@ -575,6 +774,7 @@ ...@@ -575,6 +774,7 @@
"type": "linear" "type": "linear"
}, },
"showPoints": "auto", "showPoints": "auto",
"showValues": false,
"spanNulls": false, "spanNulls": false,
"stacking": { "stacking": {
"group": "A", "group": "A",
...@@ -589,7 +789,8 @@ ...@@ -589,7 +789,8 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green" "color": "green",
"value": 0
}, },
{ {
"color": "red", "color": "red",
...@@ -604,7 +805,7 @@ ...@@ -604,7 +805,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 12, "x": 12,
"y": 27 "y": 35
}, },
"id": 8, "id": 8,
"options": { "options": {
...@@ -620,7 +821,7 @@ ...@@ -620,7 +821,7 @@
"sort": "none" "sort": "none"
} }
}, },
"pluginVersion": "12.0.1", "pluginVersion": "12.2.0",
"targets": [ "targets": [
{ {
"disableTextWrap": false, "disableTextWrap": false,
...@@ -640,7 +841,7 @@ ...@@ -640,7 +841,7 @@
], ],
"preload": false, "preload": false,
"refresh": "5s", "refresh": "5s",
"schemaVersion": 41, "schemaVersion": 42,
"tags": [], "tags": [],
"templating": { "templating": {
"list": [] "list": []
...@@ -653,5 +854,5 @@ ...@@ -653,5 +854,5 @@
"timezone": "browser", "timezone": "browser",
"title": "KVBM Dashboard", "title": "KVBM Dashboard",
"uid": "3f679257-70a5-402c-92b4-05382337b548", "uid": "3f679257-70a5-402c-92b4-05382337b548",
"version": 4 "version": 3
} }
\ No newline at end of file
...@@ -141,6 +141,8 @@ KVBM currently provides following types of metrics out of the box: ...@@ -141,6 +141,8 @@ KVBM currently provides following types of metrics out of the box:
- `kvbm_offload_blocks_d2d`: The number of offload blocks from device to disk (bypassing host memory) - `kvbm_offload_blocks_d2d`: The number of offload blocks from device to disk (bypassing host memory)
- `kvbm_onboard_blocks_d2d`: The number of onboard blocks from disk to device - `kvbm_onboard_blocks_d2d`: The number of onboard blocks from disk to device
- `kvbm_onboard_blocks_h2d`: The number of onboard blocks from host to device - `kvbm_onboard_blocks_h2d`: The number of onboard blocks from host to device
- `kvbm_host_cache_hit_rate`: Host cache hit rate (0.0-1.0) from sliding window
- `kvbm_disk_cache_hit_rate`: Disk cache hit rate (0.0-1.0) from sliding window
## Troubleshooting ## Troubleshooting
......
...@@ -134,6 +134,8 @@ KVBM currently provides following types of metrics out of the box: ...@@ -134,6 +134,8 @@ KVBM currently provides following types of metrics out of the box:
- `kvbm_offload_blocks_d2d`: The number of offload blocks from device to disk (bypassing host memory) - `kvbm_offload_blocks_d2d`: The number of offload blocks from device to disk (bypassing host memory)
- `kvbm_onboard_blocks_d2d`: The number of onboard blocks from disk to device - `kvbm_onboard_blocks_d2d`: The number of onboard blocks from disk to device
- `kvbm_onboard_blocks_h2d`: The number of onboard blocks from host to device - `kvbm_onboard_blocks_h2d`: The number of onboard blocks from host to device
- `kvbm_host_cache_hit_rate`: Host cache hit rate (0.0-1.0) from sliding window
- `kvbm_disk_cache_hit_rate`: Disk cache hit rate (0.0-1.0) from sliding window
## Troubleshooting ## Troubleshooting
......
...@@ -15,6 +15,7 @@ use pyo3::PyResult; ...@@ -15,6 +15,7 @@ use pyo3::PyResult;
use std::time::Duration; use std::time::Duration;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
pub mod cache_stats;
mod controller; mod controller;
mod distributed; mod distributed;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! KVBM cache statistics tracking and periodic logging.
//!
//! This module provides cache statistics tracking with a sliding window
//! approach for tracking host and disk cache hit rates.
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Default maximum number of recent requests to track in the sliding window
const DEFAULT_MAX_RECENT_REQUESTS: usize = 1000;
const DEFAULT_LOG_INTERVAL_SECS: u64 = 5;
/// Cache statistics entry for a single request
#[derive(Clone, Copy, Debug)]
struct CacheStatsEntry {
host_blocks: u64, // Blocks found in host cache
disk_blocks: u64, // Blocks found in disk cache
total_blocks: u64, // Total blocks queried from host/disk
}
/// Aggregated cache statistics for the current sliding window
#[derive(Default)]
struct AggregatedStats {
total_blocks_queried: u64, // Total blocks queried from host/disk (same for both tiers)
host_blocks_hit: u64, // Blocks found in host cache
disk_blocks_hit: u64, // Blocks found in disk cache
}
/// Cache statistics tracker with sliding window
/// Tracks the most recent N requests (default: 1000) for cache hit rate calculation
pub struct CacheStatsTracker {
/// Maximum number of recent requests to track
max_recent_requests: usize,
/// Queue of recent cache stats entries
entries: Mutex<VecDeque<CacheStatsEntry>>,
/// Aggregated values for the current window (single lock for all counters)
aggregated: Mutex<AggregatedStats>,
/// Last time we logged statistics
last_log_time: Mutex<Instant>,
/// Interval between log messages
log_interval: Duration,
/// Optional identifier for this tracker (e.g., worker_id, engine_index)
/// Used in log messages to distinguish between multiple KVBM instances
identifier: Option<String>,
/// Last logged values to avoid duplicate logs when values haven't changed
/// Format: (total_blocks_queried, host_blocks_hit, disk_blocks_hit)
last_logged_values: Mutex<Option<(u64, u64, u64)>>,
}
impl CacheStatsTracker {
/// Create a new cache statistics tracker
///
/// # Arguments
/// * `identifier` - Optional identifier for this tracker (e.g., worker_id, engine_index).
/// Used in log messages to distinguish between multiple KVBM instances.
///
/// The maximum number of recent requests is read from `DYN_KVBM_CACHE_STATS_MAX_REQUESTS` env var
/// if set, otherwise defaults to 1000.
///
/// The log interval is read from `DYN_KVBM_CACHE_STATS_LOG_INTERVAL_SECS` env var if set,
/// otherwise defaults to 5 seconds.
pub fn new(identifier: Option<String>) -> Self {
let max_recent_requests = std::env::var("DYN_KVBM_CACHE_STATS_MAX_REQUESTS")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_RECENT_REQUESTS);
let log_interval_secs = std::env::var("DYN_KVBM_CACHE_STATS_LOG_INTERVAL_SECS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(DEFAULT_LOG_INTERVAL_SECS);
Self {
max_recent_requests,
entries: Mutex::new(VecDeque::new()),
aggregated: Mutex::new(AggregatedStats::default()),
last_log_time: Mutex::new(Instant::now()),
log_interval: Duration::from_secs(log_interval_secs),
identifier,
last_logged_values: Mutex::new(None),
}
}
/// Record cache statistics for a completed request
/// Uses sliding window: when max_recent_requests is exceeded, oldest entries are removed
pub fn record(&self, host_blocks: usize, disk_blocks: usize, total_blocks: usize) {
if total_blocks == 0 {
// Skip empty requests
return;
}
let entry = CacheStatsEntry {
host_blocks: host_blocks as u64,
disk_blocks: disk_blocks as u64,
total_blocks: total_blocks as u64,
};
// Lock entries and aggregated stats separately to minimize lock contention
let mut entries = self.entries.lock().unwrap();
let mut aggregated = self.aggregated.lock().unwrap();
// Add new entry and update aggregated stats
entries.push_back(entry);
aggregated.total_blocks_queried += entry.total_blocks;
aggregated.host_blocks_hit += entry.host_blocks;
aggregated.disk_blocks_hit += entry.disk_blocks;
// Remove oldest entries if we exceed the limit
// Keep at least one entry (the latest)
while entries.len() > 1 && entries.len() > self.max_recent_requests {
if let Some(old_entry) = entries.pop_front() {
aggregated.total_blocks_queried -= old_entry.total_blocks;
aggregated.host_blocks_hit -= old_entry.host_blocks;
aggregated.disk_blocks_hit -= old_entry.disk_blocks;
}
}
}
/// Check if we should log and do so if enough time has passed
/// Returns true if logging occurred, false otherwise
pub fn maybe_log(&self) -> bool {
let now = Instant::now();
let should_log = {
let mut last_log = self.last_log_time.lock().unwrap();
let elapsed = now.duration_since(*last_log);
if elapsed >= self.log_interval {
*last_log = now;
true
} else {
false
}
};
if should_log {
// Read aggregated stats with minimal lock time
let (total_blocks_queried, host_blocks_hit, disk_blocks_hit) = {
let aggregated = self.aggregated.lock().unwrap();
(
aggregated.total_blocks_queried,
aggregated.host_blocks_hit,
aggregated.disk_blocks_hit,
)
};
// Only log if there's activity
if total_blocks_queried > 0 {
// Check if values have changed since last log
let should_log_values = {
let mut last_logged = self.last_logged_values.lock().unwrap();
let current_values = (total_blocks_queried, host_blocks_hit, disk_blocks_hit);
match *last_logged {
Some(prev) if prev == current_values => {
// Values haven't changed, skip logging
false
}
_ => {
// Values changed or first log, update and log
*last_logged = Some(current_values);
true
}
}
};
if should_log_values {
let host_rate = if total_blocks_queried == 0 {
0.0
} else {
(host_blocks_hit as f32 / total_blocks_queried as f32) * 100.0
};
let disk_rate = if total_blocks_queried == 0 {
0.0
} else {
(disk_blocks_hit as f32 / total_blocks_queried as f32) * 100.0
};
// Include identifier in log message if available
let prefix = if let Some(ref id) = self.identifier {
format!("KVBM [{}] Cache Hit Rates", id)
} else {
"KVBM Cache Hit Rates".to_string()
};
tracing::info!(
"{} - Host: {:.1}% ({}/{}), Disk: {:.1}% ({}/{})",
prefix,
host_rate,
host_blocks_hit,
total_blocks_queried,
disk_rate,
disk_blocks_hit,
total_blocks_queried,
);
return true;
}
}
}
false
}
/// Get current host cache hit rate (0.0-1.0) from the sliding window
pub fn host_hit_rate(&self) -> f32 {
let aggregated = self.aggregated.lock().unwrap();
if aggregated.total_blocks_queried == 0 {
0.0
} else {
aggregated.host_blocks_hit as f32 / aggregated.total_blocks_queried as f32
}
}
/// Get current disk cache hit rate (0.0-1.0) from the sliding window
pub fn disk_hit_rate(&self) -> f32 {
let aggregated = self.aggregated.lock().unwrap();
if aggregated.total_blocks_queried == 0 {
0.0
} else {
aggregated.disk_blocks_hit as f32 / aggregated.total_blocks_queried as f32
}
}
/// Reset the statistics (clears the sliding window)
/// Useful for test isolation
#[cfg(test)]
#[allow(dead_code)] // Keep for test utilities, even if not currently used
pub fn reset(&self) {
let mut entries = self.entries.lock().unwrap();
let mut aggregated = self.aggregated.lock().unwrap();
let mut last_logged = self.last_logged_values.lock().unwrap();
entries.clear();
*aggregated = AggregatedStats::default();
*last_logged = None;
}
}
impl Default for CacheStatsTracker {
fn default() -> Self {
Self::new(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cache_stats_tracking() {
// Use a small window size for testing
unsafe {
std::env::set_var("DYN_KVBM_CACHE_STATS_MAX_REQUESTS", "10");
}
let tracker = CacheStatsTracker::new(None);
unsafe {
std::env::remove_var("DYN_KVBM_CACHE_STATS_MAX_REQUESTS");
}
// Record some cache hits
tracker.record(5, 3, 10); // 50% host, 30% disk
tracker.record(8, 2, 10); // 80% host, 20% disk
// Overall: 13/20 = 65% host, 5/20 = 25% disk
let host_rate = tracker.host_hit_rate();
let disk_rate = tracker.disk_hit_rate();
assert!((host_rate - 0.65).abs() < 0.01);
assert!((disk_rate - 0.25).abs() < 0.01);
}
#[test]
fn test_sliding_window() {
// Use a small window size for testing
unsafe {
std::env::set_var("DYN_KVBM_CACHE_STATS_MAX_REQUESTS", "3");
}
let tracker = CacheStatsTracker::new(None);
unsafe {
std::env::remove_var("DYN_KVBM_CACHE_STATS_MAX_REQUESTS");
}
// Add 5 entries, but max is 3
tracker.record(10, 5, 10); // Entry 1: 100% host, 50% disk
tracker.record(0, 0, 10); // Entry 2: 0% host, 0% disk
tracker.record(5, 5, 10); // Entry 3: 50% host, 50% disk
tracker.record(10, 10, 10); // Entry 4: 100% host, 100% disk (should remove entry 1)
tracker.record(0, 0, 10); // Entry 5: 0% host, 0% disk (should remove entry 2)
// Window should contain entries 3, 4, 5
// Entry 3: 5/10 host, 5/10 disk
// Entry 4: 10/10 host, 10/10 disk
// Entry 5: 0/10 host, 0/10 disk
// Total: 15/30 host = 50%, 15/30 disk = 50%
let host_rate = tracker.host_hit_rate();
let disk_rate = tracker.disk_hit_rate();
assert!(
(host_rate - 0.5).abs() < 0.01,
"host_rate={}, expected=0.5",
host_rate
);
assert!(
(disk_rate - 0.5).abs() < 0.01,
"disk_rate={}, expected=0.5",
disk_rate
);
// Verify window size
let entries_len = tracker.entries.lock().unwrap().len();
assert_eq!(
entries_len, 3,
"Expected 3 entries in window, got {}",
entries_len
);
}
}
...@@ -160,6 +160,7 @@ impl KvConnectorLeader { ...@@ -160,6 +160,7 @@ impl KvConnectorLeader {
block_manager.get_block_manager().clone(), block_manager.get_block_manager().clone(),
leader.clone(), leader.clone(),
kvbm_metrics_clone.clone(), kvbm_metrics_clone.clone(),
Some(format!("worker-{}", worker_id)), // identifier for cache stats
); );
let _ = slot_manager_cell.set(sm); let _ = slot_manager_cell.set(sm);
......
...@@ -168,6 +168,7 @@ impl KvConnectorLeaderRecorder { ...@@ -168,6 +168,7 @@ impl KvConnectorLeaderRecorder {
block_manager.get_block_manager().clone(), block_manager.get_block_manager().clone(),
leader.clone(), leader.clone(),
kvbm_metrics_clone.clone(), kvbm_metrics_clone.clone(),
None, // Recorder doesn't need identifier
); );
let _ = slot_manager_cell.set(sm); let _ = slot_manager_cell.set(sm);
......
...@@ -16,7 +16,8 @@ use dynamo_llm::{ ...@@ -16,7 +16,8 @@ use dynamo_llm::{
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::get_current_cancel_token; use crate::block_manager::cache_stats::CacheStatsTracker;
use crate::{get_current_cancel_token, get_current_tokio_handle};
use super::*; use super::*;
...@@ -181,6 +182,10 @@ pub struct ConnectorSlotManager<R: RequestKey> { ...@@ -181,6 +182,10 @@ pub struct ConnectorSlotManager<R: RequestKey> {
/// use this to issue [`LocalTransferRequest`]s to the transfer engine /// use this to issue [`LocalTransferRequest`]s to the transfer engine
xfer_tx: mpsc::UnboundedSender<LocalTransferRequest>, xfer_tx: mpsc::UnboundedSender<LocalTransferRequest>,
_transfer_engine_handle: Option<CriticalTaskExecutionHandle>, _transfer_engine_handle: Option<CriticalTaskExecutionHandle>,
/// Cache statistics tracker
cache_stats: Arc<CacheStatsTracker>,
/// KVBM metrics for exposing cache hit rates
kvbm_metrics: KvbmMetrics,
} }
impl std::fmt::Debug for ConnectorSlotManager<String> { impl std::fmt::Debug for ConnectorSlotManager<String> {
...@@ -194,7 +199,26 @@ impl<R: RequestKey> ConnectorSlotManager<R> { ...@@ -194,7 +199,26 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
block_manager: VllmBlockManager, block_manager: VllmBlockManager,
leader: Arc<KvbmLeader>, leader: Arc<KvbmLeader>,
kvbm_metrics: KvbmMetrics, kvbm_metrics: KvbmMetrics,
identifier: Option<String>,
) -> Self { ) -> Self {
let cache_stats = Arc::new(CacheStatsTracker::new(identifier));
let kvbm_metrics_clone = kvbm_metrics.clone();
let cache_stats_clone = cache_stats.clone();
// Spawn a background task to periodically update metrics and log cache hit rates
let handle = get_current_tokio_handle();
handle.spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
interval.tick().await;
// Update Prometheus metrics
let host_rate = cache_stats_clone.host_hit_rate();
let disk_rate = cache_stats_clone.disk_hit_rate();
kvbm_metrics_clone.update_cache_hit_rates(host_rate, disk_rate);
// Also log cache hit rates periodically
cache_stats_clone.maybe_log();
}
});
tracing::debug!( tracing::debug!(
"creating slot manager with block size: {}", "creating slot manager with block size: {}",
block_manager.block_size() block_manager.block_size()
...@@ -207,6 +231,7 @@ impl<R: RequestKey> ConnectorSlotManager<R> { ...@@ -207,6 +231,7 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
let primary_token_clone = primary_token.clone(); let primary_token_clone = primary_token.clone();
let runtime_primary = get_current_tokio_handle(); let runtime_primary = get_current_tokio_handle();
let runtime_primary_clone = runtime_primary.clone(); let runtime_primary_clone = runtime_primary.clone();
let kvbm_metrics_clone = kvbm_metrics.clone();
let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime( let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime(
|cancellation_token| async move { |cancellation_token| async move {
...@@ -215,7 +240,7 @@ impl<R: RequestKey> ConnectorSlotManager<R> { ...@@ -215,7 +240,7 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
cancellation_token, cancellation_token,
runtime_primary_clone, runtime_primary_clone,
primary_token_clone, primary_token_clone,
kvbm_metrics, kvbm_metrics_clone,
) )
.await .await
}, },
...@@ -230,6 +255,8 @@ impl<R: RequestKey> ConnectorSlotManager<R> { ...@@ -230,6 +255,8 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
block_manager, block_manager,
xfer_tx, xfer_tx,
_transfer_engine_handle: Some(xfer_engine_task), _transfer_engine_handle: Some(xfer_engine_task),
cache_stats,
kvbm_metrics: kvbm_metrics.clone(),
} }
} }
} }
...@@ -258,6 +285,7 @@ impl<R: RequestKey> SlotManager<R> for ConnectorSlotManager<R> { ...@@ -258,6 +285,7 @@ impl<R: RequestKey> SlotManager<R> for ConnectorSlotManager<R> {
salt_hash, salt_hash,
self.block_manager.clone(), self.block_manager.clone(),
self.xfer_tx.clone(), self.xfer_tx.clone(),
self.cache_stats.clone(),
); );
self.slots self.slots
.lock() .lock()
...@@ -341,6 +369,15 @@ pub struct VllmConnectorSlot { ...@@ -341,6 +369,15 @@ pub struct VllmConnectorSlot {
/// The number of blocks that have been evaluated by the policy. /// The number of blocks that have been evaluated by the policy.
/// Each policy evaluation will skip the already evaluated blocks. /// Each policy evaluation will skip the already evaluated blocks.
evaluated_blocks: usize, evaluated_blocks: usize,
/// Whether we actually performed a cache lookup for this request
performed_cache_lookup: bool,
/// Total number of blocks queried from host/disk cache
total_blocks_queried: usize,
/// Cache statistics tracker for this KVBM instance
cache_stats: Arc<CacheStatsTracker>,
} }
impl VllmConnectorSlot { impl VllmConnectorSlot {
...@@ -350,6 +387,7 @@ impl VllmConnectorSlot { ...@@ -350,6 +387,7 @@ impl VllmConnectorSlot {
salt_hash: SaltHash, salt_hash: SaltHash,
block_manager: VllmBlockManager, block_manager: VllmBlockManager,
xfer_tx: mpsc::UnboundedSender<LocalTransferRequest>, xfer_tx: mpsc::UnboundedSender<LocalTransferRequest>,
cache_stats: Arc<CacheStatsTracker>,
) -> Self { ) -> Self {
assert!(!tokens.is_empty(), "tokens must be non-empty"); assert!(!tokens.is_empty(), "tokens must be non-empty");
let block_size = block_manager.block_size(); let block_size = block_manager.block_size();
...@@ -374,6 +412,9 @@ impl VllmConnectorSlot { ...@@ -374,6 +412,9 @@ impl VllmConnectorSlot {
tokens_cached_from_device: 0, tokens_cached_from_device: 0,
tokens_cached_from_host: 0, tokens_cached_from_host: 0,
tokens_cached_from_disk: 0, tokens_cached_from_disk: 0,
performed_cache_lookup: false,
total_blocks_queried: 0,
cache_stats,
} }
} }
...@@ -449,6 +490,8 @@ impl Slot for VllmConnectorSlot { ...@@ -449,6 +490,8 @@ impl Slot for VllmConnectorSlot {
self.tokens_cached_from_device = 0; self.tokens_cached_from_device = 0;
self.tokens_cached_from_host = 0; self.tokens_cached_from_host = 0;
self.tokens_cached_from_disk = 0; self.tokens_cached_from_disk = 0;
self.performed_cache_lookup = false;
self.total_blocks_queried = 0;
} }
fn reset(&mut self) { fn reset(&mut self) {
...@@ -712,6 +755,28 @@ impl Slot for VllmConnectorSlot { ...@@ -712,6 +755,28 @@ impl Slot for VllmConnectorSlot {
} }
fn mark_as_finished(&mut self, _iteration: u64) -> Result<(), SlotError> { fn mark_as_finished(&mut self, _iteration: u64) -> Result<(), SlotError> {
// Report cache statistics if we performed a cache lookup
if self.performed_cache_lookup {
let block_size = self.block_size;
// Convert cached tokens to blocks (rounding up)
let host_blocks = (self.tokens_cached_from_host + block_size - 1) / block_size;
let disk_blocks = (self.tokens_cached_from_disk + block_size - 1) / block_size;
tracing::debug!(
request_id = %self.request_id,
"Reporting cache stats: host_blocks={}, disk_blocks={}, total_blocks_queried={}, tokens_from_host={}, tokens_from_disk={}",
host_blocks,
disk_blocks,
self.total_blocks_queried,
self.tokens_cached_from_host,
self.tokens_cached_from_disk
);
self.cache_stats
.record(host_blocks, disk_blocks, self.total_blocks_queried);
}
// Check if there are any pending operations // Check if there are any pending operations
let has_pending_ops = self let has_pending_ops = self
.pending_operations .pending_operations
...@@ -792,9 +857,32 @@ impl Slot for VllmConnectorSlot { ...@@ -792,9 +857,32 @@ impl Slot for VllmConnectorSlot {
// we start matching non-device blocks after the device blocks // we start matching non-device blocks after the device blocks
let search_offset = num_computed_blocks; let search_offset = num_computed_blocks;
// Calculate how many blocks we're querying from host/disk
let blocks_to_lookup = &sequence_hashes[search_offset..];
tracing::debug!("matching against {} block hashes", blocks_to_lookup.len());
// If there are no blocks to lookup (GPU has everything), return early
if blocks_to_lookup.is_empty() {
tracing::debug!(
request_id = %self.request_id,
"no blocks to lookup from host/disk; GPU has all blocks"
);
// Still mark that we performed a lookup (even though we didn't need to query)
self.performed_cache_lookup = true;
self.total_blocks_queried = 0;
return Ok(());
}
// Mark that we're performing a cache lookup and track the total blocks
self.performed_cache_lookup = true;
self.total_blocks_queried = blocks_to_lookup.len();
tracing::debug!( tracing::debug!(
"matching against {} block hashes", request_id = %self.request_id,
sequence_hashes[search_offset..].len() "Starting cache lookup: querying {} blocks from host/disk (num_computed_blocks={})",
blocks_to_lookup.len(),
num_computed_blocks
); );
// we should do this opportunistically after this operation is done // we should do this opportunistically after this operation is done
...@@ -811,7 +899,7 @@ impl Slot for VllmConnectorSlot { ...@@ -811,7 +899,7 @@ impl Slot for VllmConnectorSlot {
let mut host_blocks = self let mut host_blocks = self
.block_manager .block_manager
.host() .host()
.map(|host| host.match_sequence_hashes_blocking(&sequence_hashes[search_offset..])) .map(|host| host.match_sequence_hashes_blocking(blocks_to_lookup))
.transpose()? .transpose()?
.unwrap_or_default(); .unwrap_or_default();
......
...@@ -115,6 +115,7 @@ impl KvConnectorLeader { ...@@ -115,6 +115,7 @@ impl KvConnectorLeader {
block_manager.get_block_manager().clone(), block_manager.get_block_manager().clone(),
leader.clone(), leader.clone(),
kvbm_metrics_clone.clone(), kvbm_metrics_clone.clone(),
Some(format!("worker-{}", worker_id)),
); );
let _ = slot_manager_cell.set(sm); let _ = slot_manager_cell.set(sm);
......
...@@ -4,12 +4,12 @@ ...@@ -4,12 +4,12 @@
use axum::Router; use axum::Router;
use dynamo_runtime::metrics::prometheus_names::{ use dynamo_runtime::metrics::prometheus_names::{
kvbm::{ kvbm::{
MATCHED_TOKENS, OFFLOAD_BLOCKS_D2D, OFFLOAD_BLOCKS_D2H, OFFLOAD_BLOCKS_H2D, DISK_CACHE_HIT_RATE, HOST_CACHE_HIT_RATE, MATCHED_TOKENS, OFFLOAD_BLOCKS_D2D,
ONBOARD_BLOCKS_D2D, ONBOARD_BLOCKS_H2D, OFFLOAD_BLOCKS_D2H, OFFLOAD_BLOCKS_H2D, ONBOARD_BLOCKS_D2D, ONBOARD_BLOCKS_H2D,
}, },
sanitize_prometheus_name, sanitize_prometheus_name,
}; };
use prometheus::{IntCounter, Opts, Registry}; use prometheus::{Gauge, IntCounter, Opts, Registry};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, thread}; use std::{collections::HashMap, net::SocketAddr, sync::Arc, thread};
use tokio::{net::TcpListener, sync::Notify}; use tokio::{net::TcpListener, sync::Notify};
...@@ -35,6 +35,12 @@ pub struct KvbmMetrics { ...@@ -35,6 +35,12 @@ pub struct KvbmMetrics {
// number of matched tokens from KVBM // number of matched tokens from KVBM
pub matched_tokens: IntCounter, pub matched_tokens: IntCounter,
// host cache hit rate (0.0-1.0) from the sliding window
pub host_cache_hit_rate: Gauge,
// disk cache hit rate (0.0-1.0) from the sliding window
pub disk_cache_hit_rate: Gauge,
shutdown_notify: Option<Arc<Notify>>, shutdown_notify: Option<Arc<Notify>>,
} }
...@@ -81,6 +87,20 @@ impl KvbmMetrics { ...@@ -81,6 +87,20 @@ impl KvbmMetrics {
let matched_tokens = mr let matched_tokens = mr
.create_intcounter(MATCHED_TOKENS, "The number of matched tokens", &[]) .create_intcounter(MATCHED_TOKENS, "The number of matched tokens", &[])
.unwrap(); .unwrap();
let host_cache_hit_rate = mr
.create_gauge(
HOST_CACHE_HIT_RATE,
"Host cache hit rate (0.0-1.0) from the sliding window",
&[],
)
.unwrap();
let disk_cache_hit_rate = mr
.create_gauge(
DISK_CACHE_HIT_RATE,
"Disk cache hit rate (0.0-1.0) from the sliding window",
&[],
)
.unwrap();
// early return if no endpoint is needed // early return if no endpoint is needed
if !create_endpoint { if !create_endpoint {
...@@ -91,6 +111,8 @@ impl KvbmMetrics { ...@@ -91,6 +111,8 @@ impl KvbmMetrics {
onboard_blocks_h2d, onboard_blocks_h2d,
onboard_blocks_d2d, onboard_blocks_d2d,
matched_tokens, matched_tokens,
host_cache_hit_rate,
disk_cache_hit_rate,
shutdown_notify: None, shutdown_notify: None,
}; };
} }
...@@ -145,9 +167,17 @@ impl KvbmMetrics { ...@@ -145,9 +167,17 @@ impl KvbmMetrics {
onboard_blocks_h2d, onboard_blocks_h2d,
onboard_blocks_d2d, onboard_blocks_d2d,
matched_tokens, matched_tokens,
host_cache_hit_rate,
disk_cache_hit_rate,
shutdown_notify: Some(notify), shutdown_notify: Some(notify),
} }
} }
/// Update cache hit rate metrics from a CacheStatsTracker
pub fn update_cache_hit_rates(&self, host_rate: f32, disk_rate: f32) {
self.host_cache_hit_rate.set(host_rate as f64);
self.disk_cache_hit_rate.set(disk_rate as f64);
}
} }
impl Drop for KvbmMetrics { impl Drop for KvbmMetrics {
...@@ -194,6 +224,23 @@ impl KvbmMetricsRegistry { ...@@ -194,6 +224,23 @@ impl KvbmMetricsRegistry {
Ok(c) Ok(c)
} }
pub fn create_gauge(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
) -> anyhow::Result<Gauge> {
let metrics_name = sanitize_prometheus_name(&format!("{}_{}", self.prefix, name))?;
let const_labels: HashMap<String, String> = labels
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let opts = Opts::new(metrics_name, description).const_labels(const_labels);
let g = Gauge::with_opts(opts)?;
self.registry.register(Box::new(g.clone()))?;
Ok(g)
}
pub fn inner(&self) -> Arc<Registry> { pub fn inner(&self) -> Arc<Registry> {
Arc::clone(&self.registry) Arc::clone(&self.registry)
} }
......
...@@ -339,6 +339,12 @@ pub mod kvbm { ...@@ -339,6 +339,12 @@ pub mod kvbm {
/// The number of matched tokens /// The number of matched tokens
pub const MATCHED_TOKENS: &str = "matched_tokens"; pub const MATCHED_TOKENS: &str = "matched_tokens";
/// Host cache hit rate (0.0-1.0) from the sliding window
pub const HOST_CACHE_HIT_RATE: &str = "host_cache_hit_rate";
/// Disk cache hit rate (0.0-1.0) from the sliding window
pub const DISK_CACHE_HIT_RATE: &str = "disk_cache_hit_rate";
} }
/// KvStats metrics from LLM workers /// KvStats metrics from LLM workers
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment