Commit b94cb543 (unverified), authored by Yan Ru Pei, committed by GitHub

chore(kv-router): split monolithic indexer.rs into indexer/ directory (#6870)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
parent 2ae9c290
@@ -204,7 +204,7 @@ The main KV-aware routing arguments (frontend uses the same `--router-*` flag na
- `--router-prune-target-ratio`: Target size ratio to prune down to when `--router-max-tree-size` is exceeded. For example, with a value of 0.8 (default) and max tree size of 1048576, the router will prune down to approximately 838860 blocks when the threshold is exceeded. Defaults to 0.8 when `--no-router-kv-events` is used. This creates headroom before the next pruning cycle.
-- `--router-event-threads`: Number of event processing threads for the KV indexer (default: 4). When set to 1, the router uses a single-threaded radix tree with channel-based event processing. When set to a value greater than 1 (the default), the router uses a concurrent radix tree with a thread pool of the specified size for higher event throughput. This setting only applies when KV events are enabled (the default). When `--no-router-kv-events` is set (approximate mode), the router always uses a single-threaded indexer with TTL-based expiration and pruning regardless of this setting. Can be set via `DYN_ROUTER_EVENT_THREADS` env var. For details on the underlying index data structures (`RadixTree`, `ConcurrentRadixTree`, `PositionalIndexer`) and their concurrency model (inline reads, sticky-routed writes via thread pool), see the [KV Router Index documentation](../../../lib/kv-router/README.md).
+- `--router-event-threads`: Number of event processing threads for the KV indexer (default: 4). When set to 1, the router uses a single-threaded radix tree with channel-based event processing. When set to a value greater than 1 (the default), the router uses a concurrent radix tree with a thread pool of the specified size for higher event throughput. This setting only applies when KV events are enabled (the default). When `--no-router-kv-events` is set (approximate mode), the router always uses a single-threaded indexer with TTL-based expiration and pruning regardless of this setting. Can be set via `DYN_ROUTER_EVENT_THREADS` env var. For details on the underlying index data structures (`RadixTree`, `ConcurrentRadixTree`, `PositionalIndexer`) and their concurrency model (inline reads, sticky-routed writes via thread pool), see the [KV Router Index documentation](../../../lib/kv-router/src/indexer/README.md).
To implement KV event publishing for custom inference engines, enabling them to participate in Dynamo's KV cache-aware routing, see [KV Event Publishing for Custom Engines](../../integrations/kv-events-custom-engines.md).
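The pruning arithmetic described for `--router-prune-target-ratio` can be checked with a short sketch. The helper name `prune_target` and the choice to truncate the fractional block are assumptions made for illustration; the router's actual rounding is not shown in this diff.

```rust
/// Hypothetical helper (not part of the router's API): the number of blocks
/// the tree would be pruned down to once the size threshold is exceeded.
fn prune_target(max_tree_size: usize, target_ratio: f64) -> usize {
    // Assumption: the fractional part is truncated toward zero.
    (max_tree_size as f64 * target_ratio) as usize
}

fn main() {
    let max_tree_size = 1_048_576; // value from the documentation example
    let target = prune_target(max_tree_size, 0.8);
    let headroom = max_tree_size - target;
    println!("prune down to {target} blocks, leaving headroom for {headroom} more");
}
```

With the documented values this yields 838860 blocks, which matches the "approximately 838860" figure in the flag description.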
@@ -389,7 +389,7 @@ The cli args `--router-ttl-secs`, `--router-max-tree-size`, and `--router-prune-
- **[Router README](README.md)**: Quick start guide for the KV Router
- **[Router Examples](router-examples.md)**: Python API usage, K8s examples, and custom routing patterns
-- **[KV Router Index Data Structures](../../../lib/kv-router/README.md)**: `RadixTree`, `ConcurrentRadixTree`, and `PositionalIndexer` internals and concurrency model
+- **[KV Router Index Data Structures](../../../lib/kv-router/src/indexer/README.md)**: `RadixTree`, `ConcurrentRadixTree`, and `PositionalIndexer` internals and concurrency model
- **[Router Design](../../design-docs/router-design.md)**: Architecture details and event transport modes
- **[KV Event Publishing for Custom Engines](../../integrations/kv-events-custom-engines.md)**: Integrate custom inference engines with KV-aware routing
- **[Prometheus and Grafana Setup](../../observability/prometheus-grafana.md)**: General Prometheus/Grafana configuration
......
@@ -819,7 +819,7 @@ async fn run_microbench_mode(args: MicrobenchArgs) {
// ============================================================================
/// Result of a single request during stress test
-#[allow(dead_code)]
+#[expect(dead_code)]
struct RequestResult {
request_id: u64,
submit_time: Instant,
......
@@ -51,7 +51,7 @@ pub async fn run_zmq_listener(
let mut next_event_id = 0u64;
let warning_count = Arc::new(AtomicU32::new(0));
let mut consecutive_errors = 0u32;
-#[allow(unused_assignments)]
+#[expect(unused_assignments)]
let mut exit_reason = "unknown";
let mut messages_processed = 0u64;
@@ -112,7 +112,7 @@ pub async fn run_zmq_listener(
continue;
};
-let effective_dp_rank = batch.data_parallel_rank.map_or(dp_rank, |r| r as u32);
+let effective_dp_rank = batch.data_parallel_rank.map_or(dp_rank, |r| r.cast_unsigned());
for raw_event in batch.events {
let event_id = next_event_id;
next_event_id += 1;
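The hunk above swaps `r as u32` for `r.cast_unsigned()`. Assuming `data_parallel_rank` holds a signed 32-bit integer (its type is not shown in this diff), both forms reinterpret the bits rather than validate the value. The sketch below uses only `as` and `u32::try_from` to show how a plain reinterpretation differs from a checked conversion; the function names are hypothetical.

```rust
/// Hypothetical helper: bit-for-bit reinterpretation, as `r as u32` does
/// for an `i32` input. Negative values wrap to large unsigned values.
fn reinterpret(rank: i32) -> u32 {
    rank as u32
}

/// Hypothetical helper: checked conversion that rejects negative ranks.
fn checked(rank: i32) -> Option<u32> {
    u32::try_from(rank).ok()
}

fn main() {
    for rank in [3, 0, -1] {
        println!(
            "rank {rank}: reinterpret = {}, checked = {:?}",
            reinterpret(rank),
            checked(rank)
        );
    }
}
```

The change in the diff makes the sign reinterpretation explicit without altering behavior; a checked conversion would be a different design choice and is shown only for contrast.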
......
@@ -4,6 +4,25 @@ This document explains the KV cache index implementations: `RadixTree` (and its
The concurrent indexers achieve a combined throughput of over **10 million events + requests per second** with **p99 latency under 10 microseconds**.
## Module Map
| File | What it does |
|------|-------------|
| `mod.rs` | Module declarations and re-exports |
| `traits.rs` | `KvIndexerInterface` (async trait) and `SyncIndexer` (sync trait for thread-pool backends) |
| `types.rs` | `KvRouterError`, `MatchRequest`, `WorkerTask`, channel message types |
| `metrics.rs` | `KvIndexerMetrics` — Prometheus counters and histograms |
| `kv_indexer.rs` | `KvIndexer` — single-threaded async wrapper around `RadixTree` with tokio mpsc channels |
| `radix_tree.rs` | `RadixTree` — single-threaded tree with `Rc<RefCell<RadixBlock>>` nodes, tracks per-block frequency |
| `concurrent_radix_tree.rs` | `ConcurrentRadixTree` — thread-safe variant with `Arc<RwLock<Block>>` nodes and `DashMap` lookup |
| `positional.rs` | `PositionalIndexer` — flat `DashMap<(pos, hash), SeqEntry>` with jump optimization |
| `thread_pool.rs` | `ThreadPoolIndexer<T: SyncIndexer>` — N OS threads for sticky-routed writes, inline reads; wraps `ConcurrentRadixTree` or `PositionalIndexer` |
| `sharded.rs` | `KvIndexerSharded` — N independent `RadixTree` shards each in its own OS thread, scatter-gather for matches |
| `local.rs` | `LocalKvIndexer` — thin wrapper around `KvIndexer` with a circular event buffer for worker-side decentralized routing |
| `pruning.rs` | `PruneManager` — TTL-based expiration and size-based pruning via `BinaryHeap<BlockEntry>` |
| `naive.rs` | Brute-force baseline indexers (bench-only, behind `bench` feature flag) |
| `tests.rs` | Integration tests for all indexer variants |
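The "sticky-routed writes" mentioned for `thread_pool.rs` can be illustrated with a minimal sketch. It assumes the routing key is the worker ID and that the thread index is a hash of that key modulo the pool size; neither detail is confirmed by this diff (the file's diff is collapsed), and `sticky_thread` and `route_events` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc;
use std::thread;

/// Hypothetical: pick the thread that owns all writes for a given worker.
fn sticky_thread(worker_id: u64, num_threads: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    worker_id.hash(&mut hasher);
    (hasher.finish() % num_threads as u64) as usize
}

/// Sends `(worker_id, event_id)` pairs to `num_threads` OS threads and
/// returns what each thread received, in arrival order.
fn route_events(events: &[(u64, u64)], num_threads: usize) -> Vec<Vec<(u64, u64)>> {
    let mut senders = Vec::with_capacity(num_threads);
    let mut handles = Vec::with_capacity(num_threads);
    for _ in 0..num_threads {
        let (tx, rx) = mpsc::channel::<(u64, u64)>();
        senders.push(tx);
        // Each thread drains its own channel until every sender is dropped.
        handles.push(thread::spawn(move || rx.into_iter().collect::<Vec<_>>()));
    }
    for &(worker_id, event_id) in events {
        senders[sticky_thread(worker_id, num_threads)]
            .send((worker_id, event_id))
            .expect("receiver thread should still be running");
    }
    drop(senders); // closes the channels so the threads can finish
    handles
        .into_iter()
        .map(|handle| handle.join().expect("thread panicked"))
        .collect()
}

fn main() {
    let events = [(1, 0), (2, 0), (1, 1), (2, 1), (1, 2)];
    for (idx, seen) in route_events(&events, 4).iter().enumerate() {
        println!("thread {idx}: {seen:?}");
    }
}
```

Because every event for a given worker lands on the same thread, that worker's events are applied in the order they were sent, without any cross-thread coordination for writes.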
## Motivation: The Four Block Identifiers
Every cached KV block in a distributed LLM system needs four pieces of information:
......
@@ -32,7 +32,7 @@ use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
-use crate::indexer::{SyncIndexer, WorkerTask};
+use super::{SyncIndexer, WorkerTask};
use crate::protocols::*;
/// Thread-safe shared reference to a Block.
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::{
DumpRequest, GetWorkersRequest, KvIndexer, KvIndexerInterface, KvIndexerMetrics, KvRouterError,
WorkerKvQueryResponse,
};
use crate::protocols::*;
// -------------------------------------------------
// Decentralized router: LocalKvIndexer for workers
// -------------------------------------------------
/// A thin wrapper around `KvIndexer` that also buffers recent events
/// (e.g. events that the router may queue upon startup).
pub struct LocalKvIndexer {
/// The underlying indexer
indexer: KvIndexer,
/// Circular buffer of recent events
pub(super) event_buffer: Mutex<VecDeque<RouterEvent>>,
/// Maximum number of events to keep in buffer
max_buffer_size: usize, // Router sets this to WORKER_KV_INDEXER_BUFFER_SIZE
}
impl LocalKvIndexer {
/// Creates a new `LocalKvIndexer` that wraps a freshly constructed `KvIndexer`.
pub fn new(
token: CancellationToken,
kv_block_size: u32,
metrics: Arc<KvIndexerMetrics>,
max_buffer_size: usize,
) -> Self {
Self {
indexer: KvIndexer::new(token, kv_block_size, metrics),
event_buffer: Mutex::new(VecDeque::with_capacity(max_buffer_size)),
max_buffer_size,
}
}
/// Get all buffered events (oldest first).
pub fn get_all_events_in_buffer(&self) -> Vec<RouterEvent> {
let buffer = self.event_buffer.lock().unwrap();
buffer.iter().cloned().collect()
}
/// Query events by ID range, returning events in `[start_id, end_id]` (both inclusive).
///
/// ### Arguments
///
/// * `start_id` - Starting event ID (inclusive). If `None`, dumps entire tree.
/// * `end_id` - Ending event ID (inclusive). If `None`, returns up to newest available.
///
/// ### Returns
///
/// - `Events`: Buffered events with original IDs (when range is within buffer)
/// - `TreeDump`: Full tree dump with synthetic IDs (when range is too old or unspecified)
/// - `TooNew`: Error when requested range is newer than available data
/// - `InvalidRange`: Error when end_id < start_id
pub async fn get_events_in_id_range(
&self,
start_id: Option<u64>,
end_id: Option<u64>,
) -> WorkerKvQueryResponse {
// Validate range if both specified
if let (Some(s), Some(e)) = (start_id, end_id)
&& e < s
{
tracing::warn!(start_id = s, end_id = e, "Invalid range: end_id < start_id");
return WorkerKvQueryResponse::InvalidRange {
start_id: s,
end_id: e,
};
}
// Get buffer state
let (first_id, last_id) = {
let buffer = self.event_buffer.lock().unwrap();
if buffer.is_empty() {
(None, None)
} else {
(
Some(buffer.front().unwrap().event.event_id),
Some(buffer.back().unwrap().event.event_id),
)
}
};
// If no start_id specified, dump entire tree
if start_id.is_none() {
tracing::debug!("No start_id specified, dumping entire tree");
let events = self.dump_events().await.unwrap_or_default();
return WorkerKvQueryResponse::TreeDump(events);
}
let start_id = start_id.unwrap();
let end_id = end_id.unwrap_or_else(|| last_id.unwrap_or(start_id));
// Check for empty buffer
let Some(first_buffered) = first_id else {
tracing::debug!("Buffer empty, dumping entire tree");
let events = self.dump_events().await.unwrap_or_default();
return WorkerKvQueryResponse::TreeDump(events);
};
let last_buffered = last_id.unwrap();
// Check if request is too new
if start_id > last_buffered {
tracing::warn!(
start_id,
last_buffered,
"Requested start_id is newer than buffer"
);
return WorkerKvQueryResponse::TooNew {
requested_start: Some(start_id),
requested_end: Some(end_id),
newest_available: last_buffered,
};
}
// Check if start_id is too old (before buffer) -> tree dump
if start_id < first_buffered {
tracing::info!(
start_id,
first_buffered,
"Requested start_id is older than buffer, dumping entire tree"
);
let events = self.dump_events().await.unwrap_or_default();
return WorkerKvQueryResponse::TreeDump(events);
}
// Serve from buffer
let buffer = self.event_buffer.lock().unwrap();
let start_idx = match buffer.binary_search_by_key(&start_id, |e| e.event.event_id) {
Ok(idx) => idx,
Err(insertion_point) => insertion_point,
};
// Clamp end_id to buffer bounds
let clamped_end_id = end_id.min(last_buffered);
let end_idx = match buffer.binary_search_by_key(&clamped_end_id, |e| e.event.event_id) {
Ok(idx) => idx + 1, // Include the matched element
Err(insertion_point) => insertion_point,
};
let events: Vec<RouterEvent> = buffer
.iter()
.skip(start_idx)
.take(end_idx.saturating_sub(start_idx))
.cloned()
.collect();
WorkerKvQueryResponse::Events(events)
}
/// Record an event in the buffer
fn record_event(&self, event: RouterEvent) {
let mut buffer = self.event_buffer.lock().unwrap();
// Check that event id is consecutive to last one
if let Some(last_event) = buffer.back()
&& event.event.event_id != last_event.event.event_id + 1
{
let expected = last_event.event.event_id + 1;
tracing::error!(
worker_id = event.worker_id,
expected,
got = event.event.event_id,
"Non-consecutive KV event id; buffer may have gaps"
);
}
tracing::debug!(
"Recording event {:?} in buffer, size before insert is {}",
event,
buffer.len()
);
// Add to back
buffer.push_back(event);
// Remove from front if over capacity (circular buffer behavior)
while buffer.len() > self.max_buffer_size {
buffer.pop_front();
}
}
/// Apply event with buffering.
///
/// This records the event in the buffer and forwards it to the underlying indexer.
pub async fn apply_event_with_buffer(&self, event: RouterEvent) -> Result<(), KvRouterError> {
// Record in buffer
self.record_event(event.clone());
// Forward to underlying indexer
self.indexer
.event_sender()
.send(event)
.await
.map_err(|_| KvRouterError::IndexerOffline)
}
/// Clear the event buffer.
pub fn clear_buffer(&self) {
let mut buffer = self.event_buffer.lock().unwrap();
buffer.clear();
}
/// Get the current buffer size.
pub fn buffer_len(&self) -> usize {
let buffer = self.event_buffer.lock().unwrap();
buffer.len()
}
// Delegation methods to underlying KvIndexer
/// Get a sender for `RouterEvent`s.
pub fn event_sender(&self) -> mpsc::Sender<RouterEvent> {
self.indexer.event_sender()
}
/// Get a sender for dump requests (snapshot events).
pub fn snapshot_event_sender(&self) -> mpsc::Sender<DumpRequest> {
self.indexer.snapshot_event_sender()
}
/// Get a sender for worker removal requests.
pub fn remove_worker_sender(&self) -> mpsc::Sender<WorkerId> {
self.indexer.remove_worker_sender()
}
/// Get a sender for get workers requests.
pub fn get_workers_sender(&self) -> mpsc::Sender<GetWorkersRequest> {
self.indexer.get_workers_sender()
}
/// Get the KV block size.
pub fn block_size(&self) -> u32 {
self.indexer.block_size()
}
}
// Implement KvIndexerInterface by delegating to the underlying indexer
#[async_trait]
impl KvIndexerInterface for LocalKvIndexer {
async fn find_matches(
&self,
sequence: Vec<LocalBlockHash>,
) -> Result<OverlapScores, KvRouterError> {
self.indexer.find_matches(sequence).await
}
async fn find_matches_for_request(
&self,
tokens: &[u32],
lora_name: Option<&str>,
) -> Result<OverlapScores, KvRouterError> {
self.indexer
.find_matches_for_request(tokens, lora_name)
.await
}
async fn apply_event(&self, event: RouterEvent) {
// Use the buffering version
let _ = self.apply_event_with_buffer(event).await;
}
async fn remove_worker(&self, worker: WorkerId) {
let _ = self.indexer.remove_worker_sender().send(worker).await;
}
fn shutdown(&self) {
self.indexer.shutdown();
}
async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
self.indexer.dump_events().await
}
async fn process_routing_decision_for_request(
&self,
tokens_with_hashes: &mut TokensWithHashes,
worker: WorkerWithDpRank,
) -> Result<(), KvRouterError> {
// TODO: local KV indexers likely have little use for this method;
// it is kept for now so that the trait is fully implemented.
self.indexer
.process_routing_decision_for_request(tokens_with_hashes, worker)
.await
}
async fn flush(&self) -> usize {
self.indexer.flush().await
}
}
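The decision order in `get_events_in_id_range` (invalid range, empty buffer, too new, too old, then serve from the buffer) can be traced with a simplified, synchronous sketch. It stores bare `u64` event IDs instead of `RouterEvent`s and assumes IDs are unique and ascending; `EventBuffer` and `Lookup` are hypothetical stand-ins, not types from the crate.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `WorkerKvQueryResponse`.
#[derive(Debug, PartialEq)]
enum Lookup {
    Events(Vec<u64>),
    NeedsTreeDump,
    TooNew { newest_available: u64 },
    InvalidRange,
}

/// Hypothetical bounded buffer of event IDs, oldest at the front.
struct EventBuffer {
    ids: VecDeque<u64>,
    capacity: usize,
}

impl EventBuffer {
    fn new(capacity: usize) -> Self {
        Self { ids: VecDeque::with_capacity(capacity), capacity }
    }

    /// Appends an ID and evicts the oldest entries once over capacity.
    fn record(&mut self, id: u64) {
        self.ids.push_back(id);
        while self.ids.len() > self.capacity {
            self.ids.pop_front();
        }
    }

    /// Returns the IDs in `[start, end]`, both inclusive.
    fn range(&self, start: u64, end: u64) -> Lookup {
        if end < start {
            return Lookup::InvalidRange;
        }
        let (Some(&first), Some(&last)) = (self.ids.front(), self.ids.back()) else {
            return Lookup::NeedsTreeDump; // empty buffer
        };
        if start > last {
            return Lookup::TooNew { newest_available: last };
        }
        if start < first {
            return Lookup::NeedsTreeDump; // range begins before the buffer
        }
        // Binary search relies on the IDs being sorted ascending.
        let start_idx = self.ids.binary_search(&start).unwrap_or_else(|i| i);
        let end_idx = match self.ids.binary_search(&end.min(last)) {
            Ok(i) => i + 1, // include the matched element
            Err(i) => i,
        };
        Lookup::Events(self.ids.range(start_idx..end_idx).copied().collect())
    }
}

fn main() {
    let mut buffer = EventBuffer::new(3);
    for id in 1..=5 {
        buffer.record(id); // IDs 1 and 2 are evicted along the way
    }
    println!("{:?}", buffer.range(4, 9));
    println!("{:?}", buffer.range(1, 4));
    println!("{:?}", buffer.range(6, 7));
}
```

Falling back to a full tree dump whenever the requested start precedes the buffer keeps the buffer small while still letting a late-joining router recover complete state.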
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
#[cfg(feature = "metrics")]
use std::sync::{Arc, OnceLock};
#[cfg(feature = "metrics")]
use dynamo_runtime::{
component::Component,
metrics::{MetricsHierarchy, prometheus_names::kvrouter},
};
use prometheus::{IntCounterVec, Opts};
use crate::protocols::{KvCacheEventData, KvCacheEventError};
/// Metrics for the KV Indexer.
#[derive(Clone)]
pub struct KvIndexerMetrics {
/// Counter of events applied.
pub kv_cache_events_applied: IntCounterVec,
}
/// Metric status labels.
pub const METRIC_STATUS_OK: &str = "ok";
pub const METRIC_STATUS_PARENT_NOT_FOUND: &str = "parent_block_not_found";
pub const METRIC_STATUS_BLOCK_NOT_FOUND: &str = "block_not_found";
pub const METRIC_STATUS_INVALID_BLOCK: &str = "invalid_block";
/// Metric event labels.
pub const METRIC_EVENT_STORED: &str = "stored";
pub const METRIC_EVENT_REMOVED: &str = "removed";
pub const METRIC_EVENT_CLEARED: &str = "cleared";
/// Metric name for KV cache events applied counter.
const KV_CACHE_EVENTS_APPLIED_NAME: &str = "dynamo_kvrouter_kv_cache_events_applied";
#[cfg(feature = "metrics")]
static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new();
impl KvIndexerMetrics {
#[cfg(feature = "metrics")]
fn new(kv_cache_events_applied: IntCounterVec) -> Self {
Self {
kv_cache_events_applied,
}
}
/// Creates a new KvIndexerMetrics from a Component, memoizing the result in
/// KV_INDEXER_METRICS to avoid duplicate registration issues.
#[cfg(feature = "metrics")]
pub fn from_component(component: &Component) -> Arc<Self> {
KV_INDEXER_METRICS.get_or_init(|| {
match component.metrics().create_intcountervec(
kvrouter::KV_CACHE_EVENTS_APPLIED,
"Total number of KV cache events applied to index",
&["event_type", "status"],
&[],
) {
Ok(kv_cache_events_applied) => Arc::new(Self::new(kv_cache_events_applied)),
Err(e) => {
tracing::warn!("Failed to create kv indexer metrics from component: {}. Using unregistered metrics as fallback.", e);
Arc::new(Self::new_unregistered())
}
}
}).clone()
}
/// Creates a new KvIndexerMetrics which is not registered with a MetricsRegistry.
/// This may be used for tests or as a fallback for when a MetricsRegistry is not available / has errored.
pub fn new_unregistered() -> Self {
Self {
kv_cache_events_applied: IntCounterVec::new(
Opts::new(
KV_CACHE_EVENTS_APPLIED_NAME,
"Total number of KV cache events applied to index",
),
&["event_type", "status"],
)
.unwrap(),
}
}
pub fn get_event_type(event_data: &KvCacheEventData) -> &'static str {
match event_data {
KvCacheEventData::Stored(_) => METRIC_EVENT_STORED,
KvCacheEventData::Removed(_) => METRIC_EVENT_REMOVED,
KvCacheEventData::Cleared => METRIC_EVENT_CLEARED,
}
}
pub fn increment_event_applied(
&self,
event_type: &'static str,
result: Result<(), KvCacheEventError>,
) {
match result {
Ok(_) => {
self.kv_cache_events_applied
.with_label_values(&[event_type, METRIC_STATUS_OK])
.inc_by(1);
}
Err(e) => {
let error_label = match e {
KvCacheEventError::ParentBlockNotFound => METRIC_STATUS_PARENT_NOT_FOUND,
KvCacheEventError::BlockNotFound => METRIC_STATUS_BLOCK_NOT_FOUND,
KvCacheEventError::InvalidBlockSequence => METRIC_STATUS_INVALID_BLOCK,
};
self.kv_cache_events_applied
.with_label_values(&[event_type, error_label])
.inc_by(1);
}
}
}
}
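The memoization in `from_component` relies on `OnceLock::get_or_init` running its initializer at most once, so repeated calls return the same `Arc` instead of registering a metric twice. A minimal standard-library sketch of that pattern follows; `Metrics`, `shared_metrics`, and the `INIT_CALLS` counter are hypothetical names used only to make the behavior observable.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for a metrics struct that must be registered once.
struct Metrics {
    name: &'static str,
}

static METRICS: OnceLock<Arc<Metrics>> = OnceLock::new();

/// Counts how many times the initializer actually ran (illustration only).
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Returns the shared instance, creating it on first use.
fn shared_metrics() -> Arc<Metrics> {
    METRICS
        .get_or_init(|| {
            INIT_CALLS.fetch_add(1, Ordering::SeqCst);
            Arc::new(Metrics { name: "example_events_applied" })
        })
        .clone()
}

fn main() {
    let first = shared_metrics();
    let second = shared_metrics();
    println!(
        "same instance: {}, initializer runs: {}, name: {}",
        Arc::ptr_eq(&first, &second),
        INIT_CALLS.load(Ordering::SeqCst),
        first.name
    );
}
```

Cloning the `Arc` is cheap, so callers can hold their own handle while the registration cost is paid only on the first call.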