"...git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "4df841fe7538cb8de281b9d78e37ba51ac35b5da"
Unverified Commit d0d9c030 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

fix(kv-router): expose kv_cache_events_applied in multithreaded indexer (#8088)


Signed-off-by: default avatartmontfort <tmontfort@nvidia.com>
Signed-off-by: default avatarThomas Montfort <tmontfort@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 71af9675
......@@ -32,7 +32,7 @@ use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{SyncIndexer, WorkerTask};
use super::{EventKind, KvIndexerMetrics, SyncIndexer, WorkerTask};
use crate::protocols::*;
/// Thread-safe shared reference to a Block.
......@@ -624,14 +624,24 @@ impl ConcurrentRadixTree {
// ============================================================================
impl SyncIndexer for ConcurrentRadixTree {
fn worker(&self, event_receiver: flume::Receiver<WorkerTask>) -> anyhow::Result<()> {
fn worker(
&self,
event_receiver: flume::Receiver<WorkerTask>,
metrics: Option<Arc<KvIndexerMetrics>>,
) -> anyhow::Result<()> {
let mut lookup = FxHashMap::default();
let counters = metrics.as_ref().map(|m| m.prebind());
while let Ok(task) = event_receiver.recv() {
match task {
WorkerTask::Event(event) => {
if let Err(e) = self.apply_event(&mut lookup, event) {
tracing::warn!("Failed to apply event: {:?}", e);
let kind = EventKind::of(&event.event.data);
let result = self.apply_event(&mut lookup, event);
if result.is_err() {
tracing::warn!("Failed to apply event: {:?}", result.as_ref().err());
}
if let Some(ref c) = counters {
c.inc(kind, result);
}
}
WorkerTask::RemoveWorker(worker_id) => {
......
......@@ -67,7 +67,7 @@ use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{SyncIndexer, WorkerTask};
use super::{EventKind, KvIndexerMetrics, SyncIndexer, WorkerTask};
use crate::protocols::*;
macro_rules! read_lock {
......@@ -1197,14 +1197,24 @@ impl ConcurrentRadixTreeCompressed {
// ============================================================================
impl SyncIndexer for ConcurrentRadixTreeCompressed {
fn worker(&self, event_receiver: flume::Receiver<WorkerTask>) -> anyhow::Result<()> {
fn worker(
&self,
event_receiver: flume::Receiver<WorkerTask>,
metrics: Option<Arc<KvIndexerMetrics>>,
) -> anyhow::Result<()> {
let mut lookup = FxHashMap::default();
let counters = metrics.as_ref().map(|m| m.prebind());
while let Ok(task) = event_receiver.recv() {
match task {
WorkerTask::Event(event) => {
if let Err(e) = self.apply_event(&mut lookup, event) {
tracing::warn!("Failed to apply event: {:?}", e);
let kind = EventKind::of(&event.event.data);
let result = self.apply_event(&mut lookup, event);
if result.is_err() {
tracing::warn!("Failed to apply event: {:?}", result.as_ref().err());
}
if let Some(ref c) = counters {
c.inc(kind, result);
}
}
WorkerTask::RemoveWorker(worker_id) => {
......
......@@ -11,8 +11,8 @@ use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use super::{
DumpRequest, GetWorkersRequest, KvIndexerInterface, KvIndexerMetrics, KvRouterError,
MatchRequest, RadixTree, RoutingDecisionRequest,
DumpRequest, EventKind, GetWorkersRequest, KvIndexerInterface, KvIndexerMetrics, KvRouterError,
MatchRequest, PreBoundEventCounters, RadixTree, RoutingDecisionRequest,
};
use crate::indexer::pruning::{BlockEntry, PruneConfig, PruneManager};
use crate::protocols::*;
......@@ -41,11 +41,11 @@ fn stored_block_entries(event: &RouterEvent) -> Option<Vec<BlockEntry>> {
fn apply_event_with_prune_tracking(
trie: &mut RadixTree,
event: RouterEvent,
metrics: &KvIndexerMetrics,
counters: &PreBoundEventCounters,
prune_manager: &mut Option<PruneManager<BlockEntry>>,
prune_tx: &mpsc::Sender<()>,
) {
let event_type = KvIndexerMetrics::get_event_type(&event.event.data);
let kind = EventKind::of(&event.event.data);
let event_id = event.event.event_id;
let worker_id = event.worker_id;
let event_for_prune = prune_manager.is_some().then(|| event.clone());
......@@ -53,9 +53,9 @@ fn apply_event_with_prune_tracking(
let result_is_ok = result.is_ok();
let tree_size = trie.current_size();
tracing::trace!(
"Applied KV event to global radix tree: event_type={event_type}, event_id={event_id}, worker_id={worker_id}, success={result_is_ok}, global_radix_tree_size={tree_size}"
"Applied KV event to global radix tree: event_type={kind}, event_id={event_id}, worker_id={worker_id}, success={result_is_ok}, global_radix_tree_size={tree_size}"
);
metrics.increment_event_applied(event_type, result);
counters.inc(kind, result);
let Some(pm) = prune_manager.as_mut() else {
return;
......@@ -166,6 +166,7 @@ impl KvIndexer {
PruneManager::<BlockEntry>::new(50, config)
});
let mut event_id_counter = 0u64;
let counters = metrics.prebind();
loop {
// Create a future that sleeps until the next expiration time
......@@ -222,7 +223,7 @@ impl KvIndexer {
apply_event_with_prune_tracking(
&mut trie,
event,
&metrics,
&counters,
&mut prune_manager,
&prune_tx,
);
......@@ -234,7 +235,7 @@ impl KvIndexer {
apply_event_with_prune_tracking(
&mut trie,
event,
&metrics,
&counters,
&mut prune_manager,
&prune_tx,
);
......
......@@ -15,6 +15,42 @@ use prometheus::{IntCounterVec, Opts};
use crate::protocols::{KvCacheEventData, KvCacheEventError};
/// Lightweight, `Copy` discriminant for [`KvCacheEventData`].
///
/// Extracted before the event is moved into `apply_event()`, then passed to
/// [`PreBoundEventCounters::inc`] so the compiler enforces exhaustiveness
/// without requiring a clone of the full event payload.
///
/// `Display` produces the Prometheus label value (`"stored"`, `"removed"`,
/// `"cleared"`), so this enum is also the single source of truth for the
/// `event_type` label — replacing the former `get_event_type()` helper.
#[derive(Debug, Clone, Copy)]
pub enum EventKind {
Stored,
Removed,
Cleared,
}
impl EventKind {
pub fn of(data: &KvCacheEventData) -> Self {
match data {
KvCacheEventData::Stored(_) => Self::Stored,
KvCacheEventData::Removed(_) => Self::Removed,
KvCacheEventData::Cleared => Self::Cleared,
}
}
}
impl std::fmt::Display for EventKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Stored => f.write_str(METRIC_EVENT_STORED),
Self::Removed => f.write_str(METRIC_EVENT_REMOVED),
Self::Cleared => f.write_str(METRIC_EVENT_CLEARED),
}
}
}
/// Metrics for the KV Indexer.
#[derive(Clone)]
#[cfg_attr(not(feature = "metrics"), derive(Default))]
......@@ -107,14 +143,6 @@ impl KvIndexerMetrics {
Self::default()
}
pub fn get_event_type(event_data: &KvCacheEventData) -> &'static str {
match event_data {
KvCacheEventData::Stored(_) => METRIC_EVENT_STORED,
KvCacheEventData::Removed(_) => METRIC_EVENT_REMOVED,
KvCacheEventData::Cleared => METRIC_EVENT_CLEARED,
}
}
pub fn increment_event_applied(
&self,
event_type: &'static str,
......@@ -143,4 +171,127 @@ impl KvIndexerMetrics {
#[cfg(not(feature = "metrics"))]
let _ = (self, event_type, result);
}
/// Pre-resolve all `IntCounter` handles for the finite (event_type, status) label space.
/// Call this once per worker thread at startup, then use
/// [`PreBoundEventCounters::inc`] in the hot loop to avoid the
/// `with_label_values` hashmap lookup on every event.
pub fn prebind(&self) -> PreBoundEventCounters {
PreBoundEventCounters::new(self)
}
}
/// Pre-resolved `IntCounter` handles for every (event_type, status) combination.
///
/// Created once per worker thread via [`KvIndexerMetrics::prebind`], then used in
/// the event processing loop with a direct `.inc()` call instead of the
/// `IntCounterVec::with_label_values()` hashmap lookup.
pub struct PreBoundEventCounters {
#[cfg(feature = "metrics")]
stored_ok: prometheus::IntCounter,
#[cfg(feature = "metrics")]
stored_parent_not_found: prometheus::IntCounter,
#[cfg(feature = "metrics")]
stored_block_not_found: prometheus::IntCounter,
#[cfg(feature = "metrics")]
stored_invalid_block: prometheus::IntCounter,
#[cfg(feature = "metrics")]
removed_ok: prometheus::IntCounter,
#[cfg(feature = "metrics")]
removed_parent_not_found: prometheus::IntCounter,
#[cfg(feature = "metrics")]
removed_block_not_found: prometheus::IntCounter,
#[cfg(feature = "metrics")]
removed_invalid_block: prometheus::IntCounter,
#[cfg(feature = "metrics")]
cleared_ok: prometheus::IntCounter,
#[cfg(feature = "metrics")]
cleared_parent_not_found: prometheus::IntCounter,
#[cfg(feature = "metrics")]
cleared_block_not_found: prometheus::IntCounter,
#[cfg(feature = "metrics")]
cleared_invalid_block: prometheus::IntCounter,
}
impl PreBoundEventCounters {
fn new(metrics: &KvIndexerMetrics) -> Self {
#[cfg(feature = "metrics")]
{
let cv = &metrics.kv_cache_events_applied;
Self {
stored_ok: cv.with_label_values(&[METRIC_EVENT_STORED, METRIC_STATUS_OK]),
stored_parent_not_found: cv
.with_label_values(&[METRIC_EVENT_STORED, METRIC_STATUS_PARENT_NOT_FOUND]),
stored_block_not_found: cv
.with_label_values(&[METRIC_EVENT_STORED, METRIC_STATUS_BLOCK_NOT_FOUND]),
stored_invalid_block: cv
.with_label_values(&[METRIC_EVENT_STORED, METRIC_STATUS_INVALID_BLOCK]),
removed_ok: cv.with_label_values(&[METRIC_EVENT_REMOVED, METRIC_STATUS_OK]),
removed_parent_not_found: cv
.with_label_values(&[METRIC_EVENT_REMOVED, METRIC_STATUS_PARENT_NOT_FOUND]),
removed_block_not_found: cv
.with_label_values(&[METRIC_EVENT_REMOVED, METRIC_STATUS_BLOCK_NOT_FOUND]),
removed_invalid_block: cv
.with_label_values(&[METRIC_EVENT_REMOVED, METRIC_STATUS_INVALID_BLOCK]),
cleared_ok: cv.with_label_values(&[METRIC_EVENT_CLEARED, METRIC_STATUS_OK]),
cleared_parent_not_found: cv
.with_label_values(&[METRIC_EVENT_CLEARED, METRIC_STATUS_PARENT_NOT_FOUND]),
cleared_block_not_found: cv
.with_label_values(&[METRIC_EVENT_CLEARED, METRIC_STATUS_BLOCK_NOT_FOUND]),
cleared_invalid_block: cv
.with_label_values(&[METRIC_EVENT_CLEARED, METRIC_STATUS_INVALID_BLOCK]),
}
}
#[cfg(not(feature = "metrics"))]
{
let _ = metrics;
Self {}
}
}
/// Increment the pre-resolved counter for the given event kind and result.
///
/// Takes [`EventKind`] (a `Copy` discriminant) instead of a string label,
/// so the compiler enforces exhaustiveness — a new [`EventKind`] or
/// [`KvCacheEventError`] variant will produce a compile error here.
pub fn inc(&self, kind: EventKind, result: Result<(), KvCacheEventError>) {
#[cfg(feature = "metrics")]
{
let counter = match (kind, result) {
(EventKind::Stored, Ok(())) => &self.stored_ok,
(EventKind::Stored, Err(KvCacheEventError::ParentBlockNotFound)) => {
&self.stored_parent_not_found
}
(EventKind::Stored, Err(KvCacheEventError::BlockNotFound)) => {
&self.stored_block_not_found
}
(EventKind::Stored, Err(KvCacheEventError::InvalidBlockSequence)) => {
&self.stored_invalid_block
}
(EventKind::Removed, Ok(())) => &self.removed_ok,
(EventKind::Removed, Err(KvCacheEventError::ParentBlockNotFound)) => {
&self.removed_parent_not_found
}
(EventKind::Removed, Err(KvCacheEventError::BlockNotFound)) => {
&self.removed_block_not_found
}
(EventKind::Removed, Err(KvCacheEventError::InvalidBlockSequence)) => {
&self.removed_invalid_block
}
(EventKind::Cleared, Ok(())) => &self.cleared_ok,
(EventKind::Cleared, Err(KvCacheEventError::ParentBlockNotFound)) => {
&self.cleared_parent_not_found
}
(EventKind::Cleared, Err(KvCacheEventError::BlockNotFound)) => {
&self.cleared_block_not_found
}
(EventKind::Cleared, Err(KvCacheEventError::InvalidBlockSequence)) => {
&self.cleared_invalid_block
}
};
counter.inc();
}
#[cfg(not(feature = "metrics"))]
let _ = (self, kind, result);
}
}
......@@ -22,9 +22,10 @@
//! in a `ThreadPoolIndexer`.
use dashmap::DashMap;
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{SyncIndexer, WorkerTask};
use super::{EventKind, KvIndexerMetrics, SyncIndexer, WorkerTask};
use crate::protocols::{
DpRank, ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheEventError,
KvCacheStoreData, KvCacheStoredBlockData, LocalBlockHash, OverlapScores, RouterEvent, WorkerId,
......@@ -138,14 +139,24 @@ impl PositionalIndexer {
// ============================================================================
impl SyncIndexer for PositionalIndexer {
fn worker(&self, event_receiver: flume::Receiver<WorkerTask>) -> anyhow::Result<()> {
fn worker(
&self,
event_receiver: flume::Receiver<WorkerTask>,
metrics: Option<Arc<KvIndexerMetrics>>,
) -> anyhow::Result<()> {
let mut worker_blocks = FxHashMap::default();
let counters = metrics.as_ref().map(|m| m.prebind());
while let Ok(task) = event_receiver.recv() {
match task {
WorkerTask::Event(event) => {
if let Err(e) = self.apply_event(&mut worker_blocks, event) {
tracing::warn!("Failed to apply event: {:?}", e);
let kind = EventKind::of(&event.event.data);
let result = self.apply_event(&mut worker_blocks, event);
if result.is_err() {
tracing::warn!("Failed to apply event: {:?}", result.as_ref().err());
}
if let Some(ref c) = counters {
c.inc(kind, result);
}
}
WorkerTask::RemoveWorker(worker_id) => {
......
......@@ -12,7 +12,7 @@ use dashmap::DashMap;
use rustc_hash::FxBuildHasher;
use tokio::sync::oneshot;
use super::{KvIndexerInterface, KvRouterError, SyncIndexer, WorkerTask};
use super::{KvIndexerInterface, KvIndexerMetrics, KvRouterError, SyncIndexer, WorkerTask};
use crate::protocols::*;
/// Generic wrapper that provides [`KvIndexerInterface`] for any [`SyncIndexer`] backend.
......@@ -73,6 +73,31 @@ impl<T: SyncIndexer> ThreadPoolIndexer<T> {
///
/// Panics if `num_workers` is 0.
pub fn new(backend: T, num_workers: usize, kv_block_size: u32) -> Self {
Self::new_with_metrics(backend, num_workers, kv_block_size, None)
}
/// Create a new `ThreadPoolIndexer` with optional metrics.
///
/// Same as [`new`](Self::new) but allows passing `KvIndexerMetrics` so that
/// each worker thread records `kv_cache_events_applied` counters, matching
/// the observability of the single-threaded `KvIndexer` path.
///
/// # Arguments
///
/// * `backend` - The thread-safe data structure to wrap
/// * `num_workers` - Number of worker threads for event processing
/// * `kv_block_size` - Block size for KV cache
/// * `metrics` - Optional metrics to record event application counts
///
/// # Panics
///
/// Panics if `num_workers` is 0.
pub fn new_with_metrics(
backend: T,
num_workers: usize,
kv_block_size: u32,
metrics: Option<Arc<KvIndexerMetrics>>,
) -> Self {
assert!(num_workers > 0, "Number of workers must be greater than 0");
let backend = Arc::new(backend);
......@@ -83,9 +108,10 @@ impl<T: SyncIndexer> ThreadPoolIndexer<T> {
worker_event_senders.push(event_sender);
let backend = Arc::clone(&backend);
let metrics = metrics.clone();
let handle = std::thread::spawn(move || {
backend.worker(event_receiver).unwrap();
backend.worker(event_receiver, metrics).unwrap();
});
thread_handles.push(handle);
}
......
......@@ -3,7 +3,9 @@
use async_trait::async_trait;
use super::{KvRouterError, WorkerTask};
use std::sync::Arc;
use super::{KvIndexerMetrics, KvRouterError, WorkerTask};
use crate::protocols::*;
#[async_trait]
......@@ -107,7 +109,11 @@ pub trait KvIndexerInterface {
/// - Sticky event routing to N worker threads
/// - Inline reads on the caller's thread (no channel dispatch for find_matches)
pub trait SyncIndexer: Send + Sync + 'static {
fn worker(&self, event_receiver: flume::Receiver<WorkerTask>) -> anyhow::Result<()>;
fn worker(
&self,
event_receiver: flume::Receiver<WorkerTask>,
metrics: Option<Arc<KvIndexerMetrics>>,
) -> anyhow::Result<()>;
/// Find matches for a sequence of block hashes.
fn find_matches(&self, sequence: &[LocalBlockHash], early_exit: bool) -> OverlapScores;
......
......@@ -83,11 +83,15 @@ impl Indexer {
}
if kv_router_config.router_event_threads > 1 {
return Ok(Self::Concurrent(Arc::new(ThreadPoolIndexer::new(
ConcurrentRadixTreeCompressed::new(),
kv_router_config.router_event_threads as usize,
block_size,
))));
let kv_indexer_metrics = KvIndexerMetrics::from_component(component);
return Ok(Self::Concurrent(Arc::new(
ThreadPoolIndexer::new_with_metrics(
ConcurrentRadixTreeCompressed::new(),
kv_router_config.router_event_threads as usize,
block_size,
Some(kv_indexer_metrics),
),
)));
}
let kv_indexer_metrics = KvIndexerMetrics::from_component(component);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment