Unverified commit 713c96d2, authored by Michael Feil, committed by GitHub
Browse files

feat: dynamic batching of client side events (#6733)


Signed-off-by: michaelfeil <63565275+michaelfeil@users.noreply.github.com>
parent e5850e23
...@@ -132,8 +132,25 @@ pub(crate) struct KvEventPublisher { ...@@ -132,8 +132,25 @@ pub(crate) struct KvEventPublisher {
#[pymethods] #[pymethods]
impl KvEventPublisher { impl KvEventPublisher {
/// Create a KV event publisher that batches raw engine events before forwarding
/// them to NATS / the event plane.
///
/// Args:
/// endpoint: The Dynamo component endpoint for this worker.
/// worker_id: Identifier of this worker (default 0).
/// kv_block_size: KV cache block size in tokens; must be > 0.
/// dp_rank: Data-parallel rank of this worker (default 0).
/// enable_local_indexer: When True, a local KV indexer is kept in-process
/// so that routers can recover events directly from this worker.
/// zmq_endpoint: Optional ZMQ SUB endpoint to read raw engine events from.
/// zmq_topic: ZMQ topic filter (default "").
/// batching_timeout_us: Maximum time (in **microseconds**) to accumulate
/// events into a single batch before flushing.
/// ``None`` uses the default window of 10000 µs (10 ms).
/// ``0`` disables batching: every event is published immediately.
#[new] #[new]
#[pyo3(signature = (endpoint, worker_id=0, kv_block_size=0, dp_rank=0, enable_local_indexer=false, zmq_endpoint=None, zmq_topic=None))] #[pyo3(signature = (endpoint, worker_id=0, kv_block_size=0, dp_rank=0, enable_local_indexer=false, zmq_endpoint=None, zmq_topic=None, batching_timeout_us=None))]
#[allow(clippy::too_many_arguments)]
fn new( fn new(
endpoint: Endpoint, endpoint: Endpoint,
worker_id: WorkerId, worker_id: WorkerId,
...@@ -142,6 +159,7 @@ impl KvEventPublisher { ...@@ -142,6 +159,7 @@ impl KvEventPublisher {
enable_local_indexer: bool, enable_local_indexer: bool,
zmq_endpoint: Option<String>, zmq_endpoint: Option<String>,
zmq_topic: Option<String>, zmq_topic: Option<String>,
batching_timeout_us: Option<u64>,
) -> PyResult<Self> { ) -> PyResult<Self> {
let _ = worker_id; let _ = worker_id;
...@@ -163,6 +181,7 @@ impl KvEventPublisher { ...@@ -163,6 +181,7 @@ impl KvEventPublisher {
source_config, source_config,
enable_local_indexer, enable_local_indexer,
dp_rank, dp_rank,
batching_timeout_us,
) )
.map_err(to_pyerr)?; .map_err(to_pyerr)?;
......
...@@ -5,7 +5,7 @@ use std::collections::HashSet; ...@@ -5,7 +5,7 @@ use std::collections::HashSet;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration; use std::time::{Duration, Instant};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
...@@ -53,6 +53,70 @@ const MAX_BACKOFF_MS: u64 = 5000; ...@@ -53,6 +53,70 @@ const MAX_BACKOFF_MS: u64 = 5000;
const MAX_CONSECUTIVE_ERRORS: u32 = 10; const MAX_CONSECUTIVE_ERRORS: u32 = 10;
const MAX_BACKOFF_EXPONENT: u32 = 8; // Cap at 2^8 = 256x multiplier to prevent overflow const MAX_BACKOFF_EXPONENT: u32 = 8; // Cap at 2^8 = 256x multiplier to prevent overflow
// Batching configuration
/// Default batching window in microseconds (10 ms). Used when the caller
/// passes `None` for `batching_timeout_us`.
const BATCH_TIMEOUT_US: u64 = 10_000;
// -------------------------------------------------------------------------
// Batching State -----------------------------------------------------------
// -------------------------------------------------------------------------
/// Accumulator for in-flight KV cache events that will be merged into a single
/// [`RouterEvent`] before being forwarded to the event sink.
///
/// The processing loop flushes before switching between `Removed` and `Stored`
/// accumulation, so at most one of `pending_removed` / `pending_stored` is
/// expected to be populated at any time.
#[derive(Debug)]
struct BatchingState {
/// Block hashes accumulating for the next Removed event.
/// `None` while no Removed batch is open.
pending_removed: Option<KvCacheRemoveData>,
/// Blocks accumulating for the next Stored event.
/// `None` while no Stored batch is open.
pending_stored: Option<KvCacheStoreData>,
/// Monotonic published-batch counter. Increments by 1 per flush so downstream
/// consumers always see consecutive event IDs, regardless of how many raw source
/// events were merged into the batch.
/// Starts at 1; `Cleared` events, which are published immediately, also consume an ID.
next_publish_id: u64,
/// dp_rank of the events in the current pending batch.
/// A change signals that the batch must be flushed before accumulating further.
/// Updated after each received event has been handled.
last_dp_rank: u32,
/// When the current batch started accumulating (set on the first event of each batch).
/// Used to compute the remaining window before the batch is force-flushed.
batch_start_time: Instant,
}
impl BatchingState {
    /// Builds an empty accumulator: nothing pending, publish IDs starting at 1,
    /// `dp_rank` 0, and the batch timer anchored at construction time.
    fn new() -> Self {
        let created_at = Instant::now();
        Self {
            pending_removed: None,
            pending_stored: None,
            next_publish_id: 1,
            last_dp_rank: 0,
            batch_start_time: created_at,
        }
    }

    /// Reports whether a Removed or a Stored batch is currently accumulating.
    fn has_pending(&self) -> bool {
        !(self.pending_removed.is_none() && self.pending_stored.is_none())
    }

    /// Restarts the flush-window clock; called when a new batch is opened.
    fn start_batch_timer(&mut self) {
        self.batch_start_time = Instant::now();
    }

    /// Time left in the current batch window of `timeout_us` microseconds.
    ///
    /// Saturates at [`Duration::ZERO`] once the window has passed, which is
    /// always the case for `timeout_us == 0`.
    fn remaining_timeout(&self, timeout_us: u64) -> Duration {
        let window = Duration::from_micros(timeout_us);
        window.saturating_sub(self.batch_start_time.elapsed())
    }

    /// `true` once no time is left in the window (always `true` for `timeout_us == 0`).
    fn is_timeout_elapsed(&self, timeout_us: u64) -> bool {
        self.remaining_timeout(timeout_us).is_zero()
    }
}
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// KV Event Publishers ----------------------------------------------------- // KV Event Publishers -----------------------------------------------------
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -131,7 +195,7 @@ impl KvEventPublisher { ...@@ -131,7 +195,7 @@ impl KvEventPublisher {
kv_block_size: u32, kv_block_size: u32,
source_config: Option<KvEventSourceConfig>, source_config: Option<KvEventSourceConfig>,
) -> Result<Self> { ) -> Result<Self> {
Self::new_with_local_indexer(component, kv_block_size, source_config, false, 0) Self::new_with_local_indexer(component, kv_block_size, source_config, false, 0, None)
} }
pub fn new_with_local_indexer( pub fn new_with_local_indexer(
...@@ -140,8 +204,10 @@ impl KvEventPublisher { ...@@ -140,8 +204,10 @@ impl KvEventPublisher {
source_config: Option<KvEventSourceConfig>, source_config: Option<KvEventSourceConfig>,
enable_local_indexer: bool, enable_local_indexer: bool,
dp_rank: DpRank, dp_rank: DpRank,
batching_timeout_us: Option<u64>,
) -> Result<Self> { ) -> Result<Self> {
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
let batching_timeout_us = batching_timeout_us.unwrap_or(BATCH_TIMEOUT_US);
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>(); let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
...@@ -230,6 +296,7 @@ impl KvEventPublisher { ...@@ -230,6 +296,7 @@ impl KvEventPublisher {
cancellation_token_clone, cancellation_token_clone,
rx, rx,
local_indexer_clone, local_indexer_clone,
batching_timeout_us,
) )
.await .await
}); });
...@@ -255,6 +322,7 @@ impl KvEventPublisher { ...@@ -255,6 +322,7 @@ impl KvEventPublisher {
cancellation_token_clone, cancellation_token_clone,
rx, rx,
local_indexer_clone, local_indexer_clone,
batching_timeout_us,
) )
.await .await
}); });
...@@ -319,96 +387,225 @@ impl EventSink for NatsQueue { ...@@ -319,96 +387,225 @@ impl EventSink for NatsQueue {
} }
} }
/// Event processor for ephemeral transports (NATS Core / ZMQ). /// Publishes a single [`KvCacheEvent`] to the event sink and, when present, the local indexer.
async fn start_event_processor<P: EventSink + Send + Sync + 'static>( /// Errors are logged and swallowed so the caller loop can continue uninterrupted.
publisher: P, async fn emit<P: EventSink>(
publisher: &P,
local_indexer: &Option<Arc<LocalKvIndexer>>,
worker_id: u64, worker_id: u64,
cancellation_token: CancellationToken, event: KvCacheEvent,
mut rx: mpsc::UnboundedReceiver<KvCacheEvent>,
local_indexer: Option<Arc<LocalKvIndexer>>,
) { ) {
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::info!("KV Event source received cancellation signal");
break;
}
event = rx.recv() => {
let Some(event) = event else {
tracing::debug!("Event processor channel closed.");
break;
};
// Encapsulate in a router event.
tracing::trace!("Event processor for worker_id {} processing event: {:?}", worker_id, event.data);
let router_event = RouterEvent::new(worker_id, event); let router_event = RouterEvent::new(worker_id, event);
if let Some(indexer) = local_indexer
// Apply to local indexer first (if present) && let Err(e) = indexer.apply_event_with_buffer(router_event.clone()).await
if let Some(indexer) = &local_indexer { {
// Adds event into local indexer, and logs it into internal buffer tracing::warn!(worker_id, error = %e, "Failed to apply event to local indexer");
if let Err(e) = indexer.apply_event_with_buffer(router_event.clone()).await {
tracing::warn!(
"Failed to send event to local indexer for worker {}: {}",
worker_id,
e
);
}
} }
// Then publish to event plane for global distribution.
if let Err(e) = publisher.publish_event(&router_event).await { if let Err(e) = publisher.publish_event(&router_event).await {
tracing::error!("Failed to publish event: {}", e); tracing::error!(worker_id, error = %e, "Failed to publish event");
} }
}
impl BatchingState {
/// Publishes any pending batch as a single [`RouterEvent`] and advances the monotonic
/// batch ID. No-ops when nothing is pending, so callers may call unconditionally.
async fn flush<P: EventSink + Send + Sync + 'static>(
&mut self,
publisher: &P,
local_indexer: &Option<Arc<LocalKvIndexer>>,
worker_id: u64,
) {
if !self.has_pending() {
return;
}
let id = self.next_publish_id;
let dp_rank = self.last_dp_rank;
if let Some(data) = self.pending_removed.take() {
emit(
publisher,
local_indexer,
worker_id,
KvCacheEvent {
event_id: id,
data: KvCacheEventData::Removed(data),
dp_rank,
},
)
.await;
} }
if let Some(data) = self.pending_stored.take() {
emit(
publisher,
local_indexer,
worker_id,
KvCacheEvent {
event_id: id,
data: KvCacheEventData::Stored(data),
dp_rank,
},
)
.await;
} }
// Consecutive batch IDs (1, 2, 3, …) keep downstream gap-detection happy.
self.next_publish_id += 1;
} }
} }
/// Event processor using JetStream (durable). /// Batching loop: accumulates Removed/Stored events and flushes them as a single
async fn start_event_processor_jetstream( /// [`RouterEvent`] when any of the following conditions are met:
publisher: NatsQueue, /// - Event type switches (Removed ↔ Stored)
/// - `dp_rank` changes between consecutive events
/// - A `Stored` event's `parent_hash` breaks the sequential chain
/// - The batch window expires (`timeout_us`, default 10 ms)
/// - Channel is closed or a cancellation signal is received
async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
publisher: P,
worker_id: u64, worker_id: u64,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
mut rx: mpsc::UnboundedReceiver<KvCacheEvent>, mut rx: mpsc::UnboundedReceiver<KvCacheEvent>,
local_indexer: Option<Arc<LocalKvIndexer>>, local_indexer: Option<Arc<LocalKvIndexer>>,
timeout_us: u64,
) { ) {
let mut batching_state = BatchingState::new();
// Track last raw input event_id for gap detection (dropped events before batching).
// The raw event_id is a globally monotonic counter assigned by the ZMQ listener,
// so any gap here means events were silently dropped (e.g. send error on the channel).
let mut last_raw_input_id: Option<u64> = None;
loop { loop {
let remaining = batching_state.remaining_timeout(timeout_us);
tokio::select! { tokio::select! {
_ = cancellation_token.cancelled() => { _ = cancellation_token.cancelled() => {
tracing::info!("KV Event source received cancellation signal"); tracing::info!("KV Event source received cancellation signal");
batching_state.flush(&publisher, &local_indexer, worker_id).await;
break; break;
} }
event = rx.recv() => { event = rx.recv() => {
let Some(event) = event else { let Some(event) = event else {
tracing::debug!("Event processor channel closed."); tracing::debug!("Event processor channel closed.");
batching_state.flush(&publisher, &local_indexer, worker_id).await;
break; break;
}; };
// Encapsulate in a router event. // Warn if the raw input event_id is not consecutive — events were dropped
tracing::trace!("Event processor for worker_id {} processing event: {:?}", worker_id, event.data); // (e.g. channel send error) before they reached the batching layer.
let router_event = RouterEvent::new(worker_id, event); let raw_event_id = event.event_id;
if let Some(last_id) = last_raw_input_id
// Apply to local indexer first (if present) && raw_event_id > last_id + 1 {
if let Some(indexer) = &local_indexer {
// Adds event into local indexer, and logs it into internal buffer
if let Err(e) = indexer.apply_event_with_buffer(router_event.clone()).await {
tracing::warn!( tracing::warn!(
"Failed to send event to local indexer for worker {}: {}",
worker_id, worker_id,
e last_raw_input_id = last_id,
raw_event_id,
gap = raw_event_id - last_id - 1,
"Input event gap detected: raw events dropped before batching"
); );
} }
} last_raw_input_id = Some(raw_event_id);
tracing::trace!("Event processor for worker_id {} processing event: {:?}", worker_id, event.data);
let dp_rank_changed = batching_state.has_pending()
&& event.dp_rank != batching_state.last_dp_rank;
// Then publish to NATS JetStream for global distribution match event.data {
if let Err(e) = publisher.publish_event(KV_EVENT_SUBJECT, &router_event).await { KvCacheEventData::Removed(data) => {
tracing::error!("Failed to publish event to NATS JetStream: {}", e); if batching_state.pending_stored.is_some() || dp_rank_changed {
batching_state.flush(&publisher, &local_indexer, worker_id).await;
}
match &mut batching_state.pending_removed {
Some(pending) => pending.block_hashes.extend(data.block_hashes),
None => {
batching_state.pending_removed = Some(data);
batching_state.start_batch_timer();
}
}
}
KvCacheEventData::Stored(data) => {
// Flush if: type switch, dp_rank change, or the chain is broken
// (new event's parent_hash doesn't continue from the last stored block).
let should_flush = dp_rank_changed
|| batching_state.pending_removed.is_some()
|| batching_state.pending_stored.as_ref().is_some_and(|p| {
data.parent_hash != p.blocks.last().map(|b| b.block_hash)
});
if should_flush {
batching_state.flush(&publisher, &local_indexer, worker_id).await;
}
match &mut batching_state.pending_stored {
// Only extend blocks; parent_hash stays fixed from the first event.
Some(pending) => pending.blocks.extend(data.blocks),
None => {
batching_state.pending_stored = Some(data);
batching_state.start_batch_timer();
}
}
} }
KvCacheEventData::Cleared => {
batching_state.flush(&publisher, &local_indexer, worker_id).await;
emit(&publisher, &local_indexer, worker_id, KvCacheEvent {
event_id: batching_state.next_publish_id,
data: KvCacheEventData::Cleared,
dp_rank: event.dp_rank,
}).await;
batching_state.next_publish_id += 1;
}
}
// Track dp_rank after the match so in-flight flushes use the old value.
batching_state.last_dp_rank = event.dp_rank;
// Flush immediately if the timeout already elapsed (handles timeout_us=0).
// The sleep arm below only arms for timeout_us>0; this check covers the rest.
if batching_state.has_pending() && batching_state.is_timeout_elapsed(timeout_us) {
batching_state.flush(&publisher, &local_indexer, worker_id).await;
} }
} }
_ = tokio::time::sleep(remaining), if timeout_us > 0 && batching_state.has_pending() => {
batching_state.flush(&publisher, &local_indexer, worker_id).await;
} }
}
}
}
/// Batched event processor for ephemeral transports (NATS Core / ZMQ).
///
/// Thin wrapper: forwards every argument unchanged to
/// [`run_event_processor_loop`] and awaits it, so it returns only when that
/// loop exits (cancellation or a closed channel).
///
/// `batching_timeout_us` is the batch window in microseconds; `0` makes every
/// event flush immediately.
async fn start_event_processor<P: EventSink + Send + Sync + 'static>(
publisher: P,
worker_id: u64,
cancellation_token: CancellationToken,
rx: mpsc::UnboundedReceiver<KvCacheEvent>,
local_indexer: Option<Arc<LocalKvIndexer>>,
batching_timeout_us: u64,
) {
run_event_processor_loop(
publisher,
worker_id,
cancellation_token,
rx,
local_indexer,
batching_timeout_us,
)
.await
}
/// Batched event processor using JetStream (durable).
async fn start_event_processor_jetstream(
publisher: NatsQueue,
worker_id: u64,
cancellation_token: CancellationToken,
rx: mpsc::UnboundedReceiver<KvCacheEvent>,
local_indexer: Option<Arc<LocalKvIndexer>>,
batching_timeout_us: u64,
) {
run_event_processor_loop(
publisher,
worker_id,
cancellation_token,
rx,
local_indexer,
batching_timeout_us,
)
.await
} }
/// Calculate exponential backoff duration based on consecutive error count /// Calculate exponential backoff duration based on consecutive error count
...@@ -1580,7 +1777,14 @@ mod tests_startup_helpers { ...@@ -1580,7 +1777,14 @@ mod tests_startup_helpers {
tx.send(event).unwrap(); tx.send(event).unwrap();
drop(tx); drop(tx);
let handle = tokio::spawn(start_event_processor(component, 1, token, rx, None)); let handle = tokio::spawn(start_event_processor(
component,
1,
token,
rx,
None,
BATCH_TIMEOUT_US,
));
tokio::time::timeout(tokio::time::Duration::from_secs(1), handle) tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
.await .await
...@@ -1637,6 +1841,7 @@ mod tests_startup_helpers { ...@@ -1637,6 +1841,7 @@ mod tests_startup_helpers {
token.clone(), token.clone(),
rx, rx,
Some(local_indexer.clone()), // arc::clone just increments atomic counters Some(local_indexer.clone()), // arc::clone just increments atomic counters
BATCH_TIMEOUT_US,
)); ));
// Wait for processing // Wait for processing
...@@ -1720,6 +1925,7 @@ mod tests_startup_helpers { ...@@ -1720,6 +1925,7 @@ mod tests_startup_helpers {
token.clone(), token.clone(),
rx, rx,
Some(local_indexer.clone()), Some(local_indexer.clone()),
BATCH_TIMEOUT_US,
)); ));
// Then remove same event // Then remove same event
...@@ -1810,6 +2016,7 @@ mod tests_startup_helpers { ...@@ -1810,6 +2016,7 @@ mod tests_startup_helpers {
token.clone(), token.clone(),
rx, rx,
Some(local_indexer.clone()), Some(local_indexer.clone()),
BATCH_TIMEOUT_US,
)); ));
tokio::time::timeout(tokio::time::Duration::from_secs(1), handle) tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
...@@ -1879,6 +2086,7 @@ mod tests_startup_helpers { ...@@ -1879,6 +2086,7 @@ mod tests_startup_helpers {
new_token, new_token,
rx, rx,
Some(local_indexer), Some(local_indexer),
BATCH_TIMEOUT_US,
)); ));
tokio::time::timeout(tokio::time::Duration::from_secs(1), handle) tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
...@@ -2005,6 +2213,7 @@ mod tests_startup_helpers { ...@@ -2005,6 +2213,7 @@ mod tests_startup_helpers {
token.clone(), token.clone(),
worker_rx, worker_rx,
Some(local_indexer_1.clone()), Some(local_indexer_1.clone()),
BATCH_TIMEOUT_US,
)); ));
// === SETUP: Router Components === // === SETUP: Router Components ===
...@@ -2313,3 +2522,1026 @@ mod test_integration_publisher { ...@@ -2313,3 +2522,1026 @@ mod test_integration_publisher {
Ok(()) Ok(())
} }
} }
#[cfg(test)]
mod batching_state_tests {
    //! Unit tests for [`BatchingState`]: construction, pending detection,
    //! the batch-window timer, and manual accumulation into the pending slots.

    use super::*;

    /// A window (60 s) that cannot plausibly elapse during a unit test, so
    /// "time remaining" assertions stay stable on a heavily loaded machine.
    const LONG_WINDOW_US: u64 = 60_000_000;

    #[test]
    fn test_batching_state_default() {
        let state = BatchingState::new();
        assert!(!state.has_pending(), "Default state should have no pending");
        assert!(
            state.pending_removed.is_none(),
            "Default pending_removed should be None"
        );
        assert!(
            state.pending_stored.is_none(),
            "Default pending_stored should be None"
        );
    }

    #[test]
    fn test_batching_state_new() {
        let state = BatchingState::new();
        // batch_start_time should be set to approximately now
        let elapsed = state.batch_start_time.elapsed();
        assert!(
            elapsed < Duration::from_secs(1),
            "new() should create state with batch start time set to approximately now"
        );
    }

    #[test]
    fn test_batching_state_pending_removed() {
        let mut state = BatchingState::new();
        assert!(!state.has_pending(), "Should not have pending initially");

        state.pending_removed = Some(KvCacheRemoveData {
            block_hashes: vec![],
        });
        assert!(
            state.has_pending(),
            "Should have pending after setting pending_removed"
        );
    }

    #[test]
    fn test_batching_state_pending_stored() {
        let mut state = BatchingState::new();
        assert!(!state.has_pending(), "Should not have pending initially");

        state.pending_stored = Some(KvCacheStoreData {
            parent_hash: None,
            blocks: vec![],
        });
        assert!(
            state.has_pending(),
            "Should have pending after setting pending_stored"
        );
    }

    /// Exercises `is_timeout_elapsed`: a fresh long window has not elapsed,
    /// while a zero-length window is always considered elapsed.
    #[test]
    fn test_batching_state_timeout() {
        let mut state = BatchingState::new();
        // Restart the batch timer so the window is measured from now.
        state.start_batch_timer();

        assert!(
            !state.is_timeout_elapsed(LONG_WINDOW_US),
            "A freshly started long window should not have elapsed"
        );
        assert!(
            state.is_timeout_elapsed(0),
            "0 timeout should always count as elapsed"
        );
    }

    #[test]
    fn test_batching_state_start_batch_timer() {
        let mut state = BatchingState::new();
        let initial_time = state.batch_start_time;

        state.start_batch_timer();
        assert!(
            state.batch_start_time >= initial_time,
            "start_batch_timer should update the time"
        );
    }

    /// Exercises `remaining_timeout`: the remainder is positive and bounded by
    /// the window right after the timer starts, and zero for a zero window.
    #[test]
    fn test_batching_state_remaining_timeout() {
        let mut state = BatchingState::new();
        // Restart the batch timer so the window is measured from now.
        state.start_batch_timer();

        let window = Duration::from_micros(LONG_WINDOW_US);
        let remaining = state.remaining_timeout(LONG_WINDOW_US);
        assert!(
            remaining > Duration::ZERO,
            "Should have remaining time initially"
        );
        assert!(
            remaining <= window,
            "Remaining time must never exceed the configured window"
        );

        let remaining_zero = state.remaining_timeout(0);
        assert_eq!(
            remaining_zero,
            Duration::ZERO,
            "0 timeout should return zero"
        );
    }

    #[test]
    fn test_batching_state_accumulate_removed() {
        let mut state = BatchingState::new();

        let first = KvCacheRemoveData {
            block_hashes: vec![ExternalSequenceBlockHash(1), ExternalSequenceBlockHash(2)],
        };
        state.pending_removed = Some(first);

        if let Some(ref mut pending) = state.pending_removed {
            pending
                .block_hashes
                .extend(vec![ExternalSequenceBlockHash(3)]);
        }

        let pending = state.pending_removed.as_ref().unwrap();
        assert_eq!(
            pending.block_hashes.len(),
            3,
            "Should have accumulated 3 block hashes"
        );
    }

    #[test]
    fn test_batching_state_accumulate_stored() {
        let mut state = BatchingState::new();

        let block1 = KvCacheStoredBlockData {
            block_hash: ExternalSequenceBlockHash(1),
            tokens_hash: LocalBlockHash(100),
            mm_extra_info: None,
        };
        let first = KvCacheStoreData {
            parent_hash: Some(ExternalSequenceBlockHash(0)),
            blocks: vec![block1],
        };
        state.pending_stored = Some(first);

        let block2 = KvCacheStoredBlockData {
            block_hash: ExternalSequenceBlockHash(2),
            tokens_hash: LocalBlockHash(200),
            mm_extra_info: None,
        };
        if let Some(ref mut pending) = state.pending_stored {
            pending.blocks.extend(vec![block2]);
        }

        let pending = state.pending_stored.as_ref().unwrap();
        assert_eq!(pending.blocks.len(), 2, "Should have accumulated 2 blocks");
    }
}
#[cfg(test)]
mod event_processor_tests {
use super::*;
use std::sync::{Arc, Mutex};
use tokio_util::sync::CancellationToken;
/// Mock publisher that collects published events
/// in a shared, mutex-guarded vector so tests can inspect them afterwards.
#[derive(Debug, Clone)]
struct MockPublisher {
// Recorded events; the `Arc` means clones of the publisher share one buffer.
events: Arc<Mutex<Vec<RouterEvent>>>,
}
impl MockPublisher {
    /// Creates a publisher with an empty event buffer.
    fn new() -> Self {
        let events = Arc::new(Mutex::new(Vec::new()));
        Self { events }
    }

    /// Returns a snapshot (clone) of every event recorded so far.
    ///
    /// Panics if the buffer mutex is poisoned.
    fn get_events(&self) -> Vec<RouterEvent> {
        let recorded = self.events.lock().unwrap();
        recorded.clone()
    }
}
// Test double for the event sink: records instead of publishing.
#[async_trait]
impl EventSink for MockPublisher {
/// Appends a clone of `event` to the shared buffer and always returns `Ok(())`.
/// Panics if the buffer mutex is poisoned.
async fn publish_event(&self, event: &RouterEvent) -> Result<()> {
self.events.lock().unwrap().push(event.clone());
Ok(())
}
}
/// Test that pushing N removed events results in batched output.
/// Uses a 10ms timeout (10_000 µs) to ensure events are batched (events sent rapidly).
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_20() {
test_removed_events_batching(20, 10_000).await; // 20 events, 10ms timeout
}
/// Removed-event batching scenario with 10 events and a 10 ms (10_000 µs) window.
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_10() {
test_removed_events_batching(10, 10_000).await; // 10 events, 10ms timeout
}
/// Removed-event batching scenario with 5 events and a 10 ms (10_000 µs) window.
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_5() {
test_removed_events_batching(5, 10_000).await; // 5 events, 10ms timeout
}
/// Removed-event batching scenario with 3 events and a 10 ms (10_000 µs) window.
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_3() {
test_removed_events_batching(3, 10_000).await; // 3 events, 10ms timeout
}
/// Helper function to test removed events batching with configurable count and timeout
///
/// Sends `event_count` single-hash Removed events back to back (yielding after
/// each send), waits slightly longer than `timeout_us` microseconds, closes the
/// channel, and then checks that at most two events were published and that
/// every block hash is present in the output.
async fn test_removed_events_batching(event_count: usize, timeout_us: u64) {
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
let publisher_clone = publisher.clone();
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
.await
});
for i in 0..event_count {
let event = KvCacheEvent {
event_id: i as u64,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: vec![ExternalSequenceBlockHash(i as u64)],
}),
dp_rank: 0,
};
tx.send(event).unwrap();
// Yield to allow event processor to process the event
tokio::task::yield_now().await;
}
// Wait for timeout to elapse so all events flush together as one batch
// Add small buffer to ensure flush happens before channel close
tokio::time::sleep(tokio::time::Duration::from_micros(timeout_us + 1000)).await;
drop(tx);
handle.await.unwrap();
let events = publisher.get_events();
assert!(
!events.is_empty(),
"Should have received at least one event"
);
// With the batching window passed in (`timeout_us`; 10 ms for the tests that call this
// helper) and rapid event sending, all events should batch into few output events.
// The bound of 2 leaves room for one early flush — NOTE(review): presumably a timing
// allowance; confirm whether a separate first flush can actually occur.
assert!(
events.len() <= 2,
"With long timeout ({}us), all {} events should batch into at most 2 output events (got {})",
timeout_us,
event_count,
events.len()
);
// Sum block hashes across all published Removed events; other event kinds count as 0.
let total_hashes: usize = events
.iter()
.map(|e| {
if let KvCacheEventData::Removed(data) = &e.event.data {
data.block_hashes.len()
} else {
0
}
})
.sum();
assert_eq!(
total_hashes, event_count,
"All {} block hashes should be accounted for",
event_count
);
}
/// Test that sequential stored events accumulate, for several event counts.
/// Uses a longer timeout (100ms = 100_000 µs) to ensure events have time to batch.
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_20() {
test_stored_events_batching(20, 100_000).await; // 20 events, 100ms timeout
}
/// Stored-event batching scenario with 10 chained events and a 100 ms (100_000 µs) window.
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_10() {
test_stored_events_batching(10, 100_000).await; // 10 events, 100ms timeout
}
/// Stored-event batching scenario with 5 chained events and a 100 ms (100_000 µs) window.
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_5() {
test_stored_events_batching(5, 100_000).await; // 5 events, 100ms timeout
}
/// Stored-event batching scenario with 3 chained events and a 100 ms (100_000 µs) window.
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_3() {
test_stored_events_batching(3, 100_000).await; // 3 events, 100ms timeout
}
/// Shared driver for the stored-event batching tests.
///
/// Feeds `event_count` single-block Stored events whose `parent_hash` chains
/// onto the previous block, using a batch window of `timeout_us` microseconds,
/// then verifies that the output is at most two events and contains every block.
async fn test_stored_events_batching(event_count: usize, timeout_us: u64) {
    let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
    let publisher = MockPublisher::new();
    let token = CancellationToken::new();
    let worker = tokio::spawn({
        let sink = publisher.clone();
        async move { run_event_processor_loop(sink, 1, token, rx, None, timeout_us).await }
    });

    for i in 0..event_count {
        let block_id = i as u64;
        // Chain each event onto the previous block. The first event uses parent 0,
        // which is exactly what saturating at zero yields for i == 0.
        let parent_id = i.saturating_sub(1) as u64;
        let stored = KvCacheStoreData {
            parent_hash: Some(ExternalSequenceBlockHash(parent_id)),
            blocks: vec![KvCacheStoredBlockData {
                block_hash: ExternalSequenceBlockHash(block_id),
                tokens_hash: LocalBlockHash(block_id * 100),
                mm_extra_info: None,
            }],
        };
        tx.send(KvCacheEvent {
            event_id: block_id,
            data: KvCacheEventData::Stored(stored),
            dp_rank: 0,
        })
        .unwrap();
        // Brief pause so the processor can pick the event up and batch it.
        tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
    }

    // Let the processor drain the channel before it is closed.
    tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
    drop(tx);
    worker.await.unwrap();

    let published = publisher.get_events();
    assert!(
        !published.is_empty(),
        "Should have received at least one event"
    );

    // One batch is the expected outcome; a second is tolerated in case the
    // first event is flushed on its own.
    assert!(
        published.len() <= 2,
        "With long timeout ({}us) and sequential parent hashes, all {} events should batch into at most 2 output events (got {})",
        timeout_us,
        event_count,
        published.len()
    );

    if published.len() == 2 {
        // A split must be "first block alone, everything else together".
        match &published[0].event.data {
            KvCacheEventData::Stored(first) => assert_eq!(
                first.blocks.len(),
                1,
                "If 2 events, first event should have 1 block (got {})",
                first.blocks.len()
            ),
            _ => panic!("Expected Stored event"),
        }
    }

    // Only Stored payloads contribute to the block total.
    let total_blocks: usize = published
        .iter()
        .filter_map(|published_event| match &published_event.event.data {
            KvCacheEventData::Stored(stored) => Some(stored.blocks.len()),
            _ => None,
        })
        .sum();
    assert_eq!(
        total_blocks, event_count,
        "All {} blocks should be accounted for",
        event_count
    );
}
/// Test non-sequential stored events trigger flush
///
/// Three Stored events are sent whose `parent_hash` values (100, 200, 300) never match
/// the previously stored block hash (0, 1, 2), so each must be published on its own
/// even though the 100 ms batch window has not elapsed.
#[tokio::test]
async fn test_run_event_processor_loop_non_sequential_flush() {
let timeout_us = 100_000; // 100ms in microseconds
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
let publisher_clone = publisher.clone();
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
.await
// No sleep between sends is needed: the broken parent_hash chain forces a flush
// for every event regardless of timing.
});
for i in 0..3 {
let event = KvCacheEvent {
event_id: i as u64,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash((i + 1) as u64 * 100)),
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(i as u64),
tokens_hash: LocalBlockHash(i as u64 * 100),
mm_extra_info: None,
}],
}),
dp_rank: 0,
};
tx.send(event).unwrap();
}
// Closing the channel makes the loop flush the last pending batch and exit.
drop(tx);
handle.await.unwrap();
let events = publisher.get_events();
assert!(!events.is_empty(), "Should have received events");
// With non-sequential parent hashes, each event should trigger a flush
// So we expect 3 separate events
assert_eq!(
events.len(),
3,
"Non-sequential events should trigger flush, resulting in 3 separate events"
);
// Sum blocks across all published Stored events; other event kinds count as 0.
let total_blocks: usize = events
.iter()
.map(|e| {
if let KvCacheEventData::Stored(data) = &e.event.data {
data.blocks.len()
} else {
0
}
})
.sum();
assert_eq!(total_blocks, 3, "All 3 blocks should be accounted for");
}
/// Test that with short timeout and slow input, events are NOT batched.
/// Parametrized over different timeout values: 0ms, 0.1ms, 0.2ms (0, 100, 200 µs).
/// All use 2ms delay between events, so each event times out before the next arrives.
/// This variant passes 0, which the processing loop flushes right after each event.
#[tokio::test]
async fn test_run_event_processor_loop_no_batching_with_slow_input_0ms() {
test_no_batching_with_slow_input(0).await; // 0ms timeout
}
/// Slow-input scenario with a 0.1 ms (100 µs) batch window.
#[tokio::test]
async fn test_run_event_processor_loop_no_batching_with_slow_input_0_1ms() {
test_no_batching_with_slow_input(100).await; // 0.1ms timeout
}
/// Slow-input scenario with a 0.2 ms (200 µs) batch window.
#[tokio::test]
async fn test_run_event_processor_loop_no_batching_with_slow_input_0_2ms() {
test_no_batching_with_slow_input(200).await; // 0.2ms timeout
}
/// Shared driver for the slow-input tests.
///
/// Sends five single-hash Removed events spaced 2 ms apart while the processor
/// runs with a batch window of `timeout_us` microseconds, then checks that at
/// least three separate events were published and no block hash was lost.
async fn test_no_batching_with_slow_input(timeout_us: u64) {
    let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
    let publisher = MockPublisher::new();
    let token = CancellationToken::new();
    let worker = tokio::spawn({
        let sink = publisher.clone();
        async move { run_event_processor_loop(sink, 1, token, rx, None, timeout_us).await }
    });

    // The gap between sends is far longer than any window used by the callers
    // (<= 0.2 ms), so each event should time out before the next one arrives.
    let gap = tokio::time::Duration::from_millis(2);
    for seq in 0..5u64 {
        let removed = KvCacheRemoveData {
            block_hashes: vec![ExternalSequenceBlockHash(seq)],
        };
        tx.send(KvCacheEvent {
            event_id: seq,
            data: KvCacheEventData::Removed(removed),
            dp_rank: 0,
        })
        .unwrap();
        tokio::time::sleep(gap).await;
    }

    // Extra grace period so the final event is handled before the channel closes.
    tokio::time::sleep(gap).await;
    drop(tx);
    worker.await.unwrap();

    let published = publisher.get_events();
    assert!(!published.is_empty(), "Should have received events");

    // Not a strict 5: scheduling jitter may merge a few, but most must stay separate.
    assert!(
        published.len() >= 3,
        "With slow input (2ms delay) and timeout={}us, should have at least 3 separate events (got {})",
        timeout_us,
        published.len()
    );

    // Only Removed payloads contribute to the hash total.
    let total_hashes: usize = published
        .iter()
        .filter_map(|published_event| match &published_event.event.data {
            KvCacheEventData::Removed(removed) => Some(removed.block_hashes.len()),
            _ => None,
        })
        .sum();
    assert_eq!(
        total_hashes, 5,
        "All 5 block hashes should be accounted for"
    );
}
/// Test that switching between Removed and Stored events causes a flush.
///
/// One `Removed` event is followed (100 µs later) by one `Stored` event while the
/// flush window is 100 ms, so the window itself cannot be what separates them within
/// the 2 ms the test waits. The test checks that two events are published and that
/// they arrive in send order with the expected variants: `Removed`, then `Stored`.
#[tokio::test]
async fn test_event_type_switching_causes_flush() {
    let timeout_us = 100_000; // 100ms timeout
    let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
    let publisher = MockPublisher::new();
    let publisher_clone = publisher.clone();
    let cancellation_token = CancellationToken::new();
    let handle = tokio::spawn(async move {
        run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
            .await
    });

    // Send a Removed event
    tx.send(KvCacheEvent {
        event_id: 0,
        data: KvCacheEventData::Removed(KvCacheRemoveData {
            block_hashes: vec![ExternalSequenceBlockHash(0)],
        }),
        dp_rank: 0,
    })
    .unwrap();

    // Small sleep
    tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;

    // Send a Stored event (should cause flush of the Removed event)
    tx.send(KvCacheEvent {
        event_id: 1,
        data: KvCacheEventData::Stored(KvCacheStoreData {
            parent_hash: Some(ExternalSequenceBlockHash(0)),
            blocks: vec![KvCacheStoredBlockData {
                block_hash: ExternalSequenceBlockHash(1),
                tokens_hash: LocalBlockHash(100),
                mm_extra_info: None,
            }],
        }),
        dp_rank: 0,
    })
    .unwrap();

    // Give time for processing
    tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
    drop(tx);
    handle.await.unwrap();

    let events = publisher.get_events();
    // Should have 2 events: one Removed, one Stored (not batched together)
    assert_eq!(
        events.len(),
        2,
        "Switching from Removed to Stored should cause immediate flush, resulting in 2 separate events"
    );

    // The count alone would also pass if the variants were wrong or reordered, so pin
    // both the kind and the order of the published events.
    assert!(
        matches!(events[0].event.data, KvCacheEventData::Removed(_)),
        "First published event should be the flushed Removed batch"
    );
    assert!(
        matches!(events[1].event.data, KvCacheEventData::Stored(_)),
        "Second published event should be the Stored batch"
    );
}
/// A change of `dp_rank` between consecutive events must split the batch.
///
/// Three `Removed` events are sent with `dp_rank = 0`, then three with `dp_rank = 1`,
/// all well inside a 100 ms flush window. The test expects exactly two published events,
/// in that order, covering all six hashes.
#[tokio::test]
async fn test_dp_rank_change_causes_flush() {
    let flush_window_us = 100_000; // 100 ms
    let (event_tx, event_rx) = mpsc::unbounded_channel::<KvCacheEvent>();
    let publisher = MockPublisher::new();
    let cancel = CancellationToken::new();
    let processor = tokio::spawn({
        let sink = publisher.clone();
        async move {
            run_event_processor_loop(sink, 1, cancel, event_rx, None, flush_window_us).await
        }
    });

    // ids 0..3 go out on rank 0, ids 3..6 on rank 1; the rank switch should force a flush.
    for (dp_rank, ids) in [(0, 0..3u64), (1, 3..6u64)] {
        for id in ids {
            event_tx
                .send(KvCacheEvent {
                    event_id: id,
                    data: KvCacheEventData::Removed(KvCacheRemoveData {
                        block_hashes: vec![ExternalSequenceBlockHash(id)],
                    }),
                    dp_rank,
                })
                .unwrap();
            // Yield to the scheduler after every send.
            tokio::task::yield_now().await;
        }
    }

    // Leave a little time for processing before closing the channel.
    tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
    drop(event_tx);
    processor.await.unwrap();

    let published = publisher.get_events();
    // One output per dp_rank group.
    assert_eq!(
        published.len(),
        2,
        "dp_rank change should cause immediate flush, resulting in 2 separate events"
    );

    // Every hash must show up in one of the two outputs.
    let total_hashes: usize = published
        .iter()
        .filter_map(|published_event| match &published_event.event.data {
            KvCacheEventData::Removed(removed) => Some(removed.block_hashes.len()),
            _ => None,
        })
        .sum();
    assert_eq!(
        total_hashes, 6,
        "All 6 block hashes should be accounted for"
    );

    // Each output keeps the rank of the group it was built from.
    let first = &published[0].event;
    let second = &published[1].event;
    assert_eq!(
        first.dp_rank, 0,
        "First batch should have dp_rank=0"
    );
    assert_eq!(
        second.dp_rank, 1,
        "Second batch should have dp_rank=1"
    );
}
/// Flushed batches must carry the right `dp_rank` and a monotonic batch `event_id`.
///
/// Input events use ids 10..=12 (rank 0) and 20..=21 (rank 1); the assertions expect the
/// two published batches to be numbered 1 and 2 and to keep the rank of their own group,
/// i.e. the metadata of the first batch is not overwritten by the events that follow it.
#[tokio::test]
async fn test_flushed_events_have_correct_metadata() {
    let flush_window_us = 100_000; // 100 ms
    let (event_tx, event_rx) = mpsc::unbounded_channel::<KvCacheEvent>();
    let publisher = MockPublisher::new();
    let cancel = CancellationToken::new();
    let processor = tokio::spawn({
        let sink = publisher.clone();
        async move {
            run_event_processor_loop(sink, 1, cancel, event_rx, None, flush_window_us).await
        }
    });

    // Group 1: rank 0, event ids 10, 11, 12 removing hashes 0, 1, 2.
    for (event_id, hash) in (10u64..13).zip(0u64..) {
        event_tx
            .send(KvCacheEvent {
                event_id,
                data: KvCacheEventData::Removed(KvCacheRemoveData {
                    block_hashes: vec![ExternalSequenceBlockHash(hash)],
                }),
                dp_rank: 0,
            })
            .unwrap();
        tokio::task::yield_now().await;
    }

    // Group 2: rank 1, event ids 20, 21 removing hashes 3, 4.
    // The rank switch is expected to flush group 1 first.
    for (event_id, hash) in (20u64..22).zip(3u64..) {
        event_tx
            .send(KvCacheEvent {
                event_id,
                data: KvCacheEventData::Removed(KvCacheRemoveData {
                    block_hashes: vec![ExternalSequenceBlockHash(hash)],
                }),
                dp_rank: 1,
            })
            .unwrap();
        tokio::task::yield_now().await;
    }

    tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
    drop(event_tx);
    processor.await.unwrap();

    let published = publisher.get_events();
    assert_eq!(
        published.len(),
        2,
        "Should have 2 events (one per dp_rank batch)"
    );

    // Batch 1: rank 0, batch id 1.
    let first = &published[0].event;
    assert_eq!(
        first.dp_rank, 0,
        "First batch should have dp_rank=0"
    );
    assert_eq!(
        first.event_id, 1,
        "First batch should have monotonic event_id=1"
    );

    // Batch 2: rank 1, batch id 2.
    let second = &published[1].event;
    assert_eq!(
        second.dp_rank, 1,
        "Second batch should have dp_rank=1"
    );
    assert_eq!(
        second.event_id, 2,
        "Second batch should have monotonic event_id=2"
    );
}
/// An idle period longer than the flush window must not make the next event flush at once.
///
/// The loop is left idle for 100 ms (window: 50 ms), then three `Removed` events are sent
/// back-to-back. The test expects them to be published as a single batch of three hashes.
#[tokio::test]
async fn test_first_event_after_idle_no_immediate_flush() {
    let flush_window_us = 50_000; // 50 ms
    let (event_tx, event_rx) = mpsc::unbounded_channel::<KvCacheEvent>();
    let publisher = MockPublisher::new();
    let cancel = CancellationToken::new();
    let processor = tokio::spawn({
        let sink = publisher.clone();
        async move {
            run_event_processor_loop(sink, 1, cancel, event_rx, None, flush_window_us).await
        }
    });

    // Stay idle for twice the flush window before sending anything.
    let idle_period = tokio::time::Duration::from_millis(100);
    tokio::time::sleep(idle_period).await;

    // Burst of three events; they are expected to land in the same batch.
    for id in 0..3u64 {
        event_tx
            .send(KvCacheEvent {
                event_id: id,
                data: KvCacheEventData::Removed(KvCacheRemoveData {
                    block_hashes: vec![ExternalSequenceBlockHash(id)],
                }),
                dp_rank: 0,
            })
            .unwrap();
        tokio::task::yield_now().await;
    }

    // Wait past the 50 ms window so the batch is flushed by the timeout.
    let past_window = tokio::time::Duration::from_millis(60);
    tokio::time::sleep(past_window).await;
    drop(event_tx);
    processor.await.unwrap();

    let published = publisher.get_events();
    // Exactly one output for the whole burst.
    assert_eq!(
        published.len(),
        1,
        "All 3 events should batch into 1 output event (not flush immediately due to stale timer)"
    );

    let total_hashes: usize = published
        .iter()
        .filter_map(|published_event| match &published_event.event.data {
            KvCacheEventData::Removed(removed) => Some(removed.block_hashes.len()),
            _ => None,
        })
        .sum();
    assert_eq!(
        total_hashes, 3,
        "All 3 block hashes should be accounted for"
    );
}
/// Test that stored events with dp_rank change have correct metadata
#[tokio::test]
async fn test_stored_events_with_dp_rank_change_correct_metadata() {
let timeout_us = 100_000; // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
let publisher_clone = publisher.clone();
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
.await
});
// Send first batch: 2 sequential stored events with dp_rank=0, event_ids 100-101
tx.send(KvCacheEvent {
event_id: 100,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(0)),
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(1),
tokens_hash: LocalBlockHash(100),
mm_extra_info: None,
}],
}),
dp_rank: 0,
})
.unwrap();
tokio::task::yield_now().await;
tx.send(KvCacheEvent {
event_id: 101,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(1)),
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(2),
tokens_hash: LocalBlockHash(200),
mm_extra_info: None,
}],
}),
dp_rank: 0,
})
.unwrap();
tokio::task::yield_now().await;
// Send second batch: 1 event with dp_rank=1, event_id=200
// This should flush the first batch with dp_rank=0, event_id=101
tx.send(KvCacheEvent {
event_id: 200,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(0)),
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100),
tokens_hash: LocalBlockHash(1000),
mm_extra_info: None,
}],
}),
dp_rank: 1,
})
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
drop(tx);
handle.await.unwrap();
let events = publisher.get_events();
assert_eq!(
events.len(),
2,
"Should have 2 events (one per dp_rank batch)"
);
// First batch: dp_rank=0, monotonic event_id=1
assert_eq!(
events[0].event.dp_rank, 0,
"First batch should have dp_rank=0"
);
assert_eq!(
events[0].event.event_id, 1,
"First batch should have monotonic event_id=1"
);
// Second batch: dp_rank=1, monotonic event_id=2
assert_eq!(
events[1].event.dp_rank, 1,
"Second batch should have dp_rank=1"
);
assert_eq!(
events[1].event.event_id, 2,
"Second batch should have monotonic event_id=2"
);
// Verify block counts
if let KvCacheEventData::Stored(data) = &events[0].event.data {
assert_eq!(data.blocks.len(), 2, "First batch should have 2 blocks");
} else {
panic!("Expected Stored event");
}
if let KvCacheEventData::Stored(data) = &events[1].event.data {
assert_eq!(data.blocks.len(), 1, "Second batch should have 1 block");
} else {
panic!("Expected Stored event");
}
}
/// Test that extending a batch does NOT change parent_hash
/// First event with parent_hash=None should keep it None even if subsequent events have Some(X)
#[tokio::test]
async fn test_batch_parent_hash_preserved_when_extending() {
let timeout_us = 100_000; // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
let publisher_clone = publisher.clone();
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
.await
});
// First event: parent_hash=None, block_hash=1
tx.send(KvCacheEvent {
event_id: 0,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, // Root block with no parent
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(1),
tokens_hash: LocalBlockHash(100),
mm_extra_info: None,
}],
}),
dp_rank: 0,
})
.unwrap();
tokio::task::yield_now().await;
// Second event: parent_hash=Some(1), block_hash=2 (sequential)
tx.send(KvCacheEvent {
event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(1)), // Points to previous block
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(2),
tokens_hash: LocalBlockHash(200),
mm_extra_info: None,
}],
}),
dp_rank: 0,
})
.unwrap();
tokio::task::yield_now().await;
// Third event: parent_hash=Some(2), block_hash=3 (sequential)
tx.send(KvCacheEvent {
event_id: 2,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(2)),
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(3),
tokens_hash: LocalBlockHash(300),
mm_extra_info: None,
}],
}),
dp_rank: 0,
})
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
drop(tx);
handle.await.unwrap();
let events = publisher.get_events();
assert_eq!(
events.len(),
1,
"All 3 sequential events should batch into 1"
);
// The batch should have parent_hash=None (preserved from first event)
if let KvCacheEventData::Stored(data) = &events[0].event.data {
assert_eq!(data.blocks.len(), 3, "Batch should have 3 blocks");
assert_eq!(
data.parent_hash, None,
"Batch parent_hash should remain None (from first event), NOT overwritten by subsequent events"
);
} else {
panic!("Expected Stored event");
}
}
}
...@@ -330,6 +330,7 @@ impl MockVllmEngine { ...@@ -330,6 +330,7 @@ impl MockVllmEngine {
source_config, source_config,
args.enable_local_indexer, args.enable_local_indexer,
dp_rank, dp_rank,
None,
) { ) {
Ok(publisher) => ( Ok(publisher) => (
Some(Arc::new(sink) as Arc<dyn KvCacheEventSink>), Some(Arc::new(sink) as Arc<dyn KvCacheEventSink>),
...@@ -358,6 +359,7 @@ impl MockVllmEngine { ...@@ -358,6 +359,7 @@ impl MockVllmEngine {
None, None,
args.enable_local_indexer, args.enable_local_indexer,
dp_rank, dp_rank,
None,
) { ) {
Ok(publisher) => ( Ok(publisher) => (
Some(Arc::new(KvEventSinkAdapter(publisher)) Some(Arc::new(KvEventSinkAdapter(publisher))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment