"lib/runtime/vscode:/vscode.git/clone" did not exist on "c4935b34979bcc2cc4938418efa874ad2317d1cd"
Unverified commit 9b2e8f3d, authored by Yongming Ding, committed by GitHub
Browse files

refactor(mocker): modularize into common/scheduler/kv_manager/cache (#6440)


Signed-off-by: Yongming Ding <yongmingd@nvidia.com>
parent 41d7d549
......@@ -18,8 +18,8 @@ use std::sync::Arc;
use uuid::Uuid;
use dynamo_kv_router::protocols::{KvCacheEvent, KvCacheEventData};
use dynamo_mocker::Scheduler;
use dynamo_mocker::protocols::{DirectRequest, KvCacheEventSink, MockEngineArgs};
use dynamo_mocker::common::protocols::{DirectRequest, KvCacheEventSink, MockEngineArgs};
use dynamo_mocker::scheduler::Scheduler;
use indicatif::{ProgressBar, ProgressStyle};
use std::sync::Mutex;
use tokio::task::JoinHandle;
......
......@@ -33,12 +33,14 @@ use crate::protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest
use dynamo_kv_router::protocols::KvCacheEvent;
// Re-export from dynamo-mocker for convenience
use dynamo_mocker::bootstrap::{BootstrapServer, connect_to_prefill};
use dynamo_mocker::protocols::OutputSignal;
pub use dynamo_mocker::{
DirectRequest, KvCacheEventSink, MockEngineArgs, MockEngineArgsBuilder, Scheduler, bootstrap,
evictor, kv_manager, perf_model, protocols, running_mean, scheduler, sequence,
use dynamo_mocker::common::bootstrap::{BootstrapServer, connect_to_prefill};
use dynamo_mocker::common::protocols::OutputSignal;
pub use dynamo_mocker::common::protocols::{
DirectRequest, KvCacheEventSink, MockEngineArgs, MockEngineArgsBuilder,
};
pub use dynamo_mocker::common::{bootstrap, perf_model, protocols, running_mean, sequence};
pub use dynamo_mocker::scheduler::Scheduler;
pub use dynamo_mocker::{kv_manager, scheduler};
pub const MOCKER_COMPONENT: &str = "mocker";
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::common::evictor::LRUEvictor;
use dynamo_tokens::blocks::UniqueBlock;
use std::collections::HashMap;
/// Hash-based KV cache with O(1) block lookups, maintaining active (ref-counted) and
/// inactive (LRU-evictable) pools.
///
/// The cache itself does not enforce `max_capacity` when blocks are inserted; callers
/// are expected to consult `is_at_capacity` and evict from the inactive pool first.
pub struct HashCache {
/// Active pool: maps each in-use block to its current reference count.
active_blocks: HashMap<UniqueBlock, usize>,
/// Inactive pool: blocks that are no longer referenced but are kept until evicted.
inactive_blocks: LRUEvictor<UniqueBlock>,
/// Upper bound that `is_at_capacity` compares against the combined size of both pools.
max_capacity: usize,
}
impl HashCache {
    /// Create an empty cache whose capacity checks use `max_capacity` as the block limit.
    pub fn new(max_capacity: usize) -> Self {
        Self {
            active_blocks: HashMap::new(),
            inactive_blocks: LRUEvictor::default(),
            max_capacity,
        }
    }

    /// Get the reference count of an active block, or `None` if the block is not active.
    pub fn get_active_ref_count(&self, block: &UniqueBlock) -> Option<usize> {
        self.active_blocks.get(block).copied()
    }

    /// Increment the reference count of an active block and return the new count.
    ///
    /// # Panics
    /// Panics if `block` is not in the active pool.
    pub fn increment_ref(&mut self, block: &UniqueBlock) -> usize {
        let ref_count = self
            .active_blocks
            .get_mut(block)
            .expect("block must be active to increment ref");
        *ref_count += 1;
        *ref_count
    }

    /// Decrement the reference count of an active block and return the new count.
    ///
    /// The block stays in the active pool even when the count reaches zero; use
    /// [`HashCache::deactivate`] to move it to the inactive pool.
    ///
    /// # Panics
    /// Panics if `block` is not in the active pool, or if its reference count is
    /// already zero.
    pub fn decrement_ref(&mut self, block: &UniqueBlock) -> usize {
        let ref_count = self
            .active_blocks
            .get_mut(block)
            .expect("block must be active to decrement ref");
        // A plain `-= 1` would silently wrap to `usize::MAX` in builds without
        // overflow checks, pinning the block as active forever. Fail loudly instead.
        *ref_count = ref_count
            .checked_sub(1)
            .expect("ref count must be greater than zero to decrement ref");
        *ref_count
    }

    /// Insert a block into the active pool with the given reference count.
    ///
    /// If the block is already active, its stored count is replaced by `ref_count`.
    pub fn insert_active(&mut self, block: UniqueBlock, ref_count: usize) {
        self.active_blocks.insert(block, ref_count);
    }

    /// Remove a block from the active pool. Returns its reference count, or `None`
    /// if the block was not active.
    pub fn remove_active(&mut self, block: &UniqueBlock) -> Option<usize> {
        self.active_blocks.remove(block)
    }

    /// Check if a block is in the active pool.
    pub fn contains_active(&self, block: &UniqueBlock) -> bool {
        self.active_blocks.contains_key(block)
    }

    /// Insert a block into the inactive pool (LRU order).
    pub fn insert_inactive(&mut self, block: UniqueBlock) {
        self.inactive_blocks.insert(block);
    }

    /// Remove a block from the inactive pool. Returns true if it was found.
    pub fn remove_inactive(&mut self, block: &UniqueBlock) -> bool {
        self.inactive_blocks.remove(block)
    }

    /// Evict the least-recently-used block from the inactive pool.
    ///
    /// Returns `None` when the evictor has nothing to evict.
    pub fn evict_inactive(&mut self) -> Option<UniqueBlock> {
        self.inactive_blocks.evict()
    }

    /// Check if a block is in the inactive pool.
    pub fn contains_inactive(&self, block: &UniqueBlock) -> bool {
        self.inactive_blocks.contains(block)
    }

    /// Check if a block exists in either the active or the inactive pool.
    pub fn contains(&self, block: &UniqueBlock) -> bool {
        self.contains_active(block) || self.contains_inactive(block)
    }

    /// Move a block from the active pool to the inactive pool (ref_count reached 0).
    ///
    /// The stored reference count is discarded. In debug builds this asserts that the
    /// block is currently active and not already inactive.
    pub fn deactivate(&mut self, block: &UniqueBlock) {
        debug_assert!(
            self.active_blocks.contains_key(block),
            "deactivate called on non-active block"
        );
        debug_assert!(
            !self.inactive_blocks.contains(block),
            "deactivate called on already-inactive block"
        );
        self.active_blocks.remove(block);
        self.inactive_blocks.insert(block.clone());
    }

    /// Move a block from the inactive pool to the active pool with ref_count=1.
    ///
    /// Returns true if the block was inactive and has been reactivated; returns false
    /// (and changes nothing) otherwise.
    pub fn reactivate(&mut self, block: &UniqueBlock) -> bool {
        if self.inactive_blocks.remove(block) {
            self.active_blocks.insert(block.clone(), 1);
            true
        } else {
            false
        }
    }

    /// Check if total blocks (active + inactive) has reached `max_capacity`.
    pub fn is_at_capacity(&self) -> bool {
        self.current_capacity() >= self.max_capacity
    }

    /// Get the number of active blocks.
    pub fn num_active(&self) -> usize {
        self.active_blocks.len()
    }

    /// Get the number of inactive blocks.
    pub fn num_inactive(&self) -> usize {
        self.inactive_blocks.len()
    }

    /// Get the maximum block capacity.
    pub fn max_capacity(&self) -> usize {
        self.max_capacity
    }

    /// Get the current capacity (active + inactive blocks).
    pub fn current_capacity(&self) -> usize {
        self.active_blocks.len() + self.inactive_blocks.len()
    }

    /// Iterate over active block keys.
    pub fn active_keys(&self) -> impl Iterator<Item = &UniqueBlock> {
        self.active_blocks.keys()
    }

    /// Iterate over inactive block keys.
    pub fn inactive_keys(&self) -> impl Iterator<Item = &UniqueBlock> {
        self.inactive_blocks.keys()
    }

    /// Direct access to active blocks map (for tests that check ref counts).
    pub fn active_blocks(&self) -> &HashMap<UniqueBlock, usize> {
        &self.active_blocks
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Cache data structures for KV block management.
pub mod hash_cache;
pub use hash_cache::HashCache;
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Shared components used across all engine implementations.
pub mod bootstrap;
pub mod evictor;
pub mod perf_model;
pub mod protocols;
pub mod running_mean;
pub mod sequence;
......@@ -9,7 +9,7 @@ use std::sync::Arc;
use uuid::Uuid;
use validator::Validate;
use crate::perf_model::PerfModel;
use crate::common::perf_model::PerfModel;
use dynamo_kv_router::protocols::KvCacheEvent;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, SequenceHash, Token};
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::protocols::MoveBlock;
use crate::common::protocols::MoveBlock;
use derive_getters::Getters;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{TokenBlockSequence, Tokens};
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Pluggable KV cache block managers.
pub mod vllm_backend;
pub use vllm_backend::KvManager;
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! # KV Manager
//! # KV Manager (vLLM Backend)
//! A synchronous implementation of a block manager that handles MoveBlock signals for caching KV blocks.
//!
//! Uses [`HashCache`] for O(1) block lookups with active/inactive pool management.
//!
//! ## Block Operations
//! The KV manager processes four types of MoveBlock signals:
//!
......@@ -32,11 +34,9 @@
//! For simplicity (or non-simplicity), reference counting is tracked manually instead of using
//! the more idiomatic built-in Arc reference counter. This can be considered a shadow / mirror
//! implementation of the main block manager.
use crate::evictor::LRUEvictor;
use crate::protocols::{KvCacheEventSink, MoveBlock, PrefillCost};
use crate::sequence::ActiveSequence;
use derive_getters::Getters;
use crate::cache::HashCache;
use crate::common::protocols::{KvCacheEventSink, MoveBlock, PrefillCost};
use crate::common::sequence::ActiveSequence;
use dynamo_kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash,
......@@ -56,23 +56,11 @@ static KV_CACHE_TRACE_ENABLED: LazyLock<bool> = LazyLock::new(|| {
.unwrap_or(false)
});
#[derive(Getters)]
pub struct KvManager {
#[getter(copy)]
max_capacity: usize,
#[getter(copy)]
cache: HashCache,
block_size: usize,
active_blocks: HashMap<UniqueBlock, usize>,
inactive_blocks: LRUEvictor<UniqueBlock>,
kv_event_sink: Option<Arc<dyn KvCacheEventSink>>,
#[getter(copy)]
dp_rank: u32,
next_event_id: u64,
}
......@@ -87,9 +75,7 @@ impl KvManager {
kv_event_sink: Option<Arc<dyn KvCacheEventSink>>,
dp_rank: u32,
) -> Self {
let active_blocks = HashMap::new();
let inactive_blocks = LRUEvictor::default();
debug_assert!(max_capacity > 0, "max_capacity must be > 0");
if kv_event_sink.is_some() {
tracing::info!(
"KvManager initialized with event sink for DP rank {dp_rank} with block_size {block_size}"
......@@ -97,10 +83,8 @@ impl KvManager {
}
KvManager {
max_capacity,
cache: HashCache::new(max_capacity),
block_size,
active_blocks,
inactive_blocks,
kv_event_sink,
dp_rank,
next_event_id: 0,
......@@ -120,17 +104,18 @@ impl KvManager {
}
if *KV_CACHE_TRACE_ENABLED {
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let active_len = self.active_blocks.len();
let inactive_len = self.inactive_blocks.len();
let active_len = self.cache.num_active();
let inactive_len = self.cache.num_inactive();
let free_blocks = self
.max_capacity
.cache
.max_capacity()
.saturating_sub(active_len)
.saturating_sub(inactive_len);
let event = if is_store { "allocation" } else { "eviction" };
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
tracing::info!(
event,
timestamp_ms,
......@@ -139,7 +124,7 @@ impl KvManager {
free_blocks_after = free_blocks,
active_blocks = active_len,
inactive_blocks = inactive_len,
total_blocks = self.max_capacity,
total_blocks = self.cache.max_capacity(),
dp_rank = self.dp_rank,
"KV cache trace"
);
......@@ -176,8 +161,7 @@ impl KvManager {
.collect(),
})
};
// Use incremental event ID starting from 0
// Use incremental event ID starting from 0 and incrementing by 1 for each event.
let event_id = self.next_event_id;
self.next_event_id += 1;
......@@ -201,28 +185,22 @@ impl KvManager {
let mut parent_block: Option<&UniqueBlock> = None;
for hash in hashes {
// First check if it already exists in active blocks
if let Some(ref_count) = self.active_blocks.get_mut(hash) {
if self.cache.contains_active(hash) {
// Block already active, just increment reference count
*ref_count += 1;
self.cache.increment_ref(hash);
parent_block = Some(hash);
continue;
}
// Then check if it exists in inactive and move it to active if found
if self.inactive_blocks.remove(hash) {
// Insert into active with reference count 1
self.active_blocks.insert(hash.clone(), 1);
if self.cache.reactivate(hash) {
parent_block = Some(hash);
continue;
}
// Get counts for capacity check
let active_count = self.active_blocks.len();
let inactive_count = self.inactive_blocks.len();
// If at max capacity, evict the oldest entry from inactive blocks
if active_count + inactive_count >= self.max_capacity {
let Some(evicted) = self.inactive_blocks.evict() else {
if self.cache.is_at_capacity() {
let Some(evicted) = self.cache.evict_inactive() else {
return false;
};
tracing::trace!(
......@@ -235,7 +213,7 @@ impl KvManager {
}
// Now insert the new block in active blocks with reference count 1
self.active_blocks.insert(hash.clone(), 1);
self.cache.insert_active(hash.clone(), 1);
// Track blocks for trace/event
if let UniqueBlock::FullBlock(stored_full_block) = hash {
blocks_stored.push(*stored_full_block);
......@@ -252,11 +230,9 @@ impl KvManager {
MoveBlock::Destroy(hashes) => {
let mut blocks_destroyed = Vec::<u64>::new();
// Process blocks in order (already reversed by caller if needed)
for hash in hashes.iter() {
self.active_blocks.remove(hash).unwrap();
self.cache.remove_active(hash).unwrap();
// Track blocks for batch sending
if let UniqueBlock::FullBlock(destroyed_full_block) = hash {
blocks_destroyed.push(*destroyed_full_block);
......@@ -270,17 +246,16 @@ impl KvManager {
// Process blocks in order (already reversed by caller if needed)
for hash in hashes.iter() {
// Decrement reference count and check if we need to move to inactive
if let Some(ref_count) = self.active_blocks.get_mut(hash) {
if *ref_count == 0 {
if let Some(ref_count) = self.cache.get_active_ref_count(hash) {
if ref_count == 0 {
panic!("Negative reference count would be encountered after Deref.");
}
*ref_count -= 1;
// If reference count reaches zero, remove from active and move to inactive
if *ref_count == 0 {
self.active_blocks.remove(hash);
// Use the LRUEvictor's timing functionality
self.inactive_blocks.insert(hash.clone());
if ref_count == 1 {
self.cache.deactivate(hash);
} else {
self.cache.decrement_ref(hash);
}
}
}
......@@ -291,16 +266,21 @@ impl KvManager {
let hash_block = UniqueBlock::FullBlock(*hash);
assert_eq!(
self.active_blocks.remove(&uuid_block),
self.cache.remove_active(&uuid_block),
Some(1),
"uuid_block {uuid_block:?} should exist and be unique with ref_count=1"
);
let hash_ref_count = self.active_blocks.get(&hash_block).copied();
let is_new = hash_ref_count.is_none() && !self.inactive_blocks.remove(&hash_block);
let hash_ref_count = self.cache.get_active_ref_count(&hash_block);
// Block is new if it's not in active and not in inactive
let is_new = if hash_ref_count.is_some() {
false
} else {
!self.cache.remove_inactive(&hash_block)
};
self.active_blocks
.insert(hash_block.clone(), hash_ref_count.unwrap_or(0) + 1);
self.cache
.insert_active(hash_block, hash_ref_count.unwrap_or(0) + 1);
if is_new {
self.publish_kv_event(vec![*hash], &[*local_hash], *parent_hash, true);
......@@ -316,48 +296,60 @@ impl KvManager {
pub fn probe_new_blocks(&self, blocks: &[UniqueBlock]) -> usize {
blocks
.iter()
.filter(|&block| {
!self.active_blocks.contains_key(block) && !self.inactive_blocks.contains(block)
})
.filter(|&block| !self.cache.contains(block))
.count()
}
/// Get the current capacity (active blocks + inactive blocks)
pub fn current_capacity(&self) -> usize {
let active = self.active_blocks.len();
let inactive = self.inactive_blocks.len();
active + inactive
self.cache.current_capacity()
}
/// Get the current capacity as a percentage of the maximum capacity
pub fn current_capacity_perc(&self) -> f64 {
let current = self.current_capacity() as f64;
current / self.max_capacity as f64
self.cache.current_capacity() as f64 / self.cache.max_capacity() as f64
}
/// Get the number of active blocks
pub fn num_active_blocks(&self) -> usize {
self.active_blocks.len()
self.cache.num_active()
}
/// Get the percentage of active blocks relative to maximum capacity
pub fn get_active_perc(&self) -> f64 {
self.active_blocks.len() as f64 / self.max_capacity as f64
self.cache.num_active() as f64 / self.cache.max_capacity() as f64
}
/// Get the number of inactive blocks
pub fn num_inactive_blocks(&self) -> usize {
self.inactive_blocks.len()
self.cache.num_inactive()
}
/// Get the keys of inactive blocks
pub fn get_inactive_blocks(&self) -> Vec<&UniqueBlock> {
self.inactive_blocks.keys().collect()
self.cache.inactive_keys().collect()
}
/// Get the keys of active blocks
pub fn get_active_blocks(&self) -> Vec<&UniqueBlock> {
self.active_blocks.keys().collect()
self.cache.active_keys().collect()
}
/// Get the maximum block capacity, as reported by the underlying cache.
pub fn max_capacity(&self) -> usize {
self.cache.max_capacity()
}
/// Get the block size this manager was configured with.
pub fn block_size(&self) -> usize {
self.block_size
}
/// Get the DP rank this manager was created for.
pub fn dp_rank(&self) -> u32 {
self.dp_rank
}
/// Direct access to active blocks map (for tests).
///
/// Returns the underlying cache's map from each active block to its reference count.
pub fn active_blocks(&self) -> &HashMap<UniqueBlock, usize> {
self.cache.active_blocks()
}
/// Check if a sequence can be scheduled and calculate cost if possible
......@@ -368,7 +360,7 @@ impl KvManager {
// We must stop at the first cache miss since KV states are computed sequentially
let mut overlap_blocks = 0;
for block in seq_blocks {
if !self.active_blocks.contains_key(block) && !self.inactive_blocks.contains(block) {
if !self.cache.contains(block) {
// First cache miss - can't use anything after this point
break;
}
......@@ -471,7 +463,7 @@ mod tests {
expected_blocks: &[u64],
) {
let inactive_blocks = manager.get_inactive_blocks();
let inactive_blocks_count = manager.inactive_blocks().len();
let inactive_blocks_count = manager.num_inactive_blocks();
assert_eq!(
inactive_blocks_count, expected_size,
......
......@@ -7,15 +7,7 @@
//! KV cache management, request scheduling, and token generation timing without
//! requiring actual GPU resources or a full distributed runtime.
pub mod bootstrap;
pub mod evictor;
pub mod cache;
pub mod common;
pub mod kv_manager;
pub mod perf_model;
pub mod protocols;
pub mod running_mean;
pub mod scheduler;
pub mod sequence;
// Re-export commonly used types
pub use protocols::{DirectRequest, KvCacheEventSink, MockEngineArgs, MockEngineArgsBuilder};
pub use scheduler::Scheduler;
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Engine-specific scheduling implementations.
pub mod vllm;
// Backward compatibility: re-export Scheduler from vllm module
pub use vllm::Scheduler;
......@@ -28,15 +28,15 @@
//! ## NOTE
//! The current prefill and decoding time simulations are not scientific at all and are WIP
use crate::evictor::LRUEvictor;
use crate::kv_manager::KvManager;
use crate::perf_model::PerfModel;
use crate::protocols::{
use crate::common::evictor::LRUEvictor;
use crate::common::perf_model::PerfModel;
use crate::common::protocols::{
DirectRequest, KvCacheEventSink, MockEngineArgs, MoveBlock, OutputSignal, PrefillCost,
WorkerType,
};
use crate::running_mean::RunningMean;
use crate::sequence::ActiveSequence;
use crate::common::running_mean::RunningMean;
use crate::common::sequence::ActiveSequence;
use crate::kv_manager::KvManager;
use dynamo_kv_router::protocols::DpRank;
use dynamo_tokens::blocks::UniqueBlock;
use std::collections::{HashMap, VecDeque};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment