Unverified Commit 2f18b23e authored by Kris Hung's avatar Kris Hung Committed by GitHub
Browse files

chore: Use general engine source for KVBM KV events (#4515)

parent edab1ac4
......@@ -7,6 +7,7 @@ use dynamo_llm::block_manager::block::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical,
};
use dynamo_llm::block_manager::kv_consolidator::KvEventConsolidatorConfig;
use dynamo_llm::block_manager::kv_consolidator::tracker::EventSource;
use dynamo_llm::block_manager::offload::filter::FrequencyFilter;
use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy};
use dynamo_runtime::DistributedRuntime;
......@@ -249,7 +250,7 @@ pub struct BlockManagerBuilder {
page_size: usize,
disable_device_pool: bool,
kvbm_metrics: Option<dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics>,
consolidator_config: Option<(String, String)>, // (vllm_endpoint, output_endpoint)
consolidator_config: Option<(String, String, EventSource)>, // (engine_endpoint, output_endpoint, engine_source)
}
impl BlockManagerBuilder {
......@@ -285,8 +286,13 @@ impl BlockManagerBuilder {
self
}
pub fn consolidator_config(mut self, vllm_endpoint: String, output_endpoint: String) -> Self {
self.consolidator_config = Some((vllm_endpoint, output_endpoint));
/// Sets the KV event consolidator configuration on this builder.
///
/// The three arguments are stored together as the tuple
/// `(engine_endpoint, output_endpoint, engine_source)` in
/// `self.consolidator_config`, replacing any value set earlier.
/// The builder is taken by value and returned by value.
pub fn consolidator_config(
mut self,
engine_endpoint: String,
output_endpoint: String,
engine_source: EventSource,
) -> Self {
self.consolidator_config = Some((engine_endpoint, output_endpoint, engine_source));
self
}
......@@ -360,8 +366,12 @@ impl BlockManagerBuilder {
config_builder = config_builder.kvbm_metrics(Some(kvbm_metrics));
}
if let Some((vllm_ep, output_ep)) = self.consolidator_config {
let consolidator_config = KvEventConsolidatorConfig::new(vllm_ep, output_ep);
if let Some((engine_ep, output_ep, engine_source)) = self.consolidator_config {
let consolidator_config = KvEventConsolidatorConfig::new(
engine_ep,
output_ep,
engine_source,
);
config_builder = config_builder.consolidator_config(consolidator_config);
}
......
......@@ -22,6 +22,7 @@ use dynamo_llm::block_manager::{
locality::Logical,
},
connector::*,
kv_consolidator::tracker::EventSource,
};
use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens};
use dynamo_runtime::config::environment_names::kvbm as env_kvbm;
......@@ -144,7 +145,7 @@ impl KvConnectorLeader {
output_ep
);
block_manager_builder =
block_manager_builder.consolidator_config(vllm_ep, output_ep);
block_manager_builder.consolidator_config(vllm_ep, output_ep, EventSource::Vllm);
}
let block_manager = match block_manager_builder.build().await {
......
......@@ -3,6 +3,7 @@
use super::*;
use anyhow;
use dynamo_llm::block_manager::kv_consolidator::tracker::EventSource;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Action {
......@@ -152,7 +153,7 @@ impl KvConnectorLeaderRecorder {
(consolidator_vllm_ep, consolidator_output_ep)
{
block_manager_builder =
block_manager_builder.consolidator_config(vllm_ep, output_ep);
block_manager_builder.consolidator_config(vllm_ep, output_ep, EventSource::Vllm);
}
let block_manager = match block_manager_builder.build().await {
......
......@@ -238,12 +238,21 @@ pub(crate) struct KvEventPublisher {
#[pymethods]
impl KvEventPublisher {
#[new]
#[pyo3(signature = (component, kv_block_size, dp_rank=0))]
fn new(component: Component, kv_block_size: usize, dp_rank: DpRank) -> PyResult<Self> {
#[pyo3(signature = (component, worker_id, kv_block_size, dp_rank=0))]
fn new(
component: Component,
worker_id: WorkerId,
kv_block_size: usize,
dp_rank: DpRank,
) -> PyResult<Self> {
if kv_block_size == 0 {
return Err(to_pyerr(anyhow::anyhow!("kv_block_size cannot be 0")));
}
// Note: worker_id parameter matches the Python stub (_core.pyi) signature but is not used.
// The actual worker_id is inferred from component's connection_id in the Rust implementation.
let _ = worker_id;
let inner = llm_rs::kv_router::publisher::KvEventPublisher::new(
component.inner,
kv_block_size as u32,
......
......@@ -5,30 +5,41 @@
use serde::{Deserialize, Serialize};
use super::tracker::EventSource;
/// Configuration for the KV Event Consolidator
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KvEventConsolidatorConfig {
/// ZMQ endpoint to subscribe to vLLM events (e.g., "tcp://localhost:5557")
pub vllm_event_endpoint: String,
/// ZMQ endpoint to subscribe to engine events (vLLM or TensorRT-LLM) (e.g., "tcp://localhost:5557")
pub engine_event_endpoint: String,
/// ZMQ endpoint to publish consolidated events (e.g., "tcp://*:5558")
pub consolidated_event_endpoint: String,
/// Engine source for events (vLLM or TensorRT-LLM)
pub engine_source: EventSource,
}
impl Default for KvEventConsolidatorConfig {
fn default() -> Self {
Self {
vllm_event_endpoint: "tcp://localhost:5557".to_string(),
engine_event_endpoint: "tcp://localhost:5557".to_string(),
consolidated_event_endpoint: "tcp://*:5558".to_string(),
engine_source: EventSource::Vllm,
}
}
}
impl KvEventConsolidatorConfig {
pub fn new(vllm_event_endpoint: String, consolidated_event_endpoint: String) -> Self {
pub fn new(
engine_event_endpoint: String,
consolidated_event_endpoint: String,
engine_source: EventSource,
) -> Self {
Self {
vllm_event_endpoint,
engine_event_endpoint,
consolidated_event_endpoint,
engine_source,
}
}
}
......@@ -102,7 +102,7 @@ impl KvEventConsolidator {
pub async fn start(&mut self) -> Result<()> {
tracing::info!(
"Starting KV Event Consolidator: subscribe from {}, publish to {}",
self.config.vllm_event_endpoint,
self.config.engine_event_endpoint,
self.config.consolidated_event_endpoint
);
......@@ -116,11 +116,12 @@ impl KvEventConsolidator {
tracing::info!("Waiting for downstream subscribers to connect...");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Start the subscriber (connects to vLLM's publisher)
// Start the subscriber (connects to engine's publisher - vLLM or TensorRT-LLM)
let handle = start_simple_zmq_listener(
self.config.vllm_event_endpoint.clone(),
self.config.engine_event_endpoint.clone(),
self.tracker.clone(),
self.cancellation_token.clone(),
self.config.engine_source,
)
.await?;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Simple ZMQ Subscriber for vLLM KV Events
//! Simple ZMQ Subscriber for vLLM/TensorRT-LLM KV Events
//!
//! This is a simplified subscriber that deserializes raw vLLM events.
//! This is a simplified subscriber that deserializes raw vLLM/TensorRT-LLM events.
use anyhow::{Context, Result};
use rmp_serde::Deserializer;
......@@ -14,13 +14,13 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};
use super::tracker::{CacheStatusTracker, StorageTier};
use super::tracker::{CacheStatusTracker, EventSource, StorageTier};
/// Event batch received from vLLM (array format)
/// Event batch received from vLLM/TensorRT-LLM (array format)
/// Format: [timestamp, [events], data_parallel_rank]
///
/// Note: This uses a tuple struct to deserialize from array [ts, events, rank]
/// rather than an object {"ts": ..., "events": ..., "rank": ...} for vLLM compatibility.
/// rather than an object {"ts": ..., "events": ..., "rank": ...} for vLLM/TensorRT-LLM compatibility.
#[derive(Debug, Deserialize)]
struct VllmEventBatch(
f64, // ts
......@@ -59,7 +59,7 @@ impl std::fmt::Display for BlockHash {
}
}
/// Raw vLLM event format (preserves all data including token_ids)
/// Raw vLLM/TensorRT-LLM event format (preserves all data including token_ids)
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
enum VllmRawEvent {
......@@ -88,9 +88,12 @@ pub async fn start_simple_zmq_listener(
endpoint: String,
tracker: Arc<RwLock<CacheStatusTracker>>,
cancellation_token: CancellationToken,
engine_source: EventSource,
) -> Result<JoinHandle<()>> {
let handle = tokio::spawn(async move {
if let Err(e) = run_listener_loop(endpoint, tracker, cancellation_token).await {
if let Err(e) =
run_listener_loop(endpoint, tracker, cancellation_token, engine_source).await
{
tracing::error!("ZMQ listener task failed: {}", e);
}
});
......@@ -102,6 +105,7 @@ async fn run_listener_loop(
endpoint: String,
tracker: Arc<RwLock<CacheStatusTracker>>,
cancellation_token: CancellationToken,
engine_source: EventSource,
) -> Result<()> {
tracing::info!(
"KV event consolidator ZMQ listener connecting to {}",
......@@ -174,7 +178,7 @@ async fn run_listener_loop(
// Process events
let mut tracker_guard = tracker.write().await;
for event in batch.events() {
process_event(&mut tracker_guard, event.clone(), dp_rank);
process_event(&mut tracker_guard, event.clone(), dp_rank, engine_source);
}
}
}
......@@ -187,6 +191,7 @@ fn process_event(
tracker: &mut CacheStatusTracker,
event: VllmRawEvent,
data_parallel_rank: Option<i32>,
engine_source: EventSource,
) {
match event {
VllmRawEvent::BlockStored {
......@@ -259,7 +264,7 @@ fn process_event(
tracker.handle_store(
block_hash.to_string(),
crate::block_manager::kv_consolidator::EventSource::Vllm,
engine_source,
block_tokens,
current_parent.clone(),
block_size_usize,
......@@ -289,10 +294,7 @@ fn process_event(
);
for block_hash in block_hashes {
tracker.handle_remove(
&block_hash.to_string(),
crate::block_manager::kv_consolidator::EventSource::Vllm,
);
tracker.handle_remove(&block_hash.to_string(), engine_source);
}
}
......
......@@ -3,11 +3,11 @@
//! Cache Status Tracker
//!
//! Maintains the state of KV cache blocks across different event sources (vLLM and KVBM)
//! Maintains the state of KV cache blocks across different event sources (vLLM, TensorRT-LLM, and KVBM)
//! and determines when to emit STORE/REMOVE events.
//!
//! - Tracks by EVENT SOURCE (vLLM vs KVBM) instead of storage tier
//! - vLLM source: G1 (GPU) events from vLLM worker
//! - Tracks by EVENT SOURCE (vLLM/TensorRT-LLM vs KVBM) instead of storage tier
//! - vLLM/TensorRT-LLM source: G1 (GPU) events from vLLM or TensorRT-LLM worker
//! - KVBM source: G2/G3 (host pinned/disk) events from KVBM
//! - Deduplication: Uses SequenceHash as the key
//! - Always computes sequence hash using KVBM's xxHash3 method, regardless of source
......@@ -59,10 +59,12 @@ fn compute_sequence_hash(
}
/// Event source for KV cache events
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum EventSource {
/// Events from vLLM worker (G1/GPU)
Vllm,
/// Events from TensorRT-LLM worker (G1/GPU)
Trtllm,
/// Events from KVBM
Kvbm,
}
......@@ -73,6 +75,7 @@ impl std::str::FromStr for EventSource {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"vllm" | "VLLM" | "GPU" => Ok(EventSource::Vllm),
"trtllm" | "TRTLLM" | "TensorRT-LLM" => Ok(EventSource::Trtllm),
"kvbm" | "KVBM" => Ok(EventSource::Kvbm),
_ => Err(format!("Unknown event source: {}", s)),
}
......@@ -84,6 +87,7 @@ impl EventSource {
/// Returns the lowercase string name of this event source:
/// `"vllm"`, `"trtllm"`, or `"kvbm"`.
pub fn to_str(&self) -> &'static str {
match self {
EventSource::Vllm => "vllm",
EventSource::Trtllm => "trtllm",
EventSource::Kvbm => "kvbm",
}
}
......
......@@ -28,9 +28,9 @@ impl Resources {
event_mgr.clone()
} else if let Some(consolidator_config) = config.consolidator_config.clone() {
tracing::info!(
"Creating DynamoEventManager with kv event consolidator config: vllm={}, output={}",
consolidator_config.vllm_event_endpoint,
consolidator_config.consolidated_event_endpoint
"Creating DynamoEventManager with kv event consolidator config: engine={}, source={:?}",
consolidator_config.engine_event_endpoint,
consolidator_config.engine_source
);
// Create DynamoEventManager with consolidator config (async)
match DynamoEventManager::new_with_config(consolidator_config).await {
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment