".github/actions/vscode:/vscode.git/clone" did not exist on "0bb3be00e7a40190a537f1cd9aafe65289172c08"
Unverified Commit fca0a801 authored by Michael Feil's avatar Michael Feil Committed by GitHub
Browse files

feat: follow up of optinal, batching-of-client-side-events (#6741)


Signed-off-by: default avatarmichaelfeil <63565275+michaelfeil@users.noreply.github.com>
Signed-off-by: default avatarMichael Feil <michaelfeil@users.noreply.github.com>
Signed-off-by: default avatarMichael Feil <63565275+michaelfeil@users.noreply.github.com>
parent e9f740ab
......@@ -124,12 +124,14 @@ impl KvEventPublisher {
/// so that routers can recover events directly from this worker.
/// zmq_endpoint: Optional ZMQ SUB endpoint to read raw engine events from.
/// zmq_topic: ZMQ topic filter (default "").
/// batching_timeout_us: Maximum time (in **microseconds**) to accumulate
/// batching_timeout_ms: Maximum time (in **milliseconds**) to accumulate
/// events into a single batch before flushing.
/// ``None`` uses the default window of 10000 µs (10 ms).
/// ``0`` disables batching: every event is published immediately.
/// ``None`` disables batching: every event is published immediately.
/// ``50`` to enable batching with a 50 ms window.
/// ``0`` is treated as ``None`` (also disables batching).
/// Maximum allowed is 15_000 (15 seconds); larger values are capped.
#[new]
#[pyo3(signature = (endpoint, worker_id=0, kv_block_size=0, dp_rank=0, enable_local_indexer=false, zmq_endpoint=None, zmq_topic=None, batching_timeout_us=None))]
#[pyo3(signature = (endpoint, worker_id=0, kv_block_size=0, dp_rank=0, enable_local_indexer=false, zmq_endpoint=None, zmq_topic=None, batching_timeout_ms=llm_rs::kv_router::publisher::DEFAULT_BATCHING_TIMEOUT_MS))]
#[allow(clippy::too_many_arguments)]
fn new(
endpoint: Endpoint,
......@@ -139,7 +141,7 @@ impl KvEventPublisher {
enable_local_indexer: bool,
zmq_endpoint: Option<String>,
zmq_topic: Option<String>,
batching_timeout_us: Option<u64>,
batching_timeout_ms: Option<u64>,
) -> PyResult<Self> {
let _ = worker_id;
......@@ -161,7 +163,7 @@ impl KvEventPublisher {
source_config,
enable_local_indexer,
dp_rank,
batching_timeout_us,
batching_timeout_ms,
)
.map_err(to_pyerr)?;
......
......@@ -12,6 +12,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};
use dynamo_runtime::metrics::MetricsHierarchy;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::EventPublisher;
use dynamo_runtime::{
......@@ -52,7 +53,81 @@ const MAX_CONSECUTIVE_ERRORS: u32 = 10;
const MAX_BACKOFF_EXPONENT: u32 = 8; // Cap at 2^8 = 256x multiplier to prevent overflow
// Batching configuration
const BATCH_TIMEOUT_US: u64 = 10_000;
const MAX_BATCHING_TIMEOUT_MS: u64 = 15_000; // 15 seconds, prevents misconfiguration
pub const DEFAULT_BATCHING_TIMEOUT_MS: Option<u64> = None; // disabled by default
const DEFAULT_MAX_BATCH_BLOCKS: usize = 128; // Max blocks to batch before flushing
// ---------------------------------------------------------------------------
// Engines dropped events metric
// ---------------------------------------------------------------------------
use std::sync::OnceLock;
use dynamo_runtime::metrics::prometheus_names::kv_publisher;
/// Metrics for the KV publisher, created via the MetricsHierarchy API.
/// This provides automatic `dynamo_namespace`, `dynamo_component`, and other
/// hierarchy labels for free.
pub struct KvPublisherMetrics {
/// Total number of raw events dropped by engines before reaching publisher
pub engines_dropped_events_total: prometheus::IntCounterVec,
}
static KV_PUBLISHER_METRICS: OnceLock<Arc<KvPublisherMetrics>> = OnceLock::new();
impl KvPublisherMetrics {
/// Create from a Component, memoized in a static OnceLock.
/// Uses the MetricsHierarchy API which auto-prepends `dynamo_component_`,
/// injects hierarchy labels, and registers with the DRT `MetricsRegistry`.
pub fn from_component(component: &Component) -> Arc<Self> {
KV_PUBLISHER_METRICS
.get_or_init(|| {
let metrics = component.metrics();
match metrics.create_intcountervec(
kv_publisher::ENGINES_DROPPED_EVENTS_TOTAL,
"Total number of raw events dropped by engines before reaching publisher (detected via event_id gaps)",
&["worker_id"],
&[],
) {
Ok(engines_dropped_events_total) => {
Arc::new(Self { engines_dropped_events_total })
}
Err(e) => {
tracing::warn!("Failed to create kv_publisher metrics from component: {}. Using unregistered metrics as fallback.", e);
Arc::new(Self::new_unregistered())
}
}
})
.clone()
}
/// Creates unregistered metrics for use when the MetricsRegistry is not available.
/// This is used as a fallback when metric creation fails.
pub fn new_unregistered() -> Self {
Self {
engines_dropped_events_total: prometheus::IntCounterVec::new(
prometheus::Opts::new(
kv_publisher::ENGINES_DROPPED_EVENTS_TOTAL,
"Total number of raw events dropped by engines before reaching publisher (detected via event_id gaps)",
),
&["worker_id"],
)
.expect("failed to create engines_dropped_events_total counter"),
}
}
/// Increment the engines dropped events counter by the given amount.
pub fn increment_engines_dropped_events(&self, worker_id: u64, count: u64) {
self.engines_dropped_events_total
.with_label_values(&[&worker_id.to_string()])
.inc_by(count);
}
}
/// Get the KV publisher metrics if initialized.
fn kv_publisher_metrics() -> Option<Arc<KvPublisherMetrics>> {
KV_PUBLISHER_METRICS.get().cloned()
}
// -------------------------------------------------------------------------
// Batching State -----------------------------------------------------------
......@@ -73,9 +148,10 @@ struct BatchingState {
/// dp_rank of the events in the current pending batch.
/// A change signals that the batch must be flushed before accumulating further.
last_dp_rank: u32,
/// When the current batch started accumulating (set on the first event of each batch).
/// Used to compute the remaining window before the batch is force-flushed.
batch_start_time: Instant,
/// When we last flushed (or initialized). Used to detect stale pending data:
/// if a new event arrives after a long idle period (exceeding timeout),
/// we flush immediately for lower latency on sparse important events.
last_flush_time: Instant,
}
impl BatchingState {
......@@ -85,7 +161,7 @@ impl BatchingState {
pending_stored: None,
next_publish_id: 1,
last_dp_rank: 0,
batch_start_time: Instant::now(),
last_flush_time: Instant::now(),
}
}
......@@ -93,15 +169,28 @@ impl BatchingState {
self.pending_removed.is_some() || self.pending_stored.is_some()
}
/// Marks the start of a new batch, resetting the flush-window timer.
fn start_batch_timer(&mut self) {
self.batch_start_time = Instant::now();
fn pending_block_count(&self) -> usize {
self.pending_removed
.as_ref()
.map(|r| r.block_hashes.len())
.unwrap_or(0)
+ self
.pending_stored
.as_ref()
.map(|s| s.blocks.len())
.unwrap_or(0)
}
/// Records that a flush just happened. Called after every flush to track
/// idle periods for stale-data detection.
fn record_flush_time(&mut self) {
self.last_flush_time = Instant::now();
}
/// Returns the time remaining in the current batch window (zero if already elapsed).
fn remaining_timeout(&self, timeout_us: u64) -> Duration {
let timeout = Duration::from_micros(timeout_us);
let elapsed = self.batch_start_time.elapsed();
fn remaining_timeout(&self, timeout_ms: u64) -> Duration {
let timeout = Duration::from_millis(timeout_ms);
let elapsed = self.last_flush_time.elapsed();
if elapsed >= timeout {
Duration::ZERO
} else {
......@@ -109,9 +198,9 @@ impl BatchingState {
}
}
/// Returns `true` when the batch window has elapsed (or `timeout_us` is zero).
fn is_timeout_elapsed(&self, timeout_us: u64) -> bool {
self.remaining_timeout(timeout_us) == Duration::ZERO
/// Returns `true` when the batch window has elapsed (or `timeout_ms` is zero).
fn is_timeout_elapsed(&self, timeout_ms: u64) -> bool {
self.remaining_timeout(timeout_ms) == Duration::ZERO
}
}
......@@ -193,7 +282,14 @@ impl KvEventPublisher {
kv_block_size: u32,
source_config: Option<KvEventSourceConfig>,
) -> Result<Self> {
Self::new_with_local_indexer(component, kv_block_size, source_config, false, 0, None)
Self::new_with_local_indexer(
component,
kv_block_size,
source_config,
false,
0,
DEFAULT_BATCHING_TIMEOUT_MS,
)
}
pub fn new_with_local_indexer(
......@@ -202,16 +298,34 @@ impl KvEventPublisher {
source_config: Option<KvEventSourceConfig>,
enable_local_indexer: bool,
dp_rank: DpRank,
batching_timeout_us: Option<u64>,
batching_timeout_ms: Option<u64>,
) -> Result<Self> {
let cancellation_token = CancellationToken::new();
let batching_timeout_us = batching_timeout_us.unwrap_or(BATCH_TIMEOUT_US);
// None = disabled (flush every event); Some(0) normalised to None; Some(ms) = opt-in.
// Cap at MAX_BATCHING_TIMEOUT_MS to prevent misconfiguration.
let batching_timeout_ms = batching_timeout_ms
.filter(|&ms| {
if ms > MAX_BATCHING_TIMEOUT_MS {
tracing::warn!(
requested_ms = ms,
max_ms = MAX_BATCHING_TIMEOUT_MS,
"batching_timeout_ms too high, capping to 15s"
);
}
// if ms is 0, treat as disabled (None)
ms > 0
})
.map(|ms| ms.min(MAX_BATCHING_TIMEOUT_MS));
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
// Infer worker_id from component's connection
let worker_id = component.drt().connection_id();
// Initialize the KV publisher metrics via MetricsHierarchy API
// This provides automatic hierarchy labels (dynamo_namespace, dynamo_component, etc.)
KvPublisherMetrics::from_component(&component);
let component_name = component.name();
tracing::info!(
"Initializing KvEventPublisher for worker {worker_id} in component {component_name}"
......@@ -294,7 +408,7 @@ impl KvEventPublisher {
cancellation_token_clone,
rx,
local_indexer_clone,
batching_timeout_us,
batching_timeout_ms,
)
.await
});
......@@ -320,7 +434,7 @@ impl KvEventPublisher {
cancellation_token_clone,
rx,
local_indexer_clone,
batching_timeout_us,
batching_timeout_ms,
)
.await
});
......@@ -445,6 +559,8 @@ impl BatchingState {
}
// Consecutive batch IDs (1, 2, 3, …) keep downstream gap-detection happy.
self.next_publish_id += 1;
// Record when we flushed for stale-data detection on next event.
self.record_flush_time();
}
}
......@@ -453,7 +569,7 @@ impl BatchingState {
/// - Event type switches (Removed ↔ Stored)
/// - `dp_rank` changes between consecutive events
/// - A `Stored` event's `parent_hash` breaks the sequential chain
/// - The batch window expires (`timeout_us`, default 10 ms)
/// - The batch window expires (`Some(timeout_ms)`; `None` = disabled, flush every event)
/// - Channel is closed or a cancellation signal is received
async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
publisher: P,
......@@ -461,7 +577,8 @@ async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
cancellation_token: CancellationToken,
mut rx: mpsc::UnboundedReceiver<KvCacheEvent>,
local_indexer: Option<Arc<LocalKvIndexer>>,
timeout_us: u64,
timeout_ms: Option<u64>,
max_batch_blocks: usize,
) {
let mut batching_state = BatchingState::new();
// Track last raw input event_id for gap detection (dropped events before batching).
......@@ -470,8 +587,6 @@ async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
let mut last_raw_input_id: Option<u64> = None;
loop {
let remaining = batching_state.remaining_timeout(timeout_us);
tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::info!("KV Event source received cancellation signal");
......@@ -489,14 +604,26 @@ async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
// (e.g. channel send error) before they reached the batching layer.
let raw_event_id = event.event_id;
if let Some(last_id) = last_raw_input_id
&& raw_event_id > last_id + 1 {
&& raw_event_id > last_id + 1
{
let gap = raw_event_id - last_id - 1;
tracing::warn!(
worker_id,
last_raw_input_id = last_id,
raw_event_id,
gap = raw_event_id - last_id - 1,
gap,
"Input event gap detected: raw events dropped before batching"
);
// Increment Prometheus counter for dropped events (if initialized)
if let Some(metrics) = kv_publisher_metrics() {
metrics.increment_engines_dropped_events(worker_id, gap);
} else {
tracing::warn!(
worker_id,
gap,
"Failed to record dropped events metric: metrics not initialized"
);
}
}
last_raw_input_id = Some(raw_event_id);
......@@ -514,7 +641,6 @@ async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
Some(pending) => pending.block_hashes.extend(data.block_hashes),
None => {
batching_state.pending_removed = Some(data);
batching_state.start_batch_timer();
}
}
}
......@@ -534,7 +660,6 @@ async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
Some(pending) => pending.blocks.extend(data.blocks),
None => {
batching_state.pending_stored = Some(data);
batching_state.start_batch_timer();
}
}
}
......@@ -552,13 +677,20 @@ async fn run_event_processor_loop<P: EventSink + Send + Sync + 'static>(
// Track dp_rank after the match so in-flight flushes use the old value.
batching_state.last_dp_rank = event.dp_rank;
// Flush immediately if the timeout already elapsed (handles timeout_us=0).
// The sleep arm below only arms for timeout_us>0; this check covers the rest.
if batching_state.has_pending() && batching_state.is_timeout_elapsed(timeout_us) {
// Flush after every event when disabled (None), or when the window has elapsed,
// or when the batch exceeds the max block count.
// The sleep arm only arms when batching is enabled; this covers the disabled path.
if batching_state.has_pending()
&& (timeout_ms.is_none_or(|ms| batching_state.is_timeout_elapsed(ms))
|| batching_state.pending_block_count() > max_batch_blocks)
{
batching_state.flush(&publisher, &local_indexer, worker_id).await;
}
}
_ = tokio::time::sleep(remaining), if timeout_us > 0 && batching_state.has_pending() => {
// if has some pending and has timeout, and no new events come in, then flush when timeout elapsed to prevent stale events
_ = tokio::time::sleep(
timeout_ms.map(|ms| batching_state.remaining_timeout(ms)).unwrap_or(Duration::from_secs(3600))
), if timeout_ms.is_some() && batching_state.has_pending() => {
batching_state.flush(&publisher, &local_indexer, worker_id).await;
}
}
......@@ -572,7 +704,7 @@ async fn start_event_processor<P: EventSink + Send + Sync + 'static>(
cancellation_token: CancellationToken,
rx: mpsc::UnboundedReceiver<KvCacheEvent>,
local_indexer: Option<Arc<LocalKvIndexer>>,
batching_timeout_us: u64,
batching_timeout_ms: Option<u64>,
) {
run_event_processor_loop(
publisher,
......@@ -580,7 +712,8 @@ async fn start_event_processor<P: EventSink + Send + Sync + 'static>(
cancellation_token,
rx,
local_indexer,
batching_timeout_us,
batching_timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
}
......@@ -592,7 +725,7 @@ async fn start_event_processor_jetstream<P: EventSink + Send + Sync + 'static>(
cancellation_token: CancellationToken,
rx: mpsc::UnboundedReceiver<KvCacheEvent>,
local_indexer: Option<Arc<LocalKvIndexer>>,
batching_timeout_us: u64,
batching_timeout_ms: Option<u64>,
) {
run_event_processor_loop(
publisher,
......@@ -600,7 +733,8 @@ async fn start_event_processor_jetstream<P: EventSink + Send + Sync + 'static>(
cancellation_token,
rx,
local_indexer,
batching_timeout_us,
batching_timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
}
......@@ -1317,7 +1451,7 @@ mod tests_startup_helpers {
token,
rx,
None,
BATCH_TIMEOUT_US,
Some(10_000),
));
tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
......@@ -1375,7 +1509,7 @@ mod tests_startup_helpers {
token.clone(),
rx,
Some(local_indexer.clone()), // arc::clone just increments atomic counters
BATCH_TIMEOUT_US,
Some(10_000),
));
// Wait for processing
......@@ -1459,7 +1593,7 @@ mod tests_startup_helpers {
token.clone(),
rx,
Some(local_indexer.clone()),
BATCH_TIMEOUT_US,
Some(10_000),
));
// Then remove same event
......@@ -1550,7 +1684,7 @@ mod tests_startup_helpers {
token.clone(),
rx,
Some(local_indexer.clone()),
BATCH_TIMEOUT_US,
Some(10_000),
));
tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
......@@ -1620,7 +1754,7 @@ mod tests_startup_helpers {
new_token,
rx,
Some(local_indexer),
BATCH_TIMEOUT_US,
Some(10_000),
));
tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
......@@ -1747,7 +1881,7 @@ mod tests_startup_helpers {
token.clone(),
worker_rx,
Some(local_indexer_1.clone()),
BATCH_TIMEOUT_US,
Some(10), // 10ms batching timeout
));
// === SETUP: Router Components ===
......@@ -2078,8 +2212,8 @@ mod batching_state_tests {
#[test]
fn test_batching_state_new() {
let state = BatchingState::new();
// batch_start_time should be set to approximately now
let elapsed = state.batch_start_time.elapsed();
// last_flush_time should be set to approximately now
let elapsed = state.last_flush_time.elapsed();
assert!(
elapsed < Duration::from_secs(1),
"new() should create state with flush time set to approximately now"
......@@ -2120,10 +2254,10 @@ mod batching_state_tests {
let mut state = BatchingState::new();
// Reset flush time to now so we can test timeout behavior
state.start_batch_timer();
state.record_flush_time();
// Test that remaining returns positive initially (using 10ms = 10_000us)
let remaining_before = state.remaining_timeout(10_000);
// Test that remaining returns positive initially (10ms timeout)
let remaining_before = state.remaining_timeout(10);
assert!(
remaining_before.as_millis() > 0,
"Should have remaining time initially"
......@@ -2139,16 +2273,16 @@ mod batching_state_tests {
}
#[test]
fn test_batching_state_start_batch_timer() {
fn test_batching_state_record_flush_time() {
let mut state = BatchingState::new();
let initial_time = state.batch_start_time;
let initial_time = state.last_flush_time;
state.start_batch_timer();
state.record_flush_time();
assert!(
state.batch_start_time >= initial_time,
"start_batch_timer should update the time"
state.last_flush_time >= initial_time,
"record_flush_time should update the time"
);
}
......@@ -2157,10 +2291,10 @@ mod batching_state_tests {
let mut state = BatchingState::new();
// Reset flush time to now so we can test timeout behavior
state.start_batch_timer();
state.record_flush_time();
// Test that remaining returns positive initially
let remaining = state.remaining_timeout(10_000); // 10ms
// Test that remaining returns positive initially (10ms timeout)
let remaining = state.remaining_timeout(10);
assert!(
remaining.as_millis() > 0,
"Should have remaining time initially"
......@@ -2265,33 +2399,41 @@ mod event_processor_tests {
/// Uses a 10ms timeout to ensure events are batched (events sent rapidly)
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_20() {
test_removed_events_batching(20, 10_000).await; // 20 events, 20ms timeout
test_removed_events_batching(20, Some(10)).await; // 20 events, 10ms timeout
}
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_10() {
test_removed_events_batching(10, 10_000).await; // 10 events, 10ms timeout
test_removed_events_batching(10, Some(10)).await; // 10 events, 10ms timeout
}
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_5() {
test_removed_events_batching(5, 10_000).await; // 5 events, 10ms timeout
test_removed_events_batching(5, Some(10)).await; // 5 events, 10ms timeout
}
#[tokio::test]
async fn test_run_event_processor_loop_batches_removed_events_3() {
test_removed_events_batching(3, 10_000).await; // 3 events, 10ms timeout
test_removed_events_batching(3, Some(10)).await; // 3 events, 10ms timeout
}
/// Helper function to test removed events batching with configurable count and timeout
async fn test_removed_events_batching(event_count: usize, timeout_us: u64) {
async fn test_removed_events_batching(event_count: usize, timeout_ms: Option<u64>) {
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
let publisher_clone = publisher.clone();
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......@@ -2310,7 +2452,10 @@ mod event_processor_tests {
// Wait for timeout to elapse so all events flush together as one batch
// Add small buffer to ensure flush happens before channel close
tokio::time::sleep(tokio::time::Duration::from_micros(timeout_us + 1000)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(
timeout_ms.unwrap_or(0) + 1,
))
.await;
drop(tx);
handle.await.unwrap();
......@@ -2326,9 +2471,7 @@ mod event_processor_tests {
// (first event may flush separately, rest should batch together)
assert!(
events.len() <= 2,
"With long timeout ({}us), all {} events should batch into at most 2 output events (got {})",
timeout_us,
event_count,
"With long timeout ({timeout_ms:?}), all {event_count} events should batch into at most 2 output events (got {})",
events.len()
);
......@@ -2353,33 +2496,41 @@ mod event_processor_tests {
/// Uses a longer timeout (100ms) to ensure events have time to batch
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_20() {
test_stored_events_batching(20, 100_000).await; // 20 events, 100ms timeout
test_stored_events_batching(20, Some(100)).await; // 20 events, 100ms timeout
}
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_10() {
test_stored_events_batching(10, 100_000).await; // 10 events, 100ms timeout
test_stored_events_batching(10, Some(100)).await; // 10 events, 100ms timeout
}
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_5() {
test_stored_events_batching(5, 100_000).await; // 5 events, 100ms timeout
test_stored_events_batching(5, Some(100)).await; // 5 events, 100ms timeout
}
#[tokio::test]
async fn test_run_event_processor_loop_batches_stored_events_3() {
test_stored_events_batching(3, 100_000).await; // 3 events, 100ms timeout
test_stored_events_batching(3, Some(100)).await; // 3 events, 100ms timeout
}
/// Helper function to test stored events batching with configurable count and timeout
async fn test_stored_events_batching(event_count: usize, timeout_us: u64) {
async fn test_stored_events_batching(event_count: usize, timeout_ms: Option<u64>) {
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
let publisher_clone = publisher.clone();
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......@@ -2424,9 +2575,7 @@ mod event_processor_tests {
// With a long timeout, events should be batched. Either 1 or can be at most 2, if the first event flushes separately due to initial timestamp.
assert!(
events.len() <= 2,
"With long timeout ({}us) and sequential parent hashes, all {} events should batch into at most 2 output events (got {})",
timeout_us,
event_count,
"With long timeout ({timeout_ms:?}) and sequential parent hashes, all {event_count} events should batch into at most 2 output events (got {})",
events.len()
);
if events.len() == 2 {
......@@ -2463,7 +2612,7 @@ mod event_processor_tests {
/// Test non-sequential stored events trigger flush
#[tokio::test]
async fn test_run_event_processor_loop_non_sequential_flush() {
let timeout_us = 100_000; // 100ms in microseconds
let timeout_ms = Some(100); // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
......@@ -2471,7 +2620,15 @@ mod event_processor_tests {
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
// SLEEP HERE?! so that events are not batched!
});
......@@ -2525,28 +2682,36 @@ mod event_processor_tests {
/// All use 2ms delay between events, so each event times out before the next arrives
#[tokio::test]
async fn test_run_event_processor_loop_no_batching_with_slow_input_0ms() {
test_no_batching_with_slow_input(0).await; // 0ms timeout
test_no_batching_with_slow_input(None).await; // disabled (no timeout)
}
#[tokio::test]
async fn test_run_event_processor_loop_no_batching_with_slow_input_0_1ms() {
test_no_batching_with_slow_input(100).await; // 0.1ms timeout
test_no_batching_with_slow_input(Some(1)).await; // 1ms timeout (was 0.1ms in us)
}
#[tokio::test]
async fn test_run_event_processor_loop_no_batching_with_slow_input_0_2ms() {
test_no_batching_with_slow_input(200).await; // 0.2ms timeout
test_no_batching_with_slow_input(Some(2)).await; // 2ms timeout (was 0.2ms in us)
}
/// Helper function to test no batching with slow input
async fn test_no_batching_with_slow_input(timeout_us: u64) {
async fn test_no_batching_with_slow_input(timeout_ms: Option<u64>) {
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
let publisher_clone = publisher.clone();
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......@@ -2580,8 +2745,7 @@ mod event_processor_tests {
// We expect at least 3 separate events (showing reduced batching)
assert!(
events.len() >= 3,
"With slow input (2ms delay) and timeout={}us, should have at least 3 separate events (got {})",
timeout_us,
"With slow input (2ms delay) and timeout={timeout_ms:?}, should have at least 3 separate events (got {})",
events.len()
);
......@@ -2604,7 +2768,7 @@ mod event_processor_tests {
/// Test that switching between Removed and Stored events causes immediate flush
#[tokio::test]
async fn test_event_type_switching_causes_flush() {
let timeout_us = 100_000; // 100ms timeout
let timeout_ms = Some(100); // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
......@@ -2612,7 +2776,15 @@ mod event_processor_tests {
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......@@ -2663,7 +2835,7 @@ mod event_processor_tests {
/// Test that dp_rank change causes immediate flush
#[tokio::test]
async fn test_dp_rank_change_causes_flush() {
let timeout_us = 100_000; // 100ms timeout
let timeout_ms = Some(100); // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
......@@ -2671,7 +2843,15 @@ mod event_processor_tests {
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......@@ -2747,7 +2927,7 @@ mod event_processor_tests {
/// This verifies that metadata is NOT overwritten before flush
#[tokio::test]
async fn test_flushed_events_have_correct_metadata() {
let timeout_us = 100_000; // 100ms timeout
let timeout_ms = Some(100); // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
......@@ -2755,7 +2935,15 @@ mod event_processor_tests {
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......@@ -2820,10 +3008,12 @@ mod event_processor_tests {
);
}
/// Test that first event after idle period doesn't flush immediately.
/// Test that events after a long idle period flush immediately (stale timer).
/// This gives low latency for sparse important events after idle periods.
/// After the initial stale flush, subsequent rapid events batch normally.
#[tokio::test]
async fn test_first_event_after_idle_no_immediate_flush() {
let timeout_us = 50_000; // 50ms timeout
async fn test_first_event_after_idle_flushes_immediately_then_batches() {
let timeout_ms = Some(50); // 50ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
......@@ -2831,14 +3021,23 @@ mod event_processor_tests {
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
// Wait longer than timeout to simulate idle period
// Wait longer than timeout to simulate idle period (timer becomes stale)
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Send 3 events rapidly - they should batch together
// Send 3 events rapidly - first should flush immediately (stale timer),
// remaining 2 should batch together
for i in 0..3 {
tx.send(KvCacheEvent {
event_id: i as u64,
......@@ -2851,7 +3050,7 @@ mod event_processor_tests {
tokio::task::yield_now().await;
}
// Wait for timeout to elapse so batch flushes
// Wait for timeout to elapse so remaining batch flushes
tokio::time::sleep(tokio::time::Duration::from_millis(60)).await;
drop(tx);
......@@ -2859,33 +3058,32 @@ mod event_processor_tests {
let events = publisher.get_events();
// All 3 events should be batched into 1 output event
// First event flushes immediately (stale timer), remaining 2 batch together
assert_eq!(
events.len(),
1,
"All 3 events should batch into 1 output event (not flush immediately due to stale timer)"
2,
"First event should flush immediately (stale), remaining 2 should batch"
);
let total_hashes: usize = events
.iter()
.map(|e| {
if let KvCacheEventData::Removed(data) = &e.event.data {
// First event has 1 hash, second event (batch) has 2 hashes
let first_len = if let KvCacheEventData::Removed(data) = &events[0].event.data {
data.block_hashes.len()
} else {
0
}
})
.sum();
assert_eq!(
total_hashes, 3,
"All 3 block hashes should be accounted for"
);
};
let second_len = if let KvCacheEventData::Removed(data) = &events[1].event.data {
data.block_hashes.len()
} else {
0
};
assert_eq!(first_len, 1, "First event should have 1 hash");
assert_eq!(second_len, 2, "Second event (batched) should have 2 hashes");
}
/// Test that stored events with dp_rank change have correct metadata
#[tokio::test]
async fn test_stored_events_with_dp_rank_change_correct_metadata() {
let timeout_us = 100_000; // 100ms timeout
let timeout_ms = Some(100); // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
......@@ -2893,7 +3091,15 @@ mod event_processor_tests {
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......@@ -2994,7 +3200,7 @@ mod event_processor_tests {
/// First event with parent_hash=None should keep it None even if subsequent events have Some(X)
#[tokio::test]
async fn test_batch_parent_hash_preserved_when_extending() {
let timeout_us = 100_000; // 100ms timeout
let timeout_ms = Some(100); // 100ms timeout
let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
let publisher = MockPublisher::new();
......@@ -3002,7 +3208,15 @@ mod event_processor_tests {
let cancellation_token = CancellationToken::new();
let handle = tokio::spawn(async move {
run_event_processor_loop(publisher_clone, 1, cancellation_token, rx, None, timeout_us)
run_event_processor_loop(
publisher_clone,
1,
cancellation_token,
rx,
None,
timeout_ms,
DEFAULT_MAX_BATCH_BLOCKS,
)
.await
});
......
......@@ -519,12 +519,18 @@ pub mod tokio_perf {
pub const ALIVE_TASKS: &str = "alive_tasks";
}
// KvRouter (including KvInexer) Prometheus metric names
// KvRouter (including KvIndexer) Prometheus metric names
pub mod kvrouter {
/// Number of KV cache events applied to the index (including status)
pub const KV_CACHE_EVENTS_APPLIED: &str = "kv_cache_events_applied";
}
/// KV Publisher metrics
pub mod kv_publisher {
/// Total number of raw events dropped by engines before reaching publisher (detected via event_id gaps)
pub const ENGINES_DROPPED_EVENTS_TOTAL: &str = "kv_publisher_engines_dropped_events_total";
}
/// Additional TRT-LLM worker metrics beyond what the engine natively provides.
///
/// These metrics are Python-only (registered via `prometheus_client`) and share the
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment