Unverified Commit 36b4208e authored by Yongming Ding's avatar Yongming Ding Committed by GitHub
Browse files

refactor(mocker): replace vllm block manager with kvbm-logical (#8451)


Signed-off-by: default avatarYongming Ding <yongmingd@nvidia.com>
Co-authored-by: default avatarRyan Olson <rolson@nvidia.com>
parent 1dc0975b
......@@ -2511,6 +2511,7 @@ dependencies = [
"dynamo-kv-router",
"dynamo-tokens",
"indicatif 0.18.4",
"kvbm-logical",
"ndarray 0.16.1",
"ndarray-interp",
"ndarray-npy",
......
......@@ -1634,6 +1634,7 @@ dependencies = [
"dynamo-kv-router",
"dynamo-tokens",
"indicatif 0.18.4",
"kvbm-logical",
"ndarray",
"ndarray-interp",
"ndarray-npy",
......@@ -1714,7 +1715,7 @@ dependencies = [
"libc",
"local-ip-address",
"log",
"lru",
"lru 0.12.5",
"mio 1.1.1",
"notify",
"nuid",
......@@ -2112,6 +2113,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
......@@ -2396,7 +2403,18 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
"foldhash 0.1.5",
]
[[package]]
name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash 0.2.0",
]
[[package]]
......@@ -3228,6 +3246,31 @@ dependencies = [
"tracing",
]
[[package]]
name = "kvbm-logical"
version = "1.1.0"
dependencies = [
"anyhow",
"async-stream",
"bincode 2.0.1",
"bytes",
"derive_builder",
"dynamo-tokens",
"futures",
"indexmap 2.14.0",
"lru 0.16.4",
"parking_lot",
"prometheus",
"rmp-serde",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tokio-stream",
"tracing",
"xxhash-rust",
]
[[package]]
name = "kvbm-py3"
version = "1.1.0"
......@@ -3389,6 +3432,15 @@ dependencies = [
"hashbrown 0.15.5",
]
[[package]]
name = "lru"
version = "0.16.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f66e8d5d03f609abc3a39e6f08e4164ebf1447a732906d39eb9b99b7919ef39"
dependencies = [
"hashbrown 0.16.1",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
......@@ -6628,6 +6680,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
......
......@@ -1649,6 +1649,7 @@ dependencies = [
"dynamo-kv-router",
"dynamo-tokens",
"indicatif 0.18.4",
"kvbm-logical",
"ndarray",
"ndarray-interp",
"ndarray-npy",
......@@ -1761,7 +1762,7 @@ dependencies = [
"libc",
"local-ip-address",
"log",
"lru",
"lru 0.12.5",
"mio 1.1.1",
"notify",
"nuid",
......@@ -2184,6 +2185,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
......@@ -2468,7 +2475,18 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
"foldhash 0.1.5",
]
[[package]]
name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash 0.2.0",
]
[[package]]
......@@ -3300,6 +3318,31 @@ dependencies = [
"tracing",
]
[[package]]
name = "kvbm-logical"
version = "1.1.0"
dependencies = [
"anyhow",
"async-stream",
"bincode 2.0.1",
"bytes",
"derive_builder",
"dynamo-tokens",
"futures",
"indexmap 2.14.0",
"lru 0.16.4",
"parking_lot",
"prometheus",
"rmp-serde",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tokio-stream",
"tracing",
"xxhash-rust",
]
[[package]]
name = "lalrpop-util"
version = "0.20.2"
......@@ -3440,6 +3483,15 @@ dependencies = [
"hashbrown 0.15.5",
]
[[package]]
name = "lru"
version = "0.16.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f66e8d5d03f609abc3a39e6f08e4164ebf1447a732906d39eb9b99b7919ef39"
dependencies = [
"hashbrown 0.16.1",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
......@@ -6698,6 +6750,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
......
......@@ -78,6 +78,20 @@ impl<T: BlockMetadata> BlockManager<T> {
///
/// Returns `None` if fewer than `count` blocks are available across both pools.
pub fn allocate_blocks(&self, count: usize) -> Option<Vec<MutableBlock<T>>> {
self.allocate_blocks_with_evictions(count)
.map(|(blocks, _evicted)| blocks)
}
/// Like [`allocate_blocks`](Self::allocate_blocks) but also reports the
/// [`SequenceHash`] of each block evicted from the inactive pool to
/// satisfy the allocation. Callers maintaining a shadow view of which
/// registrations are alive (e.g. the mocker's router-event bridge) can
/// translate these hashes into cache-invalidation events directly,
/// avoiding an O(N) presence scan over the registry.
pub fn allocate_blocks_with_evictions(
&self,
count: usize,
) -> Option<(Vec<MutableBlock<T>>, Vec<SequenceHash>)> {
let _guard = self.allocate_mutex.lock();
let from_reset = self.reset_pool.allocate_blocks(count);
let from_reset_count = from_reset.len();
......@@ -85,7 +99,7 @@ impl<T: BlockMetadata> BlockManager<T> {
let remaining_needed = count - blocks.len();
match self.inactive_pool.allocate_blocks(remaining_needed) {
Some(remaining) => {
Some((remaining, evicted)) => {
let eviction_count = remaining.len() as u64;
blocks.extend(remaining);
......@@ -94,7 +108,7 @@ impl<T: BlockMetadata> BlockManager<T> {
.inc_allocations_from_reset(from_reset_count as u64);
self.metrics.inc_evictions(eviction_count);
Some(blocks)
Some((blocks, evicted))
}
None => None,
}
......
......@@ -181,10 +181,18 @@ impl<T: BlockMetadata + Sync> InactivePool<T> {
.collect()
}
/// Allocate blocks from registered pool, converting them to MutableBlocks for ResetPool
pub(crate) fn allocate_blocks(&self, count: usize) -> Option<Vec<MutableBlock<T>>> {
/// Allocate blocks from registered pool, converting them to
/// [`MutableBlock`]s for the [`ResetPool`]. Also reports the
/// [`SequenceHash`] of each evicted block so upstream layers can
/// propagate cache-invalidation events without a secondary presence scan.
///
/// Returns `None` if fewer than `count` evictable blocks are available.
pub(crate) fn allocate_blocks(
&self,
count: usize,
) -> Option<(Vec<MutableBlock<T>>, Vec<SequenceHash>)> {
if count == 0 {
return Some(Vec::new());
return Some((Vec::new(), Vec::new()));
}
let mut inner = self.inner.write();
......@@ -202,15 +210,19 @@ impl<T: BlockMetadata + Sync> InactivePool<T> {
}
}
let mut mutable_blocks = Vec::with_capacity(count);
mutable_blocks.extend(allocated_blocks.into_iter().map(|registered_block| {
let mut evicted = Vec::with_capacity(count);
for registered_block in allocated_blocks {
// Capture the identity BEFORE `reset()` drops the
// registration handle and marks the block absent.
evicted.push(registered_block.sequence_hash());
let reset_block = registered_block.reset();
MutableBlock::new(
mutable_blocks.push(MutableBlock::new(
reset_block,
self.reset_return_fn.clone(),
self.metrics.clone(),
)
}));
Some(mutable_blocks)
));
}
Some((mutable_blocks, evicted))
} else {
for block in allocated_blocks {
inner.backend.insert(block);
......@@ -395,17 +407,27 @@ mod tests {
fn test_allocate_blocks() {
let (pool, reset_pool) = create_test_pool();
let (block1, _) = create_registered_block::<TestMeta>(1, &tokens_for_id(1));
let (block2, _) = create_registered_block::<TestMeta>(2, &tokens_for_id(2));
let (block3, _) = create_registered_block::<TestMeta>(3, &tokens_for_id(3));
let (block1, seq_hash1) = create_registered_block::<TestMeta>(1, &tokens_for_id(1));
let (block2, seq_hash2) = create_registered_block::<TestMeta>(2, &tokens_for_id(2));
let (block3, seq_hash3) = create_registered_block::<TestMeta>(3, &tokens_for_id(3));
pool.insert(block1);
pool.insert(block2);
pool.insert(block3);
assert_eq!(pool.len(), 3);
let mutable_blocks = pool.allocate_blocks(1).expect("Should allocate 1 block");
let (mutable_blocks, evicted) = pool.allocate_blocks(1).expect("Should allocate 1 block");
assert_eq!(mutable_blocks.len(), 1);
assert_eq!(
evicted.len(),
1,
"one sequence hash should be reported as evicted"
);
assert!(
[seq_hash1, seq_hash2, seq_hash3].contains(&evicted[0]),
"evicted hash must match one of the inserted blocks; got {:?}",
evicted[0]
);
assert_eq!(pool.len(), 2);
drop(mutable_blocks);
......@@ -414,6 +436,35 @@ mod tests {
assert_eq!(reset_pool.available_blocks(), 11);
}
/// Sanity: asking for multiple evictions returns that many distinct hashes,
/// each matching an inserted block.
#[test]
fn test_allocate_blocks_reports_all_evicted_hashes() {
let (pool, _reset_pool) = create_test_pool();
let (block1, seq_hash1) = create_registered_block::<TestMeta>(1, &tokens_for_id(1));
let (block2, seq_hash2) = create_registered_block::<TestMeta>(2, &tokens_for_id(2));
let (block3, seq_hash3) = create_registered_block::<TestMeta>(3, &tokens_for_id(3));
pool.insert(block1);
pool.insert(block2);
pool.insert(block3);
let inserted = [seq_hash1, seq_hash2, seq_hash3];
let (mutable_blocks, evicted) = pool
.allocate_blocks(3)
.expect("Should allocate all three blocks");
assert_eq!(mutable_blocks.len(), 3);
assert_eq!(evicted.len(), 3);
for h in &evicted {
assert!(
inserted.contains(h),
"evicted hash {h:?} not in inserted set"
);
}
let unique: std::collections::HashSet<_> = evicted.iter().copied().collect();
assert_eq!(unique.len(), 3, "evicted hashes must all be distinct");
}
#[test]
fn test_allocate_more_than_available_fails() {
let (pool, _reset_pool) = create_test_pool();
......
......@@ -16,6 +16,7 @@ readme = "README.md"
# repo
dynamo-kv-router = { workspace = true }
dynamo-tokens = { workspace = true }
kvbm-logical = { workspace = true }
# workspace
anyhow = { workspace = true }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::common::evictor::LRUEvictor;
use dynamo_tokens::blocks::UniqueBlock;
use rustc_hash::FxHashMap;
/// Hash-based KV cache with O(1) block lookups, maintaining active (ref-counted) and
/// inactive (LRU-evictable) pools.
pub struct HashCache {
active_blocks: FxHashMap<UniqueBlock, usize>,
inactive_blocks: LRUEvictor<UniqueBlock>,
max_capacity: usize,
}
impl HashCache {
/// Create a new HashCache with the given maximum block capacity.
pub fn new(max_capacity: usize) -> Self {
Self {
active_blocks: FxHashMap::default(),
inactive_blocks: LRUEvictor::default(),
max_capacity,
}
}
/// Get the reference count of an active block, if it exists.
pub fn get_active_ref_count(&self, block: &UniqueBlock) -> Option<usize> {
self.active_blocks.get(block).copied()
}
/// Increment the reference count of an active block. Returns the new count.
pub fn increment_ref(&mut self, block: &UniqueBlock) -> usize {
let ref_count = self
.active_blocks
.get_mut(block)
.expect("block must be active to increment ref");
*ref_count += 1;
*ref_count
}
/// Decrement the reference count of an active block. Returns the new count.
pub fn decrement_ref(&mut self, block: &UniqueBlock) -> usize {
let ref_count = self
.active_blocks
.get_mut(block)
.expect("block must be active to decrement ref");
*ref_count -= 1;
*ref_count
}
/// Insert a block into the active pool with the given reference count.
pub fn insert_active(&mut self, block: UniqueBlock, ref_count: usize) {
self.active_blocks.insert(block, ref_count);
}
/// Remove a block from the active pool. Returns the reference count, or None if not found.
pub fn remove_active(&mut self, block: &UniqueBlock) -> Option<usize> {
self.active_blocks.remove(block)
}
/// Check if a block is in the active pool.
pub fn contains_active(&self, block: &UniqueBlock) -> bool {
self.active_blocks.contains_key(block)
}
/// Insert a block into the inactive pool (LRU order).
pub fn insert_inactive(&mut self, block: UniqueBlock) {
self.inactive_blocks.insert(block);
}
/// Remove a block from the inactive pool. Returns true if it was found.
pub fn remove_inactive(&mut self, block: &UniqueBlock) -> bool {
self.inactive_blocks.remove(block)
}
/// Evict the least-recently-used block from the inactive pool.
pub fn evict_inactive(&mut self) -> Option<UniqueBlock> {
self.inactive_blocks.evict()
}
/// Check if a block is in the inactive pool.
pub fn contains_inactive(&self, block: &UniqueBlock) -> bool {
self.inactive_blocks.contains(block)
}
/// Check if a block exists in either active or inactive pool.
pub fn contains(&self, block: &UniqueBlock) -> bool {
self.active_blocks.contains_key(block) || self.inactive_blocks.contains(block)
}
/// Move block from active to inactive (ref_count reached 0).
pub fn deactivate(&mut self, block: &UniqueBlock) {
debug_assert!(
self.active_blocks.contains_key(block),
"deactivate called on non-active block"
);
debug_assert!(
!self.inactive_blocks.contains(block),
"deactivate called on already-inactive block"
);
self.active_blocks.remove(block);
self.inactive_blocks.insert(block.clone());
}
/// Move block from inactive to active with ref_count=1. Returns true if found.
pub fn reactivate(&mut self, block: &UniqueBlock) -> bool {
if self.inactive_blocks.remove(block) {
self.active_blocks.insert(block.clone(), 1);
true
} else {
false
}
}
/// Check if total blocks (active + inactive) has reached max_capacity.
pub fn is_at_capacity(&self) -> bool {
self.active_blocks.len() + self.inactive_blocks.len() >= self.max_capacity
}
/// Get the number of active blocks.
pub fn num_active(&self) -> usize {
self.active_blocks.len()
}
/// Get the number of inactive blocks.
pub fn num_inactive(&self) -> usize {
self.inactive_blocks.len()
}
/// Get the maximum block capacity.
pub fn max_capacity(&self) -> usize {
self.max_capacity
}
/// Get the current capacity (active + inactive blocks).
pub fn current_capacity(&self) -> usize {
self.active_blocks.len() + self.inactive_blocks.len()
}
/// Iterate over active block keys.
pub fn active_keys(&self) -> impl Iterator<Item = &UniqueBlock> {
self.active_blocks.keys()
}
/// Iterate over inactive block keys.
pub fn inactive_keys(&self) -> impl Iterator<Item = &UniqueBlock> {
self.inactive_blocks.keys()
}
/// Direct access to active blocks map (for tests that check ref counts).
pub fn active_blocks(&self) -> &FxHashMap<UniqueBlock, usize> {
&self.active_blocks
}
}
......@@ -3,8 +3,6 @@
//! Cache data structures for KV block management.
pub mod hash_cache;
pub mod radix_cache;
pub use hash_cache::HashCache;
pub use radix_cache::RadixCache;
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::cmp::{Eq, Ordering};
use std::collections::BTreeSet;
use std::hash::Hash;
use rustc_hash::FxHashMap;
/// A wrapper for (T, counter) that implements Ord based only on counter
#[derive(Debug, Clone, Eq, PartialEq)]
struct PriorityItem<T> {
item: T,
counter: i64,
}
impl<T: Eq> Ord for PriorityItem<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.counter.cmp(&other.counter)
}
}
impl<T: Eq> PartialOrd for PriorityItem<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// An LRU evictor that maintains objects and evicts them based on their
/// priority counter. Lower counter values are evicted first.
#[derive(Debug)]
pub struct LRUEvictor<T: Clone + Eq + Hash> {
free_table: FxHashMap<T, i64>,
priority_queue: BTreeSet<PriorityItem<T>>,
positive_counter: i64,
negative_counter: i64,
}
impl<T: Clone + Eq + Hash> Default for LRUEvictor<T> {
fn default() -> Self {
Self {
free_table: FxHashMap::default(),
priority_queue: BTreeSet::new(),
positive_counter: 0,
negative_counter: 0,
}
}
}
impl<T: Clone + Eq + Hash> LRUEvictor<T> {
pub fn new(_cleanup_threshold: usize) -> Self {
Self::default()
}
pub fn keys(&self) -> std::collections::hash_map::Keys<'_, T, i64> {
self.free_table.keys()
}
fn update(&mut self, object: T, counter: i64) {
self.free_table.insert(object.clone(), counter);
self.priority_queue.insert(PriorityItem {
item: object,
counter,
});
}
pub fn insert(&mut self, object: T) {
// Remove old entry if it exists
if let Some(&old_counter) = self.free_table.get(&object) {
self.priority_queue.remove(&PriorityItem {
item: object.clone(),
counter: old_counter,
});
}
// Increment positive counter and insert
self.positive_counter += 1;
let counter = self.positive_counter;
self.update(object, counter);
}
/// Push an object to the front with negative counter (highest priority for eviction)
pub fn push_front(&mut self, object: T) {
// Remove old entry if it exists
if let Some(&old_counter) = self.free_table.get(&object) {
self.priority_queue.remove(&PriorityItem {
item: object.clone(),
counter: old_counter,
});
}
// Decrement negative counter and insert
self.negative_counter -= 1;
let counter = self.negative_counter;
self.update(object, counter);
}
pub fn contains(&self, object: &T) -> bool {
self.free_table.contains_key(object)
}
/// Evict an object based on LRU policy (lowest counter value)
/// Returns the evicted object or None if no objects are available
pub fn evict(&mut self) -> Option<T> {
self.priority_queue.pop_first().map(|item| {
self.free_table.remove(&item.item);
item.item
})
}
pub fn remove(&mut self, object: &T) -> bool {
let Some(&counter) = self.free_table.get(object) else {
return false;
};
self.free_table.remove(object);
self.priority_queue.remove(&PriorityItem {
item: object.clone(),
counter,
});
true
}
pub fn len(&self) -> usize {
self.free_table.len()
}
pub fn is_empty(&self) -> bool {
self.free_table.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lru_evictor_eviction_order() {
// Create a new LRUEvictor
let mut evictor = LRUEvictor::<i32>::new(1); // threshold value doesn't matter anymore
// Add items in the specified order
evictor.insert(4);
evictor.insert(3);
evictor.insert(2);
evictor.insert(1);
evictor.insert(5);
evictor.insert(1); // Updates counter for 1
evictor.insert(4); // Updates counter for 4
evictor.insert(2); // Updates counter for 2
evictor.push_front(4);
// Verify the eviction order
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 4);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 3);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 5);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 1);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 2);
let evicted = evictor.evict();
assert_eq!(evicted, None);
assert_eq!(evictor.len(), 0);
}
// ... existing test_push_front test ...
}
......@@ -4,7 +4,6 @@
//! Shared components used across all engine implementations.
pub mod bootstrap;
pub mod evictor;
pub mod kv_cache_trace;
pub mod perf_model;
pub mod protocols;
......
......@@ -13,7 +13,24 @@ use validator::Validate;
use crate::common::perf_model::PerfModel;
use dynamo_kv_router::protocols::KvCacheEvent;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, SequenceHash, Token};
use dynamo_tokens::{BlockHash, PositionalLineageHash, SequenceHash, Token};
/// Metadata marker type for kvbm-logical blocks in the mocker's G1 pool.
#[derive(Clone, Debug)]
pub struct G1;
/// Eviction strategy for the kvbm-logical inactive pool.
///
/// `Lineage` is the default and matches kvbm-logical's own default — it evicts
/// leaf blocks first, which subsumes the preemption-priority behaviour that the
/// mocker's old `LRUEvictor::push_front` provided.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub enum MockerEvictionBackend {
Lru,
MultiLru,
#[default]
Lineage,
}
/// Trait for publishing KV cache events.
/// This abstracts the runtime dependency so mocker components can remain generic.
......@@ -142,12 +159,20 @@ pub enum MoveBlock {
Use(
Vec<UniqueBlock>,
Vec<BlockHash>,
Vec<PositionalLineageHash>,
Option<Vec<Vec<u32>>>,
Option<UniqueBlock>,
),
Destroy(Vec<UniqueBlock>),
Deref(Vec<UniqueBlock>),
Promote(Uuid, SequenceHash, Option<u64>, BlockHash, Option<Vec<u32>>),
Promote(
Uuid,
SequenceHash,
Option<u64>,
BlockHash,
PositionalLineageHash,
Option<Vec<u32>>,
),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
......
......@@ -4,35 +4,41 @@
use crate::common::protocols::MoveBlock;
use derive_getters::Getters;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{TokenBlockSequence, Tokens};
use dynamo_tokens::{PositionalLineageHash, TokenBlockSequence, Tokens};
use rand::random;
use validator::Validate;
/// Create unique blocks and block hashes from a TokenBlockSequence.
/// Create unique blocks, block hashes, and positional-lineage hashes from a
/// [`TokenBlockSequence`].
fn create_sequence_cache(
tokens: &TokenBlockSequence,
block_size: usize,
enable_prefix_caching: bool,
) -> (Vec<UniqueBlock>, Vec<u64>) {
) -> (Vec<UniqueBlock>, Vec<u64>, Vec<PositionalLineageHash>) {
let mut unique_blocks = Vec::with_capacity(tokens.blocks().len() + 1);
let mut block_hashes = Vec::with_capacity(tokens.blocks().len());
let mut plhs = Vec::with_capacity(tokens.blocks().len());
for block in tokens.blocks() {
for (pos, block) in tokens.blocks().iter().enumerate() {
block_hashes.push(block.block_hash());
unique_blocks.push({
if enable_prefix_caching {
UniqueBlock::FullBlock(block.sequence_hash())
unique_blocks.push(UniqueBlock::FullBlock(block.sequence_hash()));
plhs.push(block.positional_lineage_hash());
} else {
UniqueBlock::FullBlock(random::<u64>())
unique_blocks.push(UniqueBlock::FullBlock(random::<u64>()));
plhs.push(PositionalLineageHash::new(
random::<u64>(),
None,
pos as u64,
));
}
});
}
// Only push the partial block if tokens count isn't a multiple of block_size
if !tokens.total_tokens().is_multiple_of(block_size) {
unique_blocks.push(UniqueBlock::default());
}
(unique_blocks, block_hashes)
(unique_blocks, block_hashes, plhs)
}
/// A sequence that is actively being built, with the ability to add tokens and commit to hashes
......@@ -41,6 +47,7 @@ fn create_sequence_cache(
pub struct ActiveSequence {
unique_blocks: Vec<UniqueBlock>,
block_hashes: Vec<u64>,
plhs: Vec<PositionalLineageHash>,
tokens: TokenBlockSequence,
......@@ -80,12 +87,13 @@ impl ActiveSequence {
let num_input_tokens = tokens.len();
let tokens = Tokens::from(tokens).into_sequence(block_size as u32, Some(1337));
let (unique_blocks, block_hashes) =
let (unique_blocks, block_hashes, plhs) =
create_sequence_cache(&tokens, block_size, enable_prefix_caching);
let seq = Self {
unique_blocks,
block_hashes,
plhs,
tokens,
block_size,
max_output_tokens,
......@@ -132,6 +140,8 @@ impl ActiveSequence {
let hash_start = prev_blocks.min(self.block_hashes.len());
let hash_end = target_blocks.min(self.block_hashes.len());
let hashes = self.block_hashes[hash_start..hash_end].to_vec();
// Cached per-sequence PLHs (stable across calls).
let plhs = self.plhs[hash_start..hash_end].to_vec();
let token_ids = if self.emit_token_ids && hash_start < hash_end {
Some(
......@@ -149,7 +159,17 @@ impl ActiveSequence {
} else {
None
};
Some(MoveBlock::Use(blocks, hashes, token_ids, parent))
Some(MoveBlock::Use(blocks, hashes, plhs, token_ids, parent))
}
/// Positional lineage hashes for all fully-tokenised blocks in the sequence.
/// Mirrors `block_hashes()` but returns the PLH identity used by kvbm-logical.
pub fn positional_lineage_hashes(&self) -> Vec<PositionalLineageHash> {
self.tokens
.blocks()
.iter()
.map(|block| block.positional_lineage_hash())
.collect()
}
/// Commit a successful allocation by advancing `num_allocated_tokens`.
......@@ -209,12 +229,22 @@ impl ActiveSequence {
random::<u64>()
};
let last_block_hash = last_complete.block_hash();
// Same randomization story as `last_seq_hash`: with prefix caching off,
// two identical prompts must not share blocks, so the PLH we promote
// with must also be unique — otherwise `process_promote`'s
// `match_blocks(&[plh])` lookup would reuse another request's block.
let last_plh = if self.enable_prefix_caching {
last_complete.positional_lineage_hash()
} else {
PositionalLineageHash::new(random::<u64>(), None, self.block_hashes.len() as u64)
};
let promote_token_ids = if self.emit_token_ids {
Some(last_complete.tokens().to_vec())
} else {
None
};
self.block_hashes.push(last_block_hash);
self.plhs.push(last_plh);
self.unique_blocks.pop();
// After pop, the last element is the parent block
......@@ -230,13 +260,20 @@ impl ActiveSequence {
last_seq_hash,
second_to_last_hash,
last_block_hash,
last_plh,
promote_token_ids,
));
}
let new_partial_block = UniqueBlock::default();
self.unique_blocks.push(new_partial_block.clone());
signals.push(MoveBlock::Use(vec![new_partial_block], vec![], None, None));
signals.push(MoveBlock::Use(
vec![new_partial_block],
vec![],
vec![],
None,
None,
));
Some(signals)
}
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! # KV Manager (kvbm-logical G1 backend)
//!
//! Synchronous vLLM-flavour G1 block manager built on `kvbm-logical::BlockManager<G1>`.
//! Translates the mocker's `MoveBlock` protocol into the RAII lifecycle
//! (allocate → stage → register → drop) exposed by kvbm-logical.
//!
//! ## MoveBlock semantics
//!
//! - **Use**: check active pool → clone `ImmutableBlock` to bump refcount; check
//! active+inactive via `match_blocks(plh)` → reactivate; otherwise allocate a
//! new `MutableBlock`, stage with PLH, and register. On capacity exhaustion
//! returns partial count so the scheduler can preempt the oldest running
//! request.
//! - **Destroy**: drop all RAII handles for the block. Emits a `Removed` KV
//! event to match the mocker's existing router protocol.
//! - **Deref**: pop one `ImmutableBlock` clone; when the vec empties, the block
//! transitions to kvbm-logical's inactive pool (RAII return).
//! - **Promote**: PartialBlock (`MutableBlock`) → FullBlock (`ImmutableBlock`).
//! Collapses onto an existing registered handle if the PLH / SequenceHash is
//! already present; otherwise stages + registers a new block.
//!
//! ## Eviction backends
//!
//! Three backends are exposed via [`MockerEvictionBackend`]:
//! - `Lineage` (default) — parent-chain aware, evicts leaves first. Subsumes
//! the `push_front` preemption-priority behaviour of the old `LRUEvictor`.
//! - `Lru` — simple recency-based LRU.
//! - `MultiLru` — 4-tier frequency-aware LRU (requires TinyLFU tracker).
use std::collections::HashMap;
use std::sync::Arc;
use dynamo_kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash,
};
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, PositionalLineageHash, SequenceHash};
use kvbm_logical::registry::BlockRegistry;
use kvbm_logical::tinylfu::TinyLFUTracker;
use kvbm_logical::{BlockManager, ImmutableBlock, MutableBlock};
use uuid::Uuid;
use crate::common::kv_cache_trace;
use crate::common::protocols::{
G1, KvEventPublishers, MockerEvictionBackend, MoveBlock, PrefillCost,
};
use crate::common::sequence::ActiveSequence;
/// Classification for each block processed inside `Use`.
///
/// - `ActiveHit`: block is already pinned in `active_full` / `active_partial`;
/// we just bump our local refcount (handle clone).
/// - `InactiveHit`: block was in kvbm-logical's inactive pool and was
/// reactivated via `match_blocks(plh)`.
/// - `NewStore`: block was freshly allocated, staged, and registered.
///
/// The router radix tree already knows about `ActiveHit` and `InactiveHit`
/// (it only forgets on explicit `Removed`), so only `NewStore` should emit a
/// `Stored` KV event. Both hit outcomes still advance the parent cursor so
/// subsequent `NewStore` batches anchor to the last reused full block.
enum UseOutcome {
ActiveHit,
InactiveHit,
NewStore,
}
/// Synchronous G1 KV block manager backed by `kvbm-logical::BlockManager<G1>`.
pub struct KvManager {
block_manager: BlockManager<G1>,
max_capacity: usize,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
next_event_id: u64,
/// PartialBlocks (still filling tokens) held as `MutableBlock`.
/// Dropped blocks return to kvbm-logical's reset pool.
active_partial: HashMap<Uuid, MutableBlock<G1>>,
/// FullBlocks held as `ImmutableBlock`, keyed by `SequenceHash`. The vec
/// length is the mocker's reference count — each `Use` pushes a clone,
/// each `Deref` pops one. When the vec empties, the block transitions to
/// kvbm-logical's inactive pool (RAII return on drop of the last clone).
active_full: HashMap<SequenceHash, Vec<ImmutableBlock<G1>>>,
/// Shadow registry of (PLH → mocker u64 seq_hash) for every block that has
/// been registered in kvbm-logical. kvbm-logical's registry is keyed by
/// `PositionalLineageHash`, but the router's radix tree is keyed by the
/// mocker's u64 `SequenceHash` on `UniqueBlock::FullBlock`. We keep this
/// map so we can emit router-compatible `Removed` events when kvbm-logical
/// evicts inactive blocks as a side effect of `allocate_blocks_with_evictions`.
registered_plhs: HashMap<PositionalLineageHash, SequenceHash>,
}
impl KvManager {
pub fn new_with_event_sink(
max_capacity: usize,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
) -> Self {
Self::new_with_eviction_backend(
max_capacity,
block_size,
kv_event_publishers,
dp_rank,
MockerEvictionBackend::default(),
)
}
pub fn new_with_eviction_backend(
max_capacity: usize,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
eviction_backend: MockerEvictionBackend,
) -> Self {
debug_assert!(max_capacity > 0, "max_capacity must be > 0");
let mut registry_builder = BlockRegistry::builder();
if matches!(eviction_backend, MockerEvictionBackend::MultiLru) {
let tracker = Arc::new(TinyLFUTracker::new(max_capacity));
registry_builder = registry_builder.frequency_tracker(tracker);
}
let registry = registry_builder.build();
let mut mgr_builder = BlockManager::builder()
.block_count(max_capacity)
.block_size(block_size)
.registry(registry);
mgr_builder = match eviction_backend {
MockerEvictionBackend::Lineage => mgr_builder.with_lineage_backend(),
MockerEvictionBackend::Lru => mgr_builder.with_lru_backend(),
MockerEvictionBackend::MultiLru => mgr_builder.with_multi_lru_backend(),
};
let block_manager = mgr_builder.build().expect("BlockManager build failed");
if !kv_event_publishers.is_empty() {
tracing::info!(
"KvManager initialized with event sink for DP rank {dp_rank} with block_size {block_size}, eviction={eviction_backend:?}"
);
}
Self {
block_manager,
max_capacity,
block_size,
kv_event_publishers,
dp_rank,
next_event_id: 0,
active_partial: HashMap::new(),
active_full: HashMap::new(),
registered_plhs: HashMap::new(),
}
}
/// Emit a `Stored` or `Removed` KV event to the router.
/// Ported verbatim from the old `vllm_backend::publish_kv_event` to
/// preserve KV-aware routing semantics (parent_hash chaining, token_ids).
fn publish_kv_event(
&mut self,
full_blocks: Vec<SequenceHash>,
local_hashes: &[BlockHash],
parent_hash: Option<u64>,
is_store: bool,
token_ids: Option<Vec<Vec<u32>>>,
) {
if full_blocks.is_empty() {
return;
}
kv_cache_trace::log_vllm_trace(
if is_store { "allocation" } else { "eviction" },
self.dp_rank,
self.block_size,
self.num_active_blocks(),
self.num_inactive_blocks(),
self.max_capacity,
);
if self.kv_event_publishers.is_empty() {
return;
}
let event_data = if is_store {
// `local_hashes` is either empty (caller has no token-derived
// hashes to publish) or 1:1 with `full_blocks`. Match the
// front-door contract in `process_use`.
debug_assert!(
local_hashes.is_empty() || local_hashes.len() == full_blocks.len(),
"publish_kv_event: local_hashes must be empty or 1:1 with full_blocks ({} vs {})",
local_hashes.len(),
full_blocks.len(),
);
KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_hash.map(ExternalSequenceBlockHash),
start_position: None,
blocks: full_blocks
.into_iter()
.enumerate()
.map(|(i, global_hash)| KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(global_hash),
tokens_hash: LocalBlockHash(
local_hashes.get(i).copied().unwrap_or_default(),
),
mm_extra_info: None,
})
.collect(),
})
} else {
KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: full_blocks
.into_iter()
.map(ExternalSequenceBlockHash)
.collect(),
})
};
let event_id = self.next_event_id;
self.next_event_id += 1;
let event = KvCacheEvent {
event_id,
data: event_data,
dp_rank: self.dp_rank,
};
if let Err(e) = self
.kv_event_publishers
.publish(event, token_ids.as_deref())
{
tracing::warn!("Failed to publish KV event: {e}");
}
}
/// Process a `MoveBlock` instruction synchronously.
///
/// For `MoveBlock::Use`, returns the number of blocks successfully allocated.
/// On partial failure, blocks `0..N` are committed but block `N+1` could not
/// be allocated (capacity exhausted); the scheduler uses this to trigger
/// preemption.
///
/// For `Destroy` / `Deref` / `Promote`, returns 1 on success and panics on
/// invalid state (consistent with the old `vllm_backend` semantics).
pub fn process(&mut self, event: &MoveBlock) -> usize {
match event {
MoveBlock::Use(blocks, local_hashes, plhs, token_ids, parent) => self.process_use(
blocks,
local_hashes,
plhs,
token_ids.as_deref(),
parent.as_ref(),
),
MoveBlock::Destroy(hashes) => {
self.process_destroy(hashes);
1
}
MoveBlock::Deref(hashes) => {
self.process_deref(hashes);
1
}
MoveBlock::Promote(uuid, seq_hash, parent_hash, local_hash, plh, token_ids) => {
self.process_promote(
*uuid,
*seq_hash,
*parent_hash,
*local_hash,
*plh,
token_ids.clone(),
);
1
}
}
}
fn process_use(
&mut self,
blocks: &[UniqueBlock],
local_hashes: &[BlockHash],
plhs: &[PositionalLineageHash],
token_ids: Option<&[Vec<u32>]>,
parent: Option<&UniqueBlock>,
) -> usize {
// Upstream invariant: caller must supply exactly one PLH per FullBlock in
// `blocks`.
let expected_full_blocks = blocks
.iter()
.filter(|b| matches!(b, UniqueBlock::FullBlock(_)))
.count();
assert_eq!(
plhs.len(),
expected_full_blocks,
"Use: plhs.len() must match FullBlock count in blocks"
);
assert!(
local_hashes.is_empty() || local_hashes.len() == expected_full_blocks,
"Use: local_hashes must be empty or match FullBlock count ({} vs {})",
local_hashes.len(),
expected_full_blocks,
);
let mut blocks_stored = Vec::<SequenceHash>::new();
let mut stored_local_hashes = Vec::<BlockHash>::new();
let mut stored_token_ids: Option<Vec<Vec<u32>>> = token_ids.map(|_| Vec::new());
let mut evicted_plhs = Vec::<PositionalLineageHash>::new();
let mut parent_block: Option<&UniqueBlock> = parent;
let mut plh_idx = 0usize;
let mut allocated = 0usize;
for (i, block) in blocks.iter().enumerate() {
let outcome = match block {
UniqueBlock::FullBlock(seq_hash) => {
// Active hit — bump refcount by cloning the first handle.
if let Some(vec) = self.active_full.get_mut(seq_hash) {
let cloned = vec[0].clone();
vec.push(cloned);
plh_idx += 1;
UseOutcome::ActiveHit
} else {
// Not active: try inactive via PLH lookup, else allocate fresh.
let plh = plhs[plh_idx];
plh_idx += 1;
let matched = self.block_manager.match_blocks(&[plh]);
if let Some(immutable) = matched.into_iter().next() {
self.active_full
.entry(*seq_hash)
.or_default()
.push(immutable);
UseOutcome::InactiveHit
} else {
let Some((mut alloc, evicted)) =
self.block_manager.allocate_blocks_with_evictions(1)
else {
break; // capacity exhausted; scheduler will preempt
};
evicted_plhs.extend(evicted);
let mutable = alloc.pop().unwrap();
let complete =
mutable.stage(plh, self.block_size).expect("stage failed");
let immutable = self.block_manager.register_block(complete);
self.active_full
.entry(*seq_hash)
.or_default()
.push(immutable);
self.registered_plhs.insert(plh, *seq_hash);
UseOutcome::NewStore
}
}
}
UniqueBlock::PartialBlock(uuid) => {
if self.active_partial.contains_key(uuid) {
UseOutcome::ActiveHit
} else {
let Some((mut alloc, evicted)) =
self.block_manager.allocate_blocks_with_evictions(1)
else {
break;
};
evicted_plhs.extend(evicted);
let mutable = alloc.pop().unwrap();
self.active_partial.insert(*uuid, mutable);
UseOutcome::ActiveHit
}
}
};
match outcome {
UseOutcome::ActiveHit | UseOutcome::InactiveHit => {
// Router already has this block; no `Stored` event.
// Advance the parent cursor across the reused prefix so any
// subsequent `NewStore` batches anchor at the last reused
// full block.
if matches!(block, UniqueBlock::FullBlock(_)) {
parent_block = Some(block);
}
}
UseOutcome::NewStore => {
// Freshly registered: announce to router.
// NOTE: we do NOT advance `parent_block` here — within a
// single `Stored` event, consecutive blocks chain via their
// position in `blocks[]`, so `parent_hash` must remain the
// block *before* the first newly-stored one.
if let UniqueBlock::FullBlock(seq_hash) = block {
blocks_stored.push(*seq_hash);
if let Some(lh) = local_hashes.get(i) {
stored_local_hashes.push(*lh);
}
if let (Some(ref mut stids), Some(ids)) =
(stored_token_ids.as_mut(), token_ids)
{
stids.push(ids[i].clone());
}
}
}
}
allocated += 1;
}
let parent_hash = match parent_block {
None => None,
Some(UniqueBlock::FullBlock(block)) => Some(*block),
Some(UniqueBlock::PartialBlock(_)) => panic!("parent block cannot be partial"),
};
self.publish_kv_event(
blocks_stored,
&stored_local_hashes,
parent_hash,
true,
stored_token_ids,
);
// Translate any blocks kvbm-logical evicted from its inactive pool
// during the allocations above into router `Removed` events.
if !evicted_plhs.is_empty() {
let evicted: Vec<SequenceHash> = evicted_plhs
.into_iter()
.filter_map(|plh| self.registered_plhs.remove(&plh))
.collect();
if !evicted.is_empty() {
self.publish_kv_event(evicted, &[], None, false, None);
}
}
allocated
}
/// Process a `MoveBlock::Destroy` instruction.
///
/// Contract difference vs. the legacy `vllm_backend`: in the old manual
/// backend `Destroy(FullBlock)` physically removed the block from the
/// cache surface. Here we only drop all active RAII handles for the
/// block; kvbm-logical keeps the `Registered` state alive and the block
/// transitions to the **inactive pool**, where it remains matchable via
/// `match_blocks(plh)` until an allocation forces its eviction. We still
/// emit a router `Removed` event for protocol parity, but callers should
/// be aware that a subsequent `Use` on the same PLH will reactivate the
/// same physical block from inactive (an `InactiveHit`) rather than
/// allocating fresh.
///
/// In the current scheduler flow `Destroy(FullBlock)` is not exercised
/// (preemption goes through `Deref` + `reset_with_signal`), so this
/// divergence is effectively dormant — see `test_destroy_full_block`.
fn process_destroy(&mut self, blocks: &[UniqueBlock]) {
let mut destroyed = Vec::<SequenceHash>::new();
for block in blocks {
match block {
UniqueBlock::PartialBlock(uuid) => {
self.active_partial
.remove(uuid)
.expect("Destroy: partial block not in active pool");
}
UniqueBlock::FullBlock(seq_hash) => {
self.active_full
.remove(seq_hash)
.expect("Destroy: full block not in active pool");
destroyed.push(*seq_hash);
}
}
}
self.publish_kv_event(destroyed, &[], None, false, None);
}
fn process_deref(&mut self, blocks: &[UniqueBlock]) {
for block in blocks {
match block {
UniqueBlock::PartialBlock(_) => {
panic!("Deref on PartialBlock is not valid");
}
UniqueBlock::FullBlock(seq_hash) => {
let vec = self
.active_full
.get_mut(seq_hash)
.expect("Deref: full block not in active pool");
vec.pop();
if vec.is_empty() {
self.active_full.remove(seq_hash);
}
}
}
}
}
fn process_promote(
&mut self,
uuid: Uuid,
seq_hash: SequenceHash,
parent_hash: Option<u64>,
local_hash: BlockHash,
plh: PositionalLineageHash,
token_ids: Option<Vec<u32>>,
) {
let mutable = self
.active_partial
.remove(&uuid)
.expect("Promote: partial block not found");
// Detect collision: seq_hash already has registered handles (active or inactive).
let is_new = if let Some(vec) = self.active_full.get_mut(&seq_hash) {
// Collision on active pool — drop MutableBlock, clone existing handle.
drop(mutable);
let existing = vec[0].clone();
vec.push(existing);
false
} else if let Some(immutable) = self.block_manager.match_blocks(&[plh]).into_iter().next() {
// Collision on inactive pool — reactivate existing handle.
drop(mutable);
self.active_full.insert(seq_hash, vec![immutable]);
false
} else {
// Fresh registration.
let complete = mutable
.stage(plh, self.block_size)
.expect("stage failed during promote");
let immutable = self.block_manager.register_block(complete);
self.active_full.insert(seq_hash, vec![immutable]);
self.registered_plhs.insert(plh, seq_hash);
true
};
if is_new {
self.publish_kv_event(
vec![seq_hash],
&[local_hash],
parent_hash,
true,
token_ids.map(|t| vec![t]),
);
}
}
/// Number of **distinct** physically-resident KV blocks currently pinned
/// by mocker (not available for eviction).
pub fn num_active_blocks(&self) -> usize {
// kvbm-logical partitions physical blocks into three pools:
// total = reset + inactive + active
// where `available = reset + inactive`. So `total - available` is
// exactly the number of registered (ImmutableBlock) full blocks.
self.block_manager.total_blocks() - self.block_manager.available_blocks()
}
/// Total number of held RAII handles (refcount-style): one per held
/// `MutableBlock` plus one per cloned `ImmutableBlock` in `active_full`.
/// Shared-prefix reuse inflates this above the distinct-block count.
pub fn num_active_block_refs(&self) -> usize {
self.active_partial.len() + self.active_full.values().map(|v| v.len()).sum::<usize>()
}
pub fn get_active_perc(&self) -> f64 {
self.num_active_blocks() as f64 / self.max_capacity as f64
}
pub fn num_inactive_blocks(&self) -> usize {
self.block_manager.metrics().snapshot().inactive_pool_size as usize
}
pub fn max_capacity(&self) -> usize {
self.max_capacity
}
pub fn block_size(&self) -> usize {
self.block_size
}
pub fn dp_rank(&self) -> u32 {
self.dp_rank
}
/// Calculate the prefill cost for a sequence by scanning `unique_blocks` in
/// order and counting the longest prefix that is cached (active or
/// inactive). Stops at first cache miss — KV states are computed
/// sequentially, so anything after a miss must be recomputed.
pub fn get_prefill_cost(&self, sequence: &ActiveSequence) -> PrefillCost {
let seq_blocks = sequence.unique_blocks();
// Without prefix caching, each `UniqueBlock::FullBlock` carries a
// randomised hash that can't possibly be in the cache across requests
// — skip the PLH lookup (PLH is deterministic from tokens) to stay
// consistent with that no-reuse contract.
let overlap_blocks = if sequence.enable_prefix_caching() {
let plhs = sequence.positional_lineage_hashes();
let mut overlap = 0;
for (i, block) in seq_blocks.iter().enumerate() {
match block {
UniqueBlock::FullBlock(seq_hash) => {
if self.active_full.contains_key(seq_hash) {
overlap += 1;
continue;
}
let Some(plh) = plhs.get(i).copied() else {
break;
};
let presence = self
.block_manager
.block_registry()
.check_presence::<G1>(&[plh]);
if presence.first().is_some_and(|(_, present)| *present) {
overlap += 1;
} else {
break;
}
}
UniqueBlock::PartialBlock(_) => break,
}
}
overlap
} else {
0
};
let new_blocks = seq_blocks.len() - overlap_blocks;
let cached_tokens = (overlap_blocks * self.block_size).min(sequence.num_input_tokens());
let new_tokens = sequence.num_input_tokens() - cached_tokens;
PrefillCost {
new_blocks,
new_tokens,
cached_tokens,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use super::*;
use crate::common::protocols::KvCacheEventSink;
/// Capturing event sink for router-publication assertions.
#[derive(Default)]
struct CapturingSink {
events: Mutex<Vec<KvCacheEvent>>,
}
impl KvCacheEventSink for CapturingSink {
fn publish(&self, event: KvCacheEvent) -> anyhow::Result<()> {
self.events.lock().unwrap().push(event);
Ok(())
}
}
fn make_mgr(capacity: usize, block_size: usize) -> KvManager {
KvManager::new_with_event_sink(capacity, block_size, KvEventPublishers::default(), 0)
}
fn make_mgr_capturing(capacity: usize, block_size: usize) -> (KvManager, Arc<CapturingSink>) {
let sink = Arc::new(CapturingSink::default());
let publishers = KvEventPublishers::new(Some(sink.clone() as _), None);
(
KvManager::new_with_event_sink(capacity, block_size, publishers, 0),
sink,
)
}
fn plh(v: u64) -> PositionalLineageHash {
PositionalLineageHash::new(v, None, 0)
}
fn use_full(mgr: &mut KvManager, seq_hash: u64, p: PositionalLineageHash) -> usize {
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(seq_hash)],
vec![],
vec![p],
None,
None,
))
}
fn use_partial(mgr: &mut KvManager, uuid: Uuid) -> usize {
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::PartialBlock(uuid)],
vec![],
vec![],
None,
None,
))
}
fn deref_full(mgr: &mut KvManager, seq_hash: u64) {
mgr.process(&MoveBlock::Deref(vec![UniqueBlock::FullBlock(seq_hash)]));
}
fn destroy_full(mgr: &mut KvManager, seq_hash: u64) {
mgr.process(&MoveBlock::Destroy(vec![UniqueBlock::FullBlock(seq_hash)]));
}
#[test]
fn test_use_single_full_block() {
let mut mgr = make_mgr(10, 16);
assert_eq!(use_full(&mut mgr, 1, plh(100)), 1);
assert_eq!(mgr.num_active_blocks(), 1);
}
#[test]
fn test_duplicate_use_bumps_refcount() {
let mut mgr = make_mgr(10, 16);
use_full(&mut mgr, 1, plh(100));
use_full(&mut mgr, 1, plh(100));
// Same seq_hash used twice: only one distinct physical block is
// resident, but the mocker holds two RAII handles.
assert_eq!(mgr.num_active_blocks(), 1);
assert_eq!(mgr.num_active_block_refs(), 2);
}
#[test]
fn test_capacity_exhaustion_returns_partial() {
let mut mgr = make_mgr(4, 16);
for i in 0..4 {
assert_eq!(use_full(&mut mgr, i, plh(i + 100)), 1);
}
// Fifth allocation fails - returns 0 (no blocks allocated)
assert_eq!(use_full(&mut mgr, 4, plh(500)), 0);
}
#[test]
fn test_deref_returns_to_inactive() {
let mut mgr = make_mgr(4, 16);
use_full(&mut mgr, 1, plh(100));
deref_full(&mut mgr, 1);
assert_eq!(mgr.num_active_blocks(), 0);
}
#[test]
fn test_inactive_reuse_via_match_blocks() {
let mut mgr = make_mgr(10, 16);
let p = plh(100);
use_full(&mut mgr, 1, p);
deref_full(&mut mgr, 1);
// Use with same PLH reuses the inactive block.
assert_eq!(use_full(&mut mgr, 2, p), 1);
}
#[test]
fn test_eviction_frees_inactive_for_new_allocation() {
let mut mgr = make_mgr(4, 16);
for i in 0..4 {
use_full(&mut mgr, i, plh(i + 100));
}
for i in 0..4 {
deref_full(&mut mgr, i);
}
for i in 10..14 {
assert_eq!(use_full(&mut mgr, i, plh(i + 1000)), 1);
}
assert_eq!(mgr.num_active_blocks(), 4);
}
#[test]
fn test_promote_basic() {
let mut mgr = make_mgr(10, 16);
let uuid = Uuid::new_v4();
use_partial(&mut mgr, uuid);
mgr.process(&MoveBlock::Promote(uuid, 42, None, 0, plh(500), None));
assert_eq!(mgr.num_active_blocks(), 1);
assert!(mgr.active_partial.is_empty());
assert!(mgr.active_full.contains_key(&42));
}
#[test]
#[should_panic(expected = "Promote: partial block not found")]
fn test_promote_nonexistent_panics() {
let mut mgr = make_mgr(10, 16);
mgr.process(&MoveBlock::Promote(
Uuid::new_v4(),
42,
None,
0,
plh(500),
None,
));
}
#[test]
#[should_panic(expected = "Deref on PartialBlock is not valid")]
fn test_deref_on_partial_panics() {
let mut mgr = make_mgr(10, 16);
let uuid = Uuid::new_v4();
use_partial(&mut mgr, uuid);
mgr.process(&MoveBlock::Deref(vec![UniqueBlock::PartialBlock(uuid)]));
}
#[test]
fn test_destroy_full_block() {
let (mut mgr, sink) = make_mgr_capturing(10, 16);
use_full(&mut mgr, 1, plh(100));
assert_eq!(mgr.num_active_blocks(), 1);
assert_eq!(mgr.num_inactive_blocks(), 0);
destroy_full(&mut mgr, 1);
// Active handles released — but kvbm-logical keeps the Registered
// state alive and the block lands in the *inactive* pool, still
// matchable via `match_blocks(plh)`.
assert_eq!(mgr.num_active_blocks(), 0);
assert_eq!(
mgr.num_inactive_blocks(),
1,
"Destroy must leave the block in kvbm-logical's inactive pool"
);
// Router-visible protocol parity: we still emit a `Removed` event
// for the caller.
let events = sink.events.lock().unwrap();
let removed_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Removed(_)))
.count();
assert_eq!(removed_count, 1, "Destroy must emit one Removed event");
}
#[test]
fn test_prefill_cost_no_overlap() {
let mgr = make_mgr(10, 16);
let tokens: Vec<u32> = (0..35).collect();
let seq = ActiveSequence::new(tokens, 10, Some(16), true, false);
let cost = mgr.get_prefill_cost(&seq);
assert_eq!(cost.new_blocks, seq.unique_blocks().len());
assert_eq!(cost.new_tokens, 35);
}
#[test]
fn test_eviction_backend_lru_and_multi_lru() {
for backend in [MockerEvictionBackend::Lru, MockerEvictionBackend::MultiLru] {
let mut mgr = KvManager::new_with_eviction_backend(
4,
16,
KvEventPublishers::default(),
0,
backend,
);
for i in 0..4u64 {
assert_eq!(use_full(&mut mgr, i, plh(i + 100)), 1);
}
for i in 0..4u64 {
deref_full(&mut mgr, i);
}
for i in 10..14u64 {
assert_eq!(
use_full(&mut mgr, i, plh(i + 1000)),
1,
"backend={backend:?}"
);
}
assert_eq!(mgr.num_active_blocks(), 4);
}
}
#[test]
fn test_failure_on_max_capacity() {
fn use_batch(mgr: &mut KvManager, ids: &[u64]) -> usize {
let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
let plhs: Vec<_> = ids.iter().map(|&id| plh(id)).collect();
mgr.process(&MoveBlock::Use(blocks, vec![], plhs, None, None))
}
let mut mgr = make_mgr(10, 16);
// Fill capacity in a single Use batch.
let ids: Vec<u64> = (0..10).collect();
assert_eq!(use_batch(&mut mgr, &ids), 10, "all 10 should allocate");
assert_eq!(mgr.num_active_blocks(), 10);
// One more block must return 0 (no partial allocation possible, not panic).
assert_eq!(
use_batch(&mut mgr, &[10]),
0,
"over-capacity Use must return 0"
);
}
#[test]
fn test_block_lifecycle_stringent() {
// Batch helpers local to this test. Each FullBlock gets a unique PLH
// derived from its id so the PLH->SequenceHash mapping stays 1:1.
fn use_blocks(mgr: &mut KvManager, ids: &[u64]) {
let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
let plhs: Vec<_> = ids.iter().map(|&id| plh(id)).collect();
mgr.process(&MoveBlock::Use(blocks, vec![], plhs, None, None));
}
fn destroy_blocks(mgr: &mut KvManager, ids: &[u64]) {
let blocks = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
mgr.process(&MoveBlock::Destroy(blocks));
}
fn deref_blocks(mgr: &mut KvManager, ids: &[u64]) {
let blocks = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
mgr.process(&MoveBlock::Deref(blocks));
}
fn refcount(mgr: &KvManager, id: u64) -> usize {
mgr.active_full.get(&id).map(|v| v.len()).unwrap_or(0)
}
fn assert_active(mgr: &KvManager, expected: &[(u64, usize)]) {
let distinct = expected.len();
let total_refs: usize = expected.iter().map(|&(_, r)| r).sum();
assert_eq!(
mgr.num_active_blocks(),
distinct,
"distinct active-block count mismatch; expected={expected:?}"
);
assert_eq!(
mgr.num_active_block_refs(),
total_refs,
"active handle-refcount mismatch; expected={expected:?}"
);
for &(id, r) in expected {
assert_eq!(refcount(mgr, id), r, "block {id} refcount mismatch");
}
}
// Inactive membership helper. Uses `check_presence::<G1>` (non-mutating)
// against a snapshot of PLHs to confirm each expected id is present in
// kvbm-logical AND absent from `active_full`. Also checks total count
// matches so we catch stray inactive entries too.
//
// NOTE: under kvbm-logical, `Destroy` removes the block from
// `active_full` but the `ImmutableBlock` drop returns it to the
// inactive pool — so a destroyed block appears as inactive here until
// evicted. This differs from the old `HashCache` where `Destroy`
// removed the block entirely.
fn assert_inactive_blocks(mgr: &KvManager, expected_ids: &[u64]) {
assert_eq!(
mgr.num_inactive_blocks(),
expected_ids.len(),
"inactive count mismatch; expected={expected_ids:?}"
);
let plhs: Vec<_> = expected_ids.iter().map(|&id| plh(id)).collect();
let presence = mgr
.block_manager
.block_registry()
.check_presence::<G1>(&plhs);
for ((_, present), &id) in presence.iter().zip(expected_ids.iter()) {
assert!(
*present,
"block {id} expected in inactive pool, not found in registry"
);
assert!(
!mgr.active_full.contains_key(&id),
"block {id} expected inactive but is in active pool"
);
}
}
let mut mgr = make_mgr(10, 16);
// Use blocks 0..=4, then 0, 1, 5, 6 — 0 and 1 bump refcount to 2.
use_blocks(&mut mgr, &[0, 1, 2, 3, 4]);
use_blocks(&mut mgr, &[0, 1, 5, 6]);
assert_active(
&mgr,
&[(0, 2), (1, 2), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)],
);
// Destroy block 4; deref 0, 1, 2, 3. 2 and 3 drop to inactive via
// RAII; 4 is destroyed (removed from active_full + Removed emitted)
// but kvbm-logical still has it in the inactive pool.
destroy_blocks(&mut mgr, &[4]);
deref_blocks(&mut mgr, &[0, 1, 2, 3]);
assert_active(&mgr, &[(0, 1), (1, 1), (5, 1), (6, 1)]);
assert_inactive_blocks(&mgr, &[2, 3, 4]);
// Destroy block 6; deref 0, 1, 5. Active drains; inactive = {0..=6}.
destroy_blocks(&mut mgr, &[6]);
deref_blocks(&mut mgr, &[0, 1, 5]);
assert_active(&mgr, &[]);
assert_inactive_blocks(&mgr, &[0, 1, 2, 3, 4, 5, 6]);
// Re-use 0, 1, 2 (reactivates from inactive) + 7, 8, 9 (new, 3 free
// slots). No eviction needed — inactive shrinks to {3, 4, 5, 6}.
use_blocks(&mut mgr, &[0, 1, 2, 7, 8, 9]);
assert_active(&mgr, &[(0, 1), (1, 1), (2, 1), (7, 1), (8, 1), (9, 1)]);
assert_inactive_blocks(&mgr, &[3, 4, 5, 6]);
// Allocate through capacity: 10, 11, 12 force eviction of 3 inactive
// entries. Exact survivor depends on eviction order (Lineage/LRU), so
// only assert count.
use_blocks(&mut mgr, &[10, 11, 12]);
assert_eq!(mgr.num_active_blocks(), 9);
assert_eq!(mgr.num_inactive_blocks(), 1);
// One more block keeps us at full capacity without panicking.
use_blocks(&mut mgr, &[13]);
assert_eq!(mgr.num_active_blocks(), 10);
assert_eq!(mgr.num_inactive_blocks(), 0);
}
#[test]
fn test_chunked_prefill_parent_hash() {
let block_size = 64;
let tokens: Vec<u32> = (0..512).collect(); // 8 full blocks
let mut seq = ActiveSequence::new(tokens, 100, Some(block_size), true, false);
let (mut mgr, sink) = make_mgr_capturing(256, block_size);
// Chunk 1: blocks 0..=3 (cumulative 256 tokens).
let signal = seq.prepare_allocation(256).unwrap();
mgr.process(&signal);
seq.commit_allocation(256);
// Chunk 2: blocks 4..=7 (cumulative 512 tokens).
let signal = seq.prepare_allocation(512).unwrap();
mgr.process(&signal);
seq.commit_allocation(512);
let events = sink.events.lock().unwrap();
assert_eq!(events.len(), 2, "expected two Stored events");
let KvCacheEventData::Stored(ref store1) = events[0].data else {
panic!("expected Stored event");
};
assert!(
store1.parent_hash.is_none(),
"first chunk should have no parent_hash"
);
let KvCacheEventData::Stored(ref store2) = events[1].data else {
panic!("expected Stored event");
};
let UniqueBlock::FullBlock(expected_hash) = seq.unique_blocks()[3].clone() else {
panic!("expected FullBlock at index 3");
};
assert_eq!(
store2.parent_hash,
Some(ExternalSequenceBlockHash(expected_hash)),
"second chunk's parent_hash should be block 3's seq_hash"
);
}
#[test]
fn test_repreempt_after_partial_recompute_only_frees_reallocated_blocks() {
let mut seq = ActiveSequence::new((0..6).collect(), 16, Some(4), true, false);
let mut mgr = make_mgr(16, 4);
let signal = seq.take_creation_signal().unwrap();
assert_eq!(mgr.process(&signal), 2);
for _ in 0..3 {
let signals = seq.generate();
for signal in &signals {
mgr.process(signal);
}
if seq.generated_tokens() < seq.max_output_tokens() {
seq.commit_allocation(seq.len());
}
}
assert_eq!(mgr.num_active_blocks(), 3);
let first_reset = seq.reset_with_signal();
for signal in &first_reset {
mgr.process(signal);
}
assert_eq!(mgr.num_active_blocks(), 0);
let prompt_only = seq.prepare_allocation(seq.num_input_tokens()).unwrap();
assert_eq!(mgr.process(&prompt_only), 2);
seq.commit_allocation(seq.num_input_tokens());
assert_eq!(mgr.num_active_blocks(), 2);
let second_reset = seq.reset_with_signal();
for signal in &second_reset {
mgr.process(signal);
}
assert_eq!(mgr.num_active_blocks(), 0);
}
/// When a FullBlock is used, deref'd (becomes inactive in kvbm-logical),
/// then used again, the router already knows about it — reactivation must
/// NOT emit a second `Stored` event.
#[test]
fn test_inactive_hit_does_not_republish_stored() {
let (mut mgr, sink) = make_mgr_capturing(4, 16);
// First Use: fresh registration → 1 Stored.
use_full(&mut mgr, 1, plh(100));
// Deref → block transitions to inactive pool. No Removed (we don't
// emit one on Deref).
deref_full(&mut mgr, 1);
// Second Use: match_blocks reactivates from inactive → InactiveHit.
// No new Stored should fire.
use_full(&mut mgr, 1, plh(100));
let events = sink.events.lock().unwrap();
let stored_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Stored(_)))
.count();
let removed_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Removed(_)))
.count();
assert_eq!(stored_count, 1, "reactivation must not re-emit Stored");
assert_eq!(removed_count, 0, "Deref must not emit Removed");
}
/// After reusing a prefix [A, B] and storing a new suffix [C], the
/// `Stored` event for C must anchor `parent_hash` to B (the last reused
/// full block), not to whatever parent the caller originally passed.
#[test]
fn test_stored_suffix_anchors_to_last_reused_block() {
let (mut mgr, sink) = make_mgr_capturing(8, 16);
// Prime the cache with [A=10, B=11].
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(10), UniqueBlock::FullBlock(11)],
vec![],
vec![plh(10), plh(11)],
None,
None,
));
// Drop both to inactive.
deref_full(&mut mgr, 10);
deref_full(&mut mgr, 11);
// Clear captured events from priming.
sink.events.lock().unwrap().clear();
// New request reuses [A, B] and stores a new block C=12.
mgr.process(&MoveBlock::Use(
vec![
UniqueBlock::FullBlock(10),
UniqueBlock::FullBlock(11),
UniqueBlock::FullBlock(12),
],
vec![],
vec![plh(10), plh(11), plh(12)],
None,
None, // no explicit parent → scheduler would pass None for a head-chunk
));
let events = sink.events.lock().unwrap();
// Only one Stored (for C); no Stored for reused A or B.
assert_eq!(events.len(), 1, "only new suffix should fire a Stored");
let KvCacheEventData::Stored(ref data) = events[0].data else {
panic!("expected Stored");
};
assert_eq!(data.blocks.len(), 1, "Stored must only include C");
assert_eq!(data.blocks[0].block_hash, ExternalSequenceBlockHash(12));
assert_eq!(
data.parent_hash,
Some(ExternalSequenceBlockHash(11)),
"parent_hash must anchor to last reused full block (B=11)"
);
}
/// Two requests sharing a prefix must not inflate scheduler-visible
/// occupancy. The distinct count reflects physically-resident blocks; the
/// refcount metric reflects held handles.
#[test]
fn test_shared_prefix_distinct_vs_refcount() {
let mut mgr = make_mgr(8, 16);
// Request A uses [10, 11, 12].
mgr.process(&MoveBlock::Use(
vec![
UniqueBlock::FullBlock(10),
UniqueBlock::FullBlock(11),
UniqueBlock::FullBlock(12),
],
vec![],
vec![plh(10), plh(11), plh(12)],
None,
None,
));
assert_eq!(mgr.num_active_blocks(), 3);
assert_eq!(mgr.num_active_block_refs(), 3);
// Request B reuses prefix [10, 11] and adds its own block [13].
mgr.process(&MoveBlock::Use(
vec![
UniqueBlock::FullBlock(10),
UniqueBlock::FullBlock(11),
UniqueBlock::FullBlock(13),
],
vec![],
vec![plh(10), plh(11), plh(13)],
None,
None,
));
// Distinct resident blocks: {10, 11, 12, 13} = 4 (scheduler view).
assert_eq!(
mgr.num_active_blocks(),
4,
"shared prefix must not inflate distinct count"
);
// Handle count: 10 and 11 each held twice, 12 once, 13 once → 6.
assert_eq!(
mgr.num_active_block_refs(),
6,
"handle count should reflect per-request refcount"
);
}
/// With `enable_prefix_caching=false`, each sequence should still be able
/// to reactivate its OWN inactive blocks after preemption and re-admit.
#[test]
fn test_random_plh_stable_across_preempt_retry() {
// 4 blocks of size 16 → 64 tokens of prompt.
let block_size = 16;
let tokens: Vec<u32> = (0..64).collect();
let mut seq = ActiveSequence::new(tokens, 100, Some(block_size), false, false);
let (mut mgr, sink) = make_mgr_capturing(8, block_size);
// Admit: allocate prompt blocks.
let signal = seq.take_creation_signal().unwrap();
assert_eq!(mgr.process(&signal), 4);
assert_eq!(mgr.num_active_blocks(), 4);
// Preempt: reset_with_signal frees all active blocks (Deref) →
// kvbm-logical keeps them in the inactive pool (no Removed events).
let reset_signals = seq.reset_with_signal();
for signal in &reset_signals {
mgr.process(signal);
}
assert_eq!(mgr.num_active_blocks(), 0);
assert_eq!(mgr.num_inactive_blocks(), 4);
// Re-admit: prompt blocks must reactivate via InactiveHit, NOT allocate
// fresh. The cached per-sequence PLHs are what make this work.
let signal = seq.take_creation_signal().unwrap();
assert_eq!(mgr.process(&signal), 4);
assert_eq!(mgr.num_active_blocks(), 4);
assert_eq!(mgr.num_inactive_blocks(), 0);
// Router-event witness: only ONE `Stored` (from the original admit).
let events = sink.events.lock().unwrap();
let stored_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Stored(_)))
.count();
assert_eq!(
stored_count, 1,
"preempted request should self-match on re-admit (no duplicate Stored)"
);
}
#[test]
fn test_eviction_emits_exact_removed_event() {
// Capacity = 2. Use three blocks (10, 11, 12); deref 10, 11 to push
// them into the inactive pool; then use a third distinct block (12)
// that isn't already in the active or inactive pool — this forces
// allocation → inactive-pool eviction.
let (mut mgr, sink) = make_mgr_capturing(2, 16);
// Seed 10 and 11 in the inactive pool.
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(10), UniqueBlock::FullBlock(11)],
vec![],
vec![plh(10), plh(11)],
None,
None,
));
deref_full(&mut mgr, 10);
deref_full(&mut mgr, 11);
assert_eq!(mgr.num_active_blocks(), 0);
assert_eq!(mgr.num_inactive_blocks(), 2);
sink.events.lock().unwrap().clear();
// Introduce block 12 → must evict exactly one of {10, 11}.
use_full(&mut mgr, 12, plh(12));
let events = sink.events.lock().unwrap();
let removed: Vec<u64> = events
.iter()
.filter_map(|e| match &e.data {
KvCacheEventData::Removed(data) => Some(
data.block_hashes
.iter()
.map(|ExternalSequenceBlockHash(h)| *h)
.collect::<Vec<_>>(),
),
_ => None,
})
.flatten()
.collect();
let stored_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Stored(_)))
.count();
assert_eq!(
removed.len(),
1,
"exactly one block should be reported as evicted"
);
assert!(
removed[0] == 10 || removed[0] == 11,
"evicted hash must be one we seeded ({}), got {}",
"10 or 11",
removed[0]
);
assert_eq!(stored_count, 1, "one Stored event for the fresh block 12");
}
}
......@@ -3,8 +3,8 @@
//! Pluggable KV cache block managers.
pub mod kvbm_backend;
pub mod sglang_backend;
pub mod vllm_backend;
pub use kvbm_backend::KvManager;
pub use sglang_backend::SglangKvManager;
pub use vllm_backend::KvManager;
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! # KV Manager (vLLM Backend)
//! A synchronous implementation of a block manager that handles MoveBlock signals for caching KV blocks.
//!
//! Uses [`HashCache`] for O(1) block lookups with active/inactive pool management.
//!
//! ## Block Operations
//! The KV manager processes four types of MoveBlock signals:
//!
//! ### Use
//! - Checks if block exists in active pool → increment reference count
//! - If in inactive pool → move to active pool
//! - If neither → try evicting from inactive pool to make room
//! - If inactive pool is empty → pre-empt the oldest running request
//!
//! ### Destroy
//! - Removes the block from the active pool
//!
//! ### Deref
//! - Decrements reference count of a block in active pool
//! - If count reaches zero → move block to inactive pool
//!
//! ### Promote
//! - Converts a partial block (uuid) into a full block (global block hash)
//!
//! ## Preemption
//! If a Use operation fails (typically due to insufficient space), a false boolean signal
//! is returned to the scheduler for preemption. Initial KV block allocations for new requests
//! should not fail due to the capacity checking during scheduling.
//!
//! ## NOTE
//! For simplicity (or non-simplicity), reference counting is tracked manually instead of using
//! the more idiomatic built-in Arc reference counter. This can be considered a shadow / mirror
//! implementation of the main block manager.
use crate::cache::HashCache;
use crate::common::kv_cache_trace;
use crate::common::protocols::{KvEventPublishers, MoveBlock, PrefillCost};
use crate::common::sequence::ActiveSequence;
use dynamo_kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash,
};
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, SequenceHash};
use rustc_hash::FxHashMap;
pub struct KvManager {
cache: HashCache,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
next_event_id: u64,
}
impl KvManager {
pub fn new(max_capacity: usize, block_size: usize) -> Self {
Self::new_with_event_sink(max_capacity, block_size, KvEventPublishers::default(), 0)
}
pub fn new_with_event_sink(
max_capacity: usize,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
) -> Self {
debug_assert!(max_capacity > 0, "max_capacity must be > 0");
if !kv_event_publishers.is_empty() {
tracing::info!(
"KvManager initialized with event sink for DP rank {dp_rank} with block_size {block_size}"
);
}
KvManager {
cache: HashCache::new(max_capacity),
block_size,
kv_event_publishers,
dp_rank,
next_event_id: 0,
}
}
/// Converts stored/removed blocks into KvCacheEventData and publishes if sink is available.
fn publish_kv_event(
&mut self,
full_blocks: Vec<SequenceHash>,
local_hashes: &[BlockHash],
parent_hash: Option<u64>,
is_store: bool,
token_ids: Option<Vec<Vec<u32>>>,
) {
if full_blocks.is_empty() {
return;
}
kv_cache_trace::log_vllm_trace(
if is_store { "allocation" } else { "eviction" },
self.dp_rank,
self.block_size,
self.cache.num_active(),
self.cache.num_inactive(),
self.cache.max_capacity(),
);
if self.kv_event_publishers.is_empty() {
return;
}
let event_data = if is_store {
let num_blocks = full_blocks.len();
let local_hashes_slice = &local_hashes[local_hashes
.len()
.checked_sub(num_blocks)
.expect("local hashes fewer than stored blocks")..];
KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_hash.map(ExternalSequenceBlockHash),
start_position: None,
blocks: full_blocks
.into_iter()
.zip(local_hashes_slice.iter())
.map(|(global_hash, local_hash)| KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(global_hash),
tokens_hash: LocalBlockHash(*local_hash),
mm_extra_info: None,
})
.collect(),
})
} else {
KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: full_blocks
.into_iter()
.map(ExternalSequenceBlockHash)
.collect(),
})
};
// Use incremental event ID starting from 0 and incrementing by 1 for each event.
let event_id = self.next_event_id;
self.next_event_id += 1;
let event = KvCacheEvent {
event_id,
data: event_data,
dp_rank: self.dp_rank,
};
if let Err(e) = self
.kv_event_publishers
.publish(event, token_ids.as_deref())
{
tracing::warn!("Failed to publish KV event: {e}");
}
}
/// Process a MoveBlock instruction synchronously.
///
/// For `MoveBlock::Use`, returns the number of blocks successfully allocated.
/// On partial failure, blocks 0..N are committed but block N+1 could not be
/// allocated. Callers should use the return value to track partial progress.
///
/// For other variants, returns the total block count (they always succeed or panic).
pub fn process(&mut self, event: &MoveBlock) -> usize {
match event {
MoveBlock::Use(hashes, local_hashes, token_ids, parent) => {
let mut blocks_stored = Vec::<u64>::new();
let mut stored_token_ids: Option<Vec<Vec<u32>>> =
token_ids.as_ref().map(|_| Vec::new());
let mut parent_block: Option<&UniqueBlock> = parent.as_ref();
let mut allocated = 0;
for (i, hash) in hashes.iter().enumerate() {
// First check if it already exists in active blocks
if self.cache.contains_active(hash) {
// Block already active, just increment reference count
self.cache.increment_ref(hash);
parent_block = Some(hash);
allocated += 1;
continue;
}
// Then check if it exists in inactive and move it to active if found
if self.cache.reactivate(hash) {
parent_block = Some(hash);
allocated += 1;
continue;
}
// If at max capacity, evict the oldest entry from inactive blocks
if self.cache.is_at_capacity() {
let Some(evicted) = self.cache.evict_inactive() else {
break;
};
tracing::trace!(
"Evicting block from inactive pool: {evicted:?}, dp_rank={}",
self.dp_rank
);
if let UniqueBlock::FullBlock(evicted_full_block) = evicted {
self.publish_kv_event(vec![evicted_full_block], &[], None, false, None);
}
}
// Now insert the new block in active blocks with reference count 1
self.cache.insert_active(hash.clone(), 1);
allocated += 1;
// Track blocks for trace/event
if let UniqueBlock::FullBlock(stored_full_block) = hash {
blocks_stored.push(*stored_full_block);
if let Some(ref mut stids) = stored_token_ids {
stids.push(token_ids.as_ref().unwrap()[i].clone());
}
}
}
let parent_hash = match parent_block {
None => None,
Some(UniqueBlock::FullBlock(block)) => Some(*block),
Some(UniqueBlock::PartialBlock(_)) => panic!("parent block cannot be partial"),
};
self.publish_kv_event(
blocks_stored,
local_hashes,
parent_hash,
true,
stored_token_ids,
);
return allocated;
}
MoveBlock::Destroy(hashes) => {
let mut blocks_destroyed = Vec::<u64>::new();
// Process blocks in order (already reversed by caller if needed)
for hash in hashes.iter() {
self.cache.remove_active(hash).unwrap();
// Track blocks for batch sending
if let UniqueBlock::FullBlock(destroyed_full_block) = hash {
blocks_destroyed.push(*destroyed_full_block);
}
}
self.publish_kv_event(blocks_destroyed, &[], None, false, None);
}
MoveBlock::Deref(hashes) => {
// Process blocks in order (already reversed by caller if needed)
for hash in hashes.iter() {
// Decrement reference count and check if we need to move to inactive
if let Some(ref_count) = self.cache.get_active_ref_count(hash) {
if ref_count == 0 {
panic!("Negative reference count would be encountered after Deref.");
}
// If reference count reaches zero, remove from active and move to inactive
if ref_count == 1 {
self.cache.deactivate(hash);
} else {
self.cache.decrement_ref(hash);
}
}
}
}
MoveBlock::Promote(uuid, hash, parent_hash, local_hash, promote_token_ids) => {
let uuid_block = UniqueBlock::PartialBlock(*uuid);
let hash_block = UniqueBlock::FullBlock(*hash);
assert_eq!(
self.cache.remove_active(&uuid_block),
Some(1),
"uuid_block {uuid_block:?} should exist and be unique with ref_count=1"
);
let hash_ref_count = self.cache.get_active_ref_count(&hash_block);
// Block is new if it's not in active and not in inactive
let is_new = if hash_ref_count.is_some() {
false
} else {
!self.cache.remove_inactive(&hash_block)
};
self.cache
.insert_active(hash_block, hash_ref_count.unwrap_or(0) + 1);
if is_new {
self.publish_kv_event(
vec![*hash],
&[*local_hash],
*parent_hash,
true,
promote_token_ids.as_ref().map(|t| vec![t.clone()]),
);
}
}
}
1
}
/// Get the count of blocks that aren't in active or inactive pools
pub fn probe_new_blocks(&self, blocks: &[UniqueBlock]) -> usize {
blocks
.iter()
.filter(|&block| !self.cache.contains(block))
.count()
}
/// Get the current capacity (active blocks + inactive blocks)
pub fn current_capacity(&self) -> usize {
self.cache.current_capacity()
}
/// Get the current capacity as a percentage of the maximum capacity
pub fn current_capacity_perc(&self) -> f64 {
self.cache.current_capacity() as f64 / self.cache.max_capacity() as f64
}
/// Get the number of active blocks
pub fn num_active_blocks(&self) -> usize {
self.cache.num_active()
}
/// Get the percentage of active blocks relative to maximum capacity
pub fn get_active_perc(&self) -> f64 {
self.cache.num_active() as f64 / self.cache.max_capacity() as f64
}
/// Get the number of inactive blocks
pub fn num_inactive_blocks(&self) -> usize {
self.cache.num_inactive()
}
/// Get the keys of inactive blocks
pub fn get_inactive_blocks(&self) -> Vec<&UniqueBlock> {
self.cache.inactive_keys().collect()
}
/// Get the keys of active blocks
pub fn get_active_blocks(&self) -> Vec<&UniqueBlock> {
self.cache.active_keys().collect()
}
pub fn max_capacity(&self) -> usize {
self.cache.max_capacity()
}
pub fn block_size(&self) -> usize {
self.block_size
}
pub fn dp_rank(&self) -> u32 {
self.dp_rank
}
/// Direct access to active blocks map (for tests).
pub fn active_blocks(&self) -> &FxHashMap<UniqueBlock, usize> {
self.cache.active_blocks()
}
/// Check if a sequence can be scheduled and calculate cost if possible
pub fn get_prefill_cost(&self, sequence: &ActiveSequence) -> PrefillCost {
let seq_blocks = sequence.unique_blocks();
// Find the longest prefix that exists in cache
// We must stop at the first cache miss since KV states are computed sequentially
let mut overlap_blocks = 0;
for block in seq_blocks {
if !self.cache.contains(block) {
// First cache miss - can't use anything after this point
break;
}
overlap_blocks += 1;
}
let new_blocks = seq_blocks.len() - overlap_blocks;
// Clamp cached_tokens to handle partial blocks (last block may have < block_size tokens)
let cached_tokens = (overlap_blocks * self.block_size).min(sequence.num_input_tokens());
let new_tokens = sequence.num_input_tokens() - cached_tokens;
PrefillCost {
new_blocks,
new_tokens,
cached_tokens,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use crate::common::protocols::KvCacheEventSink;
#[test]
fn test_failure_on_max_capacity() {
// Create a KvManager with 10 blocks capacity
let mut manager = KvManager::new(10, 16);
// Helper function to use multiple blocks that returns the count allocated
fn use_blocks(manager: &mut KvManager, ids: Vec<u64>) -> usize {
let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
let hashes: Vec<_> = ids.into_iter().collect();
manager.process(&MoveBlock::Use(blocks, hashes, None, None))
}
// First use 10 blocks (0 to 9) in a batch
let response = use_blocks(&mut manager, (0..10).collect());
assert_eq!(response, 10, "Expected all 10 blocks allocated");
// Verify we are at capacity
assert_eq!(manager.current_capacity(), 10);
// The 11th block should return 0, not panic
let response = use_blocks(&mut manager, vec![10]);
assert_eq!(
response, 0,
"Expected 0 blocks allocated when exceeding max capacity"
);
}
#[test]
fn test_block_lifecycle_stringent() {
// Create a KvManager with 10 blocks capacity (no KV event publisher for tests)
let mut manager = KvManager::new(10, 16);
// Helper function to use multiple blocks
fn use_blocks(manager: &mut KvManager, ids: Vec<u64>) {
let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
let hashes: Vec<_> = ids.into_iter().collect();
manager.process(&MoveBlock::Use(blocks, hashes, None, None));
}
// Helper function to destroy multiple blocks
fn destroy_blocks(manager: &mut KvManager, ids: Vec<u64>) {
let blocks = ids.into_iter().map(UniqueBlock::FullBlock).collect();
manager.process(&MoveBlock::Destroy(blocks));
}
// Helper function to deref multiple blocks
fn deref_blocks(manager: &mut KvManager, ids: Vec<u64>) {
let blocks = ids.into_iter().map(UniqueBlock::FullBlock).collect();
manager.process(&MoveBlock::Deref(blocks));
}
// Helper function to check if active blocks contain expected blocks with expected ref counts
fn assert_active_blocks(manager: &KvManager, expected_blocks: &[(u64, usize)]) {
assert_eq!(
manager.active_blocks().len(),
expected_blocks.len(),
"Active blocks count doesn't match expected"
);
for &(id, ref_count) in expected_blocks {
let block = UniqueBlock::FullBlock(id);
assert!(
manager.active_blocks().contains_key(&block),
"Block {id} not found in active blocks",
);
assert_eq!(
manager.active_blocks().get(&block),
Some(&ref_count),
"Block {id} has wrong reference count",
);
}
}
// Helper function to check if inactive blocks contain expected blocks
fn assert_inactive_blocks(
manager: &KvManager,
expected_size: usize,
expected_blocks: &[u64],
) {
let inactive_blocks = manager.get_inactive_blocks();
let inactive_blocks_count = manager.num_inactive_blocks();
assert_eq!(
inactive_blocks_count, expected_size,
"Inactive blocks count doesn't match expected"
);
for &id in expected_blocks {
let block = UniqueBlock::FullBlock(id);
assert!(
inactive_blocks.iter().any(|&b| *b == block),
"Block {id} not found in inactive blocks",
);
}
}
// First use blocks 0, 1, 2, 3, 4 in a batch
use_blocks(&mut manager, (0..5).collect());
// Then use blocks 0, 1, 5, 6 in a batch
use_blocks(&mut manager, vec![0, 1, 5, 6]);
// Check that the blocks 0 and 1 are in active blocks, both with reference counts of 2
assert_active_blocks(
&manager,
&[(0, 2), (1, 2), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)],
);
// Now destroy block 4
destroy_blocks(&mut manager, vec![4]);
// And deref blocks 3, 2, 1, 0 in this order as a batch
deref_blocks(&mut manager, vec![0, 1, 2, 3]);
// Check that the inactive_blocks is size 2 (via num_objects) and contains 3 and 2
assert_inactive_blocks(&manager, 2, &[3, 2]);
assert_active_blocks(&manager, &[(0, 1), (1, 1), (5, 1), (6, 1)]);
// Now destroy block 6
destroy_blocks(&mut manager, vec![6]);
// And deref blocks 5, 1, 0 as a batch
deref_blocks(&mut manager, vec![0, 1, 5]);
// Check that the inactive_blocks is size 5, and contains 0, 1, 2, 3, 5
assert_inactive_blocks(&manager, 5, &[0, 1, 2, 3, 5]);
assert_active_blocks(&manager, &[]);
// Now use 0, 1, 2, 7, 8, 9 as a batch
use_blocks(&mut manager, vec![0, 1, 2, 7, 8, 9]);
// Check that the inactive_blocks is size 2, and contains 3 and 5
assert_inactive_blocks(&manager, 2, &[3, 5]);
assert_active_blocks(&manager, &[(0, 1), (1, 1), (2, 1), (7, 1), (8, 1), (9, 1)]);
// Test the new_blocks method - only block 4 should be new out of [0,1,2,3,4]
let blocks_to_check: Vec<UniqueBlock> = vec![0, 1, 2, 3, 4]
.into_iter()
.map(UniqueBlock::FullBlock)
.collect();
assert_eq!(manager.probe_new_blocks(&blocks_to_check), 1);
// Now use blocks 10, 11, 12 as a batch
use_blocks(&mut manager, vec![10, 11, 12]);
// Check that the inactive_blocks is size 1 and contains only 5
assert_inactive_blocks(&manager, 1, &[5]);
use_blocks(&mut manager, vec![13]);
}
#[test]
fn test_chunked_prefill_parent_hash() {
use std::sync::Mutex;
use crate::common::sequence::ActiveSequence;
#[derive(Default)]
struct CapturingSink {
events: Mutex<Vec<KvCacheEvent>>,
}
impl KvCacheEventSink for CapturingSink {
fn publish(&self, event: KvCacheEvent) -> anyhow::Result<()> {
self.events.lock().unwrap().push(event);
Ok(())
}
}
let block_size = 64;
let tokens: Vec<u32> = (0..512).collect(); // 8 blocks
let mut seq = ActiveSequence::new(tokens, 100, Some(block_size), true, false);
let sink = Arc::new(CapturingSink::default());
let mut manager = KvManager::new_with_event_sink(
256,
block_size,
KvEventPublishers::new(Some(sink.clone() as _), None),
0,
);
// Chunk 1: allocate blocks 0-3
let signal = seq.prepare_allocation(256).unwrap();
manager.process(&signal);
seq.commit_allocation(256);
// Chunk 2: allocate blocks 4-7
let signal = seq.prepare_allocation(512).unwrap();
manager.process(&signal);
seq.commit_allocation(512);
let events = sink.events.lock().unwrap();
assert_eq!(events.len(), 2, "expected two store events");
// First event: parent_hash should be None (starts from root)
let KvCacheEventData::Stored(ref store1) = events[0].data else {
panic!("expected store event");
};
assert!(
store1.parent_hash.is_none(),
"first chunk should have no parent"
);
// Second event: parent_hash should be the seq_hash of block 3
// (the last block from the first chunk)
let KvCacheEventData::Stored(ref store2) = events[1].data else {
panic!("expected store event");
};
let expected_parent = seq.unique_blocks()[3].clone();
let UniqueBlock::FullBlock(expected_hash) = expected_parent else {
panic!("expected full block");
};
assert_eq!(
store2.parent_hash,
Some(ExternalSequenceBlockHash(expected_hash)),
"second chunk's parent should be block 3's seq_hash"
);
}
#[test]
fn test_repreempt_after_partial_recompute_only_frees_reallocated_blocks() {
let mut seq = ActiveSequence::new((0..6).collect(), 16, Some(4), true, false);
let mut manager = KvManager::new(16, 4);
let signal = seq.take_creation_signal().unwrap();
assert_eq!(manager.process(&signal), 2);
for _ in 0..3 {
let signals = seq.generate();
for signal in &signals {
manager.process(signal);
}
if seq.generated_tokens() < seq.max_output_tokens() {
seq.commit_allocation(seq.len());
}
}
assert_eq!(manager.num_active_blocks(), 3);
let first_reset = seq.reset_with_signal();
for signal in &first_reset {
manager.process(signal);
}
assert_eq!(manager.num_active_blocks(), 0);
let prompt_only = seq.prepare_allocation(seq.num_input_tokens()).unwrap();
assert_eq!(manager.process(&prompt_only), 2);
seq.commit_allocation(seq.num_input_tokens());
assert_eq!(manager.num_active_blocks(), 2);
let second_reset = seq.reset_with_signal();
for signal in &second_reset {
manager.process(signal);
}
assert_eq!(manager.num_active_blocks(), 0);
}
}
......@@ -579,11 +579,21 @@ mod live_scheduler {
.build()
.unwrap();
// Side-channel router indexer: the mocker's emitted KV event stream is
// forwarded in real time into `LocalKvIndexer`, which applies Stored/
// Removed events against its own radix tree. If the mocker ever emits
// an invalid event (dangling parent, re-Stored of a present block, or
// Removed of an unknown block), the indexer's per-status counters tick
// — `assert_no_event_errors()` turns those into a test failure.
let harness = RouterIndexerHarness::new(64, ROUTER_TEST_WORKER_ID);
let (forwarder_sink, forwarder_task) = harness.spawn_forwarder();
let publishers = KvEventPublishers::new(Some(forwarder_sink as _), None);
let scheduler = Scheduler::new(
args,
0,
Some(output_tx),
KvEventPublishers::default(),
publishers,
None,
FpmPublisher::default(),
);
......@@ -597,6 +607,17 @@ mod live_scheduler {
use_shared_tokens,
)
.await;
// Stop the scheduler so no new events fire, then drop the forwarder's
// sender by dropping the scheduler → forwarder task drains and exits.
drop(scheduler);
let _ = tokio::time::timeout(Duration::from_secs(2), forwarder_task).await;
harness.flush().await;
harness.assert_no_event_errors();
// NOTE: we do NOT assert `dump_events().is_empty()` here because
// mocker's protocol does not emit router `Removed` events on
// request completion.
harness.shutdown();
}
#[tokio::test]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment