Unverified Commit 7c25f702 authored by Yongming Ding's avatar Yongming Ding Committed by GitHub
Browse files

feat(mocker): add optional KV cache allocation/eviction trace (#6052)


Signed-off-by: default avatarYongming Ding <yongmingd@nvidia.com>
Co-authored-by: default avatarYan Ru Pei <yanrpei@gmail.com>
parent 2bcbda19
......@@ -2452,6 +2452,7 @@ dependencies = [
"derive-getters",
"derive_builder",
"dynamo-kv-router",
"dynamo-runtime",
"dynamo-tokens",
"ndarray",
"ndarray-interp",
......
......@@ -28,6 +28,10 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume
- `--stagger-delay`: Delay in seconds between launching each worker to avoid overwhelming etcd/NATS/frontend. Set to 0 to disable staggering. Use -1 for auto mode (stagger dependent on number of workers). Default: -1 (auto)
- `--is-prefill-worker` / `--is-decode-worker`: Whether the worker is a prefill or decode worker for disaggregated deployment. If not specified, mocker will be in aggregated mode.
**Environment variables:**
- `DYN_MOCKER_KV_CACHE_TRACE`: Set to `1` or `true` (case-insensitive) to emit a structured trace log entry for each KV cache allocation/eviction (fields include `timestamp_ms`, `block_ids`, `free_blocks_after`, etc.). Default: off.
### Example with individual arguments (vLLM-style):
```bash
# Start mocker with custom configuration
......
......@@ -1752,6 +1752,7 @@ dependencies = [
"derive-getters",
"derive_builder",
"dynamo-kv-router",
"dynamo-runtime",
"dynamo-tokens",
"ndarray",
"ndarray-interp",
......
......@@ -12,6 +12,7 @@ repository.workspace = true
[dependencies]
# repo
dynamo-runtime = { workspace = true }
dynamo-tokens = { workspace = true }
dynamo-kv-router = { workspace = true }
......
......@@ -41,10 +41,20 @@ use dynamo_kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash,
};
use dynamo_runtime::config::environment_names::mocker;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, SequenceHash};
use std::collections::HashMap;
use std::sync::Arc;
use std::env;
use std::sync::{Arc, LazyLock};
use std::time::{SystemTime, UNIX_EPOCH};
/// Whether KV cache allocation/eviction trace logging is switched on.
///
/// The flag is computed once, on first access, from the environment variable
/// named by `mocker::DYN_MOCKER_KV_CACHE_TRACE`. It is `true` only when the
/// variable holds `1` or `true` (ASCII case-insensitive); if the variable
/// cannot be read, or holds any other value, the flag is `false`.
static KV_CACHE_TRACE_ENABLED: LazyLock<bool> = LazyLock::new(|| {
    let Ok(raw) = env::var(mocker::DYN_MOCKER_KV_CACHE_TRACE) else {
        // Variable could not be read: tracing stays off.
        return false;
    };
    raw.eq_ignore_ascii_case("true") || raw == "1"
});
#[derive(Getters)]
pub struct KvManager {
......@@ -97,7 +107,7 @@ impl KvManager {
}
}
/// Converts stored/removed blocks into KvCacheEventData and publishes if sink is available
/// Converts stored/removed blocks into KvCacheEventData and publishes if sink is available.
fn publish_kv_event(
&mut self,
full_blocks: Vec<SequenceHash>,
......@@ -113,6 +123,32 @@ impl KvManager {
return;
};
if *KV_CACHE_TRACE_ENABLED {
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let active_len = self.active_blocks.len();
let inactive_len = self.inactive_blocks.len();
let free_blocks = self
.max_capacity
.saturating_sub(active_len)
.saturating_sub(inactive_len);
let event = if is_store { "allocation" } else { "eviction" };
tracing::info!(
event,
timestamp_ms,
block_ids = ?&full_blocks,
block_size = self.block_size,
free_blocks_after = free_blocks,
active_blocks = active_len,
inactive_blocks = inactive_len,
total_blocks = self.max_capacity,
dp_rank = self.dp_rank,
"KV cache trace"
);
}
let event_data = if is_store {
let num_blocks = full_blocks.len();
let local_hashes_slice = &local_hashes[local_hashes
......@@ -263,17 +299,15 @@ impl KvManager {
"uuid_block {uuid_block:?} should exist and be unique with ref_count=1"
);
let hash_ref_count = if let Some(ref_count) = self.active_blocks.get(&hash_block) {
*ref_count
} else if self.inactive_blocks.remove(&hash_block) {
0
} else {
self.publish_kv_event(vec![*hash], &[*local_hash], *parent_hash, true);
0
};
let hash_ref_count = self.active_blocks.get(&hash_block).copied();
let is_new = hash_ref_count.is_none() && !self.inactive_blocks.remove(&hash_block);
self.active_blocks
.insert(hash_block.clone(), hash_ref_count + 1);
.insert(hash_block.clone(), hash_ref_count.unwrap_or(0) + 1);
if is_new {
self.publish_kv_event(vec![*hash], &[*local_hash], *parent_hash, true);
}
}
}
......
......@@ -20,6 +20,7 @@
//! - **Model**: Model loading and caching
//! - **Worker**: Worker lifecycle and shutdown
//! - **Testing**: Test-specific configuration
//! - **Mocker**: Mocker (mock scheduler/KV manager) configuration
/// Logging and tracing environment variables
pub mod logging {
......@@ -356,6 +357,12 @@ pub mod build {
pub const OUT_DIR: &str = "OUT_DIR";
}
/// Mocker (mock scheduler/KV manager) environment variables
///
/// Groups the names of environment variables that configure the mocker.
pub mod mocker {
/// Name of the environment variable that enables structured KV cache
/// allocation/eviction trace logs.
///
/// Set it to "1" or "true" to enable tracing. The mocker's KV manager compares
/// "true" case-insensitively and treats any other value, or an unset variable,
/// as disabled.
pub const DYN_MOCKER_KV_CACHE_TRACE: &str = "DYN_MOCKER_KV_CACHE_TRACE";
}
/// Testing environment variables
pub mod testing {
/// Enable queued-up request processing in tests
......@@ -454,6 +461,8 @@ mod tests {
cuda::DYNAMO_FATBIN_PATH,
// Build
build::OUT_DIR,
// Mocker
mocker::DYN_MOCKER_KV_CACHE_TRACE,
// Testing
testing::DYN_QUEUED_UP_PROCESSING,
testing::DYN_SOAK_RUN_DURATION,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment