Unverified Commit 40000976 authored by blarson-b10's avatar blarson-b10 Committed by GitHub
Browse files

feat: adds kv indexer metrics (#2905)


Signed-off-by: default avatarBrian Larson <brian.larson@baseten.co>
parent 5d729c17
...@@ -383,7 +383,7 @@ impl RadixTree { ...@@ -383,7 +383,7 @@ impl RadixTree {
})?; })?;
let router_event = llm_rs::kv_router::indexer::RouterEvent::new(worker_id, kv_cache_event); let router_event = llm_rs::kv_router::indexer::RouterEvent::new(worker_id, kv_cache_event);
self.inner.apply_event(router_event); let _ = self.inner.apply_event(router_event);
Ok(()) Ok(())
} }
...@@ -415,10 +415,13 @@ impl KvIndexer { ...@@ -415,10 +415,13 @@ impl KvIndexer {
let runtime = pyo3_async_runtimes::tokio::get_runtime(); let runtime = pyo3_async_runtimes::tokio::get_runtime();
runtime.block_on(async { runtime.block_on(async {
let cancellation_token = component.inner.drt().runtime().child_token(); let cancellation_token = component.inner.drt().runtime().child_token();
let kv_indexer_metrics =
llm_rs::kv_router::indexer::KvIndexerMetrics::from_component(&component.inner);
let inner: Arc<llm_rs::kv_router::indexer::KvIndexer> = let inner: Arc<llm_rs::kv_router::indexer::KvIndexer> =
llm_rs::kv_router::indexer::KvIndexer::new( llm_rs::kv_router::indexer::KvIndexer::new(
cancellation_token.clone(), cancellation_token.clone(),
kv_block_size as u32, kv_block_size as u32,
kv_indexer_metrics,
) )
.into(); .into();
......
...@@ -235,7 +235,12 @@ impl KvRouter { ...@@ -235,7 +235,12 @@ impl KvRouter {
let runtime_configs_rx = runtime_configs_watcher.receiver(); let runtime_configs_rx = runtime_configs_watcher.receiver();
let indexer = if kv_router_config.use_kv_events { let indexer = if kv_router_config.use_kv_events {
Indexer::KvIndexer(KvIndexer::new(cancellation_token.clone(), block_size)) let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(&component);
Indexer::KvIndexer(KvIndexer::new(
cancellation_token.clone(),
block_size,
kv_indexer_metrics,
))
} else { } else {
// hard code 120 seconds for now // hard code 120 seconds for now
Indexer::ApproxKvIndexer(ApproxKvIndexer::new( Indexer::ApproxKvIndexer(ApproxKvIndexer::new(
......
...@@ -232,7 +232,7 @@ impl ApproxKvIndexer { ...@@ -232,7 +232,7 @@ impl ApproxKvIndexer {
} }
); );
trie.apply_event(event); let _ = trie.apply_event(event);
timer_manager.insert(result.sequence_hashes.iter().map(|h| TimerEntry { timer_manager.insert(result.sequence_hashes.iter().map(|h| TimerEntry {
key: ExternalSequenceBlockHash(*h), key: ExternalSequenceBlockHash(*h),
...@@ -263,7 +263,7 @@ impl ApproxKvIndexer { ...@@ -263,7 +263,7 @@ impl ApproxKvIndexer {
} }
); );
trie.apply_event(event); let _ = trie.apply_event(event);
}); });
} }
......
...@@ -43,16 +43,20 @@ ...@@ -43,16 +43,20 @@
//! //!
//! This module provides a scalable and efficient way to manage and retrieve data blocks for LLM inference, leveraging a global KV cache to optimize performance. //! This module provides a scalable and efficient way to manage and retrieve data blocks for LLM inference, leveraging a global KV cache to optimize performance.
use bytes::Bytes;
// use prometheus::{IntCounter, IntGauge};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes;
use dynamo_runtime::{
component::Component,
metrics::{MetricsRegistry, prometheus_names::kvrouter},
};
use prometheus::{IntCounterVec, Opts};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
iter, iter,
rc::Rc, rc::Rc,
sync::OnceLock, sync::{Arc, OnceLock},
thread::JoinHandle, thread::JoinHandle,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
...@@ -78,6 +82,16 @@ pub enum KvRouterError { ...@@ -78,6 +82,16 @@ pub enum KvRouterError {
IndexerDroppedRequest, IndexerDroppedRequest,
} }
/// Errors that can occur during KV Cache Event processing.
#[derive(Debug, thiserror::Error)]
pub enum KvCacheEventError {
#[error("Failed to find parent block")]
ParentBlockNotFound,
#[error("Failed to find block")]
BlockNotFound,
}
/// Identifier of a LLM worker which emits events to the router. /// Identifier of a LLM worker which emits events to the router.
pub type WorkerId = i64; pub type WorkerId = i64;
...@@ -315,7 +329,7 @@ impl RadixTree { ...@@ -315,7 +329,7 @@ impl RadixTree {
/// ### Arguments /// ### Arguments
/// ///
/// * `event` - The `RouterEvent` to apply. /// * `event` - The `RouterEvent` to apply.
pub fn apply_event(&mut self, event: RouterEvent) { pub fn apply_event(&mut self, event: RouterEvent) -> Result<(), KvCacheEventError> {
let (worker_id, event) = (event.worker_id, event.event); let (worker_id, event) = (event.worker_id, event.event);
let (id, op) = (event.event_id, event.data); let (id, op) = (event.event_id, event.data);
tracing::trace!(id, "Store operation: {:?}", op); tracing::trace!(id, "Store operation: {:?}", op);
...@@ -341,7 +355,7 @@ impl RadixTree { ...@@ -341,7 +355,7 @@ impl RadixTree {
parent_hash = ?op.parent_hash, parent_hash = ?op.parent_hash,
"Failed to find parent block; skipping store operation" "Failed to find parent block; skipping store operation"
); );
return; return Err(KvCacheEventError::ParentBlockNotFound);
} }
}; };
...@@ -376,6 +390,7 @@ impl RadixTree { ...@@ -376,6 +390,7 @@ impl RadixTree {
current = block; current = block;
} }
Ok(())
} }
KvCacheEventData::Removed(remove) => { KvCacheEventData::Removed(remove) => {
// tracing::trace!(id, "KV Remove Operation: {:?}", op); // tracing::trace!(id, "KV Remove Operation: {:?}", op);
...@@ -394,7 +409,7 @@ impl RadixTree { ...@@ -394,7 +409,7 @@ impl RadixTree {
id, id,
"Failed to find block to remove; skipping remove operation" "Failed to find block to remove; skipping remove operation"
); );
continue; return Err(KvCacheEventError::BlockNotFound);
} }
}; };
...@@ -407,9 +422,11 @@ impl RadixTree { ...@@ -407,9 +422,11 @@ impl RadixTree {
// remove the block from the lookup table // remove the block from the lookup table
worker_lookup.remove(&block); worker_lookup.remove(&block);
} }
Ok(())
} }
KvCacheEventData::Cleared => { KvCacheEventData::Cleared => {
self.clear_all_blocks(worker_id); self.clear_all_blocks(worker_id);
Ok(())
} }
} }
} }
...@@ -515,6 +532,98 @@ impl RadixTree { ...@@ -515,6 +532,98 @@ impl RadixTree {
} }
} }
/// Metrics for the KV Indexer.
#[derive(Clone)]
pub struct KvIndexerMetrics {
/// Counter of events applied.
pub kv_cache_events_applied: IntCounterVec,
}
/// Metric status labels.
pub const METRIC_STATUS_OK: &str = "ok";
pub const METRIC_STATUS_PARENT_NOT_FOUND: &str = "parent_block_not_found";
pub const METRIC_STATUS_BLOCK_NOT_FOUND: &str = "block_not_found";
/// Metric event labels.
pub const METRIC_EVENT_STORED: &str = "stored";
pub const METRIC_EVENT_REMOVED: &str = "removed";
pub const METRIC_EVENT_CLEARED: &str = "cleared";
static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new();
impl KvIndexerMetrics {
fn new(kv_cache_events_applied: IntCounterVec) -> Self {
Self {
kv_cache_events_applied,
}
}
/// Creates a new KvIndexerMetrics from a Component, memoizing the result in
/// KV_INDEXER_METRICS to avoid duplicate registration issues.
pub fn from_component(component: &Component) -> Arc<Self> {
KV_INDEXER_METRICS.get_or_init(|| {
match component.create_intcountervec(
kvrouter::KV_CACHE_EVENTS_APPLIED,
"Total number of KV cache events applied to index",
&["event_type", "status"],
&[],
) {
Ok(kv_cache_events_applied) => Arc::new(Self::new(kv_cache_events_applied)),
Err(e) => {
tracing::warn!("Failed to create kv indexer metrics from component: {}. Using unregistered metrics as fallback.", e);
Arc::new(Self::new_unregistered())
}
}
}).clone()
}
/// Creates a new KvIndexerMetrics which is not registered with a MetricsRegistry.
/// This may be used for tests or as a fallback for when a MetricsRegistry is not available / has errored.
pub fn new_unregistered() -> Self {
Self {
kv_cache_events_applied: IntCounterVec::new(
Opts::new(
kvrouter::KV_CACHE_EVENTS_APPLIED,
"Total number of KV cache events applied to index",
),
&["event_type", "status"],
)
.unwrap(),
}
}
pub fn get_event_type(event_data: &KvCacheEventData) -> &'static str {
match event_data {
KvCacheEventData::Stored(_) => METRIC_EVENT_STORED,
KvCacheEventData::Removed(_) => METRIC_EVENT_REMOVED,
KvCacheEventData::Cleared => METRIC_EVENT_CLEARED,
}
}
pub fn increment_event_applied(
&self,
event_type: &'static str,
result: Result<(), KvCacheEventError>,
) {
match result {
Ok(_) => {
self.kv_cache_events_applied
.with_label_values(&[event_type, METRIC_STATUS_OK])
.inc_by(1);
}
Err(e) => {
let error_label = match e {
KvCacheEventError::ParentBlockNotFound => METRIC_STATUS_PARENT_NOT_FOUND,
KvCacheEventError::BlockNotFound => METRIC_STATUS_BLOCK_NOT_FOUND,
};
self.kv_cache_events_applied
.with_label_values(&[event_type, error_label])
.inc_by(1);
}
}
}
}
/// Scores representing the overlap of workers. /// Scores representing the overlap of workers.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverlapScores { pub struct OverlapScores {
...@@ -670,12 +779,14 @@ impl KvIndexer { ...@@ -670,12 +779,14 @@ impl KvIndexer {
token: CancellationToken, token: CancellationToken,
expiration_duration: Option<Duration>, expiration_duration: Option<Duration>,
kv_block_size: u32, kv_block_size: u32,
metrics: Arc<KvIndexerMetrics>,
) -> Self { ) -> Self {
let (event_tx, event_rx) = mpsc::channel::<RouterEvent>(2048); let (event_tx, event_rx) = mpsc::channel::<RouterEvent>(2048);
let (match_tx, match_rx) = mpsc::channel::<MatchRequest>(128); let (match_tx, match_rx) = mpsc::channel::<MatchRequest>(128);
let (remove_worker_tx, remove_worker_rx) = mpsc::channel::<WorkerId>(16); let (remove_worker_tx, remove_worker_rx) = mpsc::channel::<WorkerId>(16);
let (dump_tx, dump_rx) = mpsc::channel::<DumpRequest>(16); let (dump_tx, dump_rx) = mpsc::channel::<DumpRequest>(16);
let cancel_clone = token.clone(); let cancel_clone = token.clone();
let task = std::thread::spawn(move || { let task = std::thread::spawn(move || {
// create a new tokio runtime which will only perform work on a single thread // create a new tokio runtime which will only perform work on a single thread
let runtime = tokio::runtime::Builder::new_multi_thread() let runtime = tokio::runtime::Builder::new_multi_thread()
...@@ -718,7 +829,9 @@ impl KvIndexer { ...@@ -718,7 +829,9 @@ impl KvIndexer {
} }
Some(event) = event_rx.recv() => { Some(event) = event_rx.recv() => {
trie.apply_event(event); let event_type = KvIndexerMetrics::get_event_type(&event.event.data);
let result = trie.apply_event(event);
metrics.increment_event_applied(event_type, result);
} }
} }
} }
...@@ -748,8 +861,12 @@ impl KvIndexer { ...@@ -748,8 +861,12 @@ impl KvIndexer {
self.kv_block_size self.kv_block_size
} }
pub fn new(token: CancellationToken, kv_block_size: u32) -> Self { pub fn new(
Self::new_with_frequency(token, None, kv_block_size) token: CancellationToken,
kv_block_size: u32,
metrics: Arc<KvIndexerMetrics>,
) -> Self {
Self::new_with_frequency(token, None, kv_block_size, metrics)
} }
/// Get a sender for `RouterEvent`s. /// Get a sender for `RouterEvent`s.
...@@ -894,6 +1011,7 @@ impl KvIndexerSharded { ...@@ -894,6 +1011,7 @@ impl KvIndexerSharded {
num_shards: usize, num_shards: usize,
expiration_duration: Option<Duration>, expiration_duration: Option<Duration>,
kv_block_size: u32, kv_block_size: u32,
metrics: Arc<KvIndexerMetrics>,
) -> Self { ) -> Self {
let worker_assignments: HashMap<WorkerId, usize> = HashMap::new(); let worker_assignments: HashMap<WorkerId, usize> = HashMap::new();
let worker_counts: Vec<usize> = vec![0; num_shards]; let worker_counts: Vec<usize> = vec![0; num_shards];
...@@ -912,6 +1030,7 @@ impl KvIndexerSharded { ...@@ -912,6 +1030,7 @@ impl KvIndexerSharded {
let (shard_dump_tx, mut shard_dump_rx) = mpsc::channel::<DumpRequest>(16); // Add dump channel let (shard_dump_tx, mut shard_dump_rx) = mpsc::channel::<DumpRequest>(16); // Add dump channel
let mut shard_broadcast_rx = request_broadcast_tx.subscribe(); let mut shard_broadcast_rx = request_broadcast_tx.subscribe();
let cancel = token.clone(); let cancel = token.clone();
let metrics = metrics.clone();
event_tx.push(shard_event_tx); event_tx.push(shard_event_tx);
remove_worker_tx.push(shard_remove_worker_tx); remove_worker_tx.push(shard_remove_worker_tx);
...@@ -955,7 +1074,9 @@ impl KvIndexerSharded { ...@@ -955,7 +1074,9 @@ impl KvIndexerSharded {
} }
Some(event) = shard_event_rx.recv() => { Some(event) = shard_event_rx.recv() => {
trie.apply_event(event); let event_type = KvIndexerMetrics::get_event_type(&event.event.data);
let result = trie.apply_event(event);
metrics.increment_event_applied(event_type, result);
} }
} }
} }
...@@ -985,8 +1106,13 @@ impl KvIndexerSharded { ...@@ -985,8 +1106,13 @@ impl KvIndexerSharded {
self.kv_block_size self.kv_block_size
} }
pub fn new(token: CancellationToken, num_shards: usize, kv_block_size: u32) -> Self { pub fn new(
Self::new_with_frequency(token, num_shards, None, kv_block_size) token: CancellationToken,
num_shards: usize,
kv_block_size: u32,
metrics: Arc<KvIndexerMetrics>,
) -> Self {
Self::new_with_frequency(token, num_shards, None, kv_block_size, metrics)
} }
} }
...@@ -1187,7 +1313,8 @@ mod tests { ...@@ -1187,7 +1313,8 @@ mod tests {
let worker_1 = 0; let worker_1 = 0;
let worker_2 = 1; let worker_2 = 1;
trie.apply_event(create_store_event(worker_1, 1, vec![1, 2, 3], None)); trie.apply_event(create_store_event(worker_1, 1, vec![1, 2, 3], None))
.unwrap();
let scores = trie.find_matches( let scores = trie.find_matches(
vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)], vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)],
...@@ -1222,7 +1349,8 @@ mod tests { ...@@ -1222,7 +1349,8 @@ mod tests {
1 1
); );
trie.apply_event(create_store_event(worker_2, 1, vec![1, 4, 5], None)); trie.apply_event(create_store_event(worker_2, 1, vec![1, 4, 5], None))
.unwrap();
let scores = trie.find_matches( let scores = trie.find_matches(
vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)], vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)],
...@@ -1259,7 +1387,8 @@ mod tests { ...@@ -1259,7 +1387,8 @@ mod tests {
2 2
); );
trie.apply_event(create_remove_event(worker_2, 2, vec![5])); trie.apply_event(create_remove_event(worker_2, 2, vec![5]))
.unwrap();
assert_eq!(trie.lookup.len(), 2); assert_eq!(trie.lookup.len(), 2);
assert_eq!(trie.lookup.get(&worker_1).unwrap().len(), 3); assert_eq!(trie.lookup.get(&worker_1).unwrap().len(), 3);
assert_eq!(trie.lookup.get(&worker_2).unwrap().len(), 2); assert_eq!(trie.lookup.get(&worker_2).unwrap().len(), 2);
...@@ -1288,7 +1417,8 @@ mod tests { ...@@ -1288,7 +1417,8 @@ mod tests {
2 2
); );
trie.apply_event(create_remove_event(worker_2, 3, vec![4])); trie.apply_event(create_remove_event(worker_2, 3, vec![4]))
.unwrap();
assert_eq!(trie.lookup.len(), 2); assert_eq!(trie.lookup.len(), 2);
assert_eq!(trie.lookup.get(&worker_1).unwrap().len(), 3); assert_eq!(trie.lookup.get(&worker_1).unwrap().len(), 3);
...@@ -1323,7 +1453,8 @@ mod tests { ...@@ -1323,7 +1453,8 @@ mod tests {
4, 4,
vec![2, 6, 7], vec![2, 6, 7],
Some(ExternalSequenceBlockHash(100)), Some(ExternalSequenceBlockHash(100)),
)); ))
.unwrap();
let scores = trie.find_matches( let scores = trie.find_matches(
vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)], vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)],
...@@ -1383,6 +1514,33 @@ mod tests { ...@@ -1383,6 +1514,33 @@ mod tests {
); );
} }
#[test]
fn test_radix_tree_apply_event_errors() {
let mut trie = RadixTree::new();
let worker_0 = 0;
// Parent block not found
let result = trie.apply_event(create_store_event(
worker_0,
0,
vec![1, 2, 3],
Some(ExternalSequenceBlockHash(12345)),
));
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
KvCacheEventError::ParentBlockNotFound
));
// Block not found for remove event.
let result = trie.apply_event(create_remove_event(worker_0, 0, vec![1, 2, 3]));
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
KvCacheEventError::BlockNotFound
));
}
#[test] #[test]
fn test_remove_worker() { fn test_remove_worker() {
setup(); setup();
...@@ -1397,8 +1555,10 @@ mod tests { ...@@ -1397,8 +1555,10 @@ mod tests {
.is_empty() .is_empty()
); );
trie.apply_event(create_store_event(worker_0, 0, vec![0], None)); trie.apply_event(create_store_event(worker_0, 0, vec![0], None))
trie.apply_event(create_store_event(worker_1, 0, vec![0], None)); .unwrap();
trie.apply_event(create_store_event(worker_1, 0, vec![0], None))
.unwrap();
let result = trie.find_matches(vec![LocalBlockHash(0)], false).scores; let result = trie.find_matches(vec![LocalBlockHash(0)], false).scores;
assert!(result.len() == 2 && result[&worker_0] == 1 && result[&worker_1] == 1); assert!(result.len() == 2 && result[&worker_0] == 1 && result[&worker_1] == 1);
...@@ -1427,8 +1587,10 @@ mod tests { ...@@ -1427,8 +1587,10 @@ mod tests {
assert!(!trie.lookup.contains_key(&worker_0)); assert!(!trie.lookup.contains_key(&worker_0));
// Test clearing a worker with shared blocks // Test clearing a worker with shared blocks
trie.apply_event(create_store_event(worker_0, 0, vec![0, 1, 3], None)); trie.apply_event(create_store_event(worker_0, 0, vec![0, 1, 3], None))
trie.apply_event(create_store_event(worker_1, 0, vec![0, 2, 3], None)); .unwrap();
trie.apply_event(create_store_event(worker_1, 0, vec![0, 2, 3], None))
.unwrap();
let result = trie.find_matches(vec![LocalBlockHash(0)], false).scores; let result = trie.find_matches(vec![LocalBlockHash(0)], false).scores;
assert!(result.len() == 2 && result[&worker_0] == 1 && result[&worker_1] == 1); assert!(result.len() == 2 && result[&worker_0] == 1 && result[&worker_1] == 1);
...@@ -1452,7 +1614,8 @@ mod tests { ...@@ -1452,7 +1614,8 @@ mod tests {
assert_eq!(result[&worker_1], 1); assert_eq!(result[&worker_1], 1);
// Test re-adding blocks after clearing worker // Test re-adding blocks after clearing worker
trie.apply_event(create_store_event(worker_0, 0, vec![4, 5], None)); trie.apply_event(create_store_event(worker_0, 0, vec![4, 5], None))
.unwrap();
let result = trie let result = trie
.find_matches(vec![LocalBlockHash(4), LocalBlockHash(5)], false) .find_matches(vec![LocalBlockHash(4), LocalBlockHash(5)], false)
.scores; .scores;
...@@ -1472,8 +1635,10 @@ mod tests { ...@@ -1472,8 +1635,10 @@ mod tests {
assert!(trie.lookup.get(&worker_1).unwrap().is_empty()); assert!(trie.lookup.get(&worker_1).unwrap().is_empty());
// Test clearing a worker that has been removed // Test clearing a worker that has been removed
trie.apply_event(create_store_event(worker_0, 0, vec![6], None)); trie.apply_event(create_store_event(worker_0, 0, vec![6], None))
trie.apply_event(create_store_event(worker_1, 0, vec![6], None)); .unwrap();
trie.apply_event(create_store_event(worker_1, 0, vec![6], None))
.unwrap();
trie.remove_worker(worker_0); trie.remove_worker(worker_0);
trie.clear_all_blocks(worker_0); trie.clear_all_blocks(worker_0);
assert!(!trie.lookup.contains_key(&worker_0)); assert!(!trie.lookup.contains_key(&worker_0));
...@@ -1500,8 +1665,10 @@ mod tests { ...@@ -1500,8 +1665,10 @@ mod tests {
let worker_0 = 0; let worker_0 = 0;
let worker_1 = 1; let worker_1 = 1;
trie.apply_event(create_store_event(worker_0, 0, vec![0, 1, 2], None)); trie.apply_event(create_store_event(worker_0, 0, vec![0, 1, 2], None))
trie.apply_event(create_store_event(worker_1, 0, vec![0], None)); .unwrap();
trie.apply_event(create_store_event(worker_1, 0, vec![0], None))
.unwrap();
let result = trie let result = trie
.find_matches( .find_matches(
...@@ -1545,13 +1712,15 @@ mod tests { ...@@ -1545,13 +1712,15 @@ mod tests {
num_shards: usize, num_shards: usize,
kv_block_size: u32, kv_block_size: u32,
) -> Box<dyn KvIndexerInterface> { ) -> Box<dyn KvIndexerInterface> {
let metrics = KvIndexerMetrics::new_unregistered();
if num_shards == 1 { if num_shards == 1 {
Box::new(KvIndexer::new(token.clone(), kv_block_size)) Box::new(KvIndexer::new(token.clone(), kv_block_size, metrics.into()))
} else { } else {
Box::new(KvIndexerSharded::new( Box::new(KvIndexerSharded::new(
token.clone(), token.clone(),
num_shards, num_shards,
kv_block_size, kv_block_size,
metrics.into(),
)) ))
} }
} }
...@@ -1632,12 +1801,14 @@ mod tests { ...@@ -1632,12 +1801,14 @@ mod tests {
let mut kv_indexer: Box<dyn KvIndexerInterface>; let mut kv_indexer: Box<dyn KvIndexerInterface>;
let token = CancellationToken::new(); let token = CancellationToken::new();
let expiration = Duration::from_millis(50); let expiration = Duration::from_millis(50);
let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
if num_shards == 1 { if num_shards == 1 {
kv_indexer = Box::new(KvIndexer::new_with_frequency( kv_indexer = Box::new(KvIndexer::new_with_frequency(
token, token,
Some(expiration), Some(expiration),
kv_block_size, kv_block_size,
metrics,
)); ));
} else { } else {
kv_indexer = Box::new(KvIndexerSharded::new_with_frequency( kv_indexer = Box::new(KvIndexerSharded::new_with_frequency(
...@@ -1645,6 +1816,7 @@ mod tests { ...@@ -1645,6 +1816,7 @@ mod tests {
num_shards, num_shards,
Some(expiration), Some(expiration),
kv_block_size, kv_block_size,
metrics,
)); ));
} }
...@@ -1777,10 +1949,12 @@ mod tests { ...@@ -1777,10 +1949,12 @@ mod tests {
// Configuration // Configuration
let kv_block_size = 32; let kv_block_size = 32;
let num_shards = 2; let num_shards = 2;
let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
// Build a non-trivial indexer with events // Build a non-trivial indexer with events
let token1 = CancellationToken::new(); let token1 = CancellationToken::new();
let mut original_indexer = KvIndexerSharded::new(token1.clone(), num_shards, kv_block_size); let mut original_indexer =
KvIndexerSharded::new(token1.clone(), num_shards, kv_block_size, metrics.clone());
let worker_0 = 0; let worker_0 = 0;
let worker_1 = 1; let worker_1 = 1;
...@@ -1826,7 +2000,7 @@ mod tests { ...@@ -1826,7 +2000,7 @@ mod tests {
// Create a new indexer and apply all dumped events // Create a new indexer and apply all dumped events
let token2 = CancellationToken::new(); let token2 = CancellationToken::new();
let mut reconstructed_indexer = let mut reconstructed_indexer =
KvIndexerSharded::new(token2.clone(), num_shards, kv_block_size); KvIndexerSharded::new(token2.clone(), num_shards, kv_block_size, metrics);
for event in &dump1 { for event in &dump1 {
reconstructed_indexer.apply_event(event.clone()).await; reconstructed_indexer.apply_event(event.clone()).await;
...@@ -1942,4 +2116,49 @@ mod tests { ...@@ -1942,4 +2116,49 @@ mod tests {
original_indexer.shutdown(); original_indexer.shutdown();
reconstructed_indexer.shutdown(); reconstructed_indexer.shutdown();
} }
#[test]
fn test_increment_event_applied() {
let metrics = KvIndexerMetrics::new_unregistered();
metrics.increment_event_applied(METRIC_EVENT_STORED, Ok(()));
assert_eq!(
metrics
.kv_cache_events_applied
.get_metric_with_label_values(&[METRIC_EVENT_STORED, METRIC_STATUS_OK])
.unwrap()
.get(),
1
);
metrics.increment_event_applied(
METRIC_EVENT_STORED,
Err(KvCacheEventError::ParentBlockNotFound),
);
assert_eq!(
metrics
.kv_cache_events_applied
.get_metric_with_label_values(&[
METRIC_EVENT_STORED,
METRIC_STATUS_PARENT_NOT_FOUND
])
.unwrap()
.get(),
1
);
metrics
.increment_event_applied(METRIC_EVENT_REMOVED, Err(KvCacheEventError::BlockNotFound));
assert_eq!(
metrics
.kv_cache_events_applied
.get_metric_with_label_values(&[
METRIC_EVENT_REMOVED,
METRIC_STATUS_BLOCK_NOT_FOUND
])
.unwrap()
.get(),
1
);
}
} }
...@@ -23,6 +23,7 @@ pub type KvRecorder = Recorder<RouterEvent>; ...@@ -23,6 +23,7 @@ pub type KvRecorder = Recorder<RouterEvent>;
mod tests { mod tests {
use super::*; use super::*;
use crate::kv_router::indexer::KvIndexer; use crate::kv_router::indexer::KvIndexer;
use crate::kv_router::indexer::KvIndexerMetrics;
use crate::kv_router::indexer::WorkerId; use crate::kv_router::indexer::WorkerId;
use crate::kv_router::protocols::*; use crate::kv_router::protocols::*;
use std::time::Duration; use std::time::Duration;
...@@ -128,7 +129,12 @@ mod tests { ...@@ -128,7 +129,12 @@ mod tests {
// Part 2: Now create a KvIndexer and load the events from the file // Part 2: Now create a KvIndexer and load the events from the file
let indexer_token = CancellationToken::new(); let indexer_token = CancellationToken::new();
let kv_block_size = 32; // Default block size for testing let kv_block_size = 32; // Default block size for testing
let indexer = KvIndexer::new(indexer_token.clone(), kv_block_size); let kv_indexer_metrics = KvIndexerMetrics::new_unregistered();
let indexer = KvIndexer::new(
indexer_token.clone(),
kv_block_size,
kv_indexer_metrics.into(),
);
let indexer_event_tx = indexer.event_sender(); let indexer_event_tx = indexer.event_sender();
// Use the send_events method to load events from file to indexer // Use the send_events method to load events from file to indexer
......
...@@ -279,6 +279,12 @@ pub const KVSTATS_METRICS: &[&str] = &[ ...@@ -279,6 +279,12 @@ pub const KVSTATS_METRICS: &[&str] = &[
kvstats::GPU_PREFIX_CACHE_HIT_RATE, kvstats::GPU_PREFIX_CACHE_HIT_RATE,
]; ];
// KvRouter (including KvInexer) Prometheus metric names
pub mod kvrouter {
/// Number of KV cache events applied to the index (including status)
pub const KV_CACHE_EVENTS_APPLIED: &str = "kv_cache_events_applied";
}
// Shared regex patterns for Prometheus sanitization // Shared regex patterns for Prometheus sanitization
static METRIC_INVALID_CHARS_PATTERN: Lazy<Regex> = static METRIC_INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap()); Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment