Unverified commit ef3027bd, authored by Yan Ru Pei and committed by GitHub

feat: support decentralized router with NATs core (#4921)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent b24ccd29
...@@ -42,7 +42,10 @@ The main KV-aware routing arguments: ...@@ -42,7 +42,10 @@ The main KV-aware routing arguments:
- `--router-prune-target-ratio`: Target size ratio to prune down to when `--router-max-tree-size` is exceeded. For example, with a value of 0.8 (default) and max tree size of 1048576, the router will prune down to approximately 838860 blocks when the threshold is exceeded. Defaults to 0.8 when `--no-kv-events` is used. This creates headroom before the next pruning cycle. - `--router-prune-target-ratio`: Target size ratio to prune down to when `--router-max-tree-size` is exceeded. For example, with a value of 0.8 (default) and max tree size of 1048576, the router will prune down to approximately 838860 blocks when the threshold is exceeded. Defaults to 0.8 when `--no-kv-events` is used. This creates headroom before the next pruning cycle.
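The pruning arithmetic in the example above can be checked with a small sketch. The function name `prune_target` is hypothetical and not part of the router's API; it only reproduces the multiplication described in the text, truncating toward zero.

```rust
/// Hypothetical helper (not part of the router): the number of blocks the
/// tree would be pruned down to for a given max size and target ratio.
fn prune_target(max_tree_size: usize, target_ratio: f64) -> usize {
    // Truncate toward zero; the documentation only says "approximately".
    (max_tree_size as f64 * target_ratio) as usize
}
```

With `max_tree_size = 1048576` and `target_ratio = 0.8`, the product is 838860.8, which truncates to the 838860 blocks quoted above.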
>[!Note] >[!Note]
> State persistence is only available when KV events are enabled (default). When using `--no-kv-events`, state persistence is not currently supported. > **State persistence** depends on the event transport mode:
> - **JetStream mode** (default): State persists across router restarts via JetStream and NATS object store snapshots.
> - **NATS Core with Local Indexer mode** (`--enable-local-indexer` on workers): State persists on workers—router rebuilds state by querying workers on startup.
> - **No KV events** (`--no-kv-events`): State persistence is not supported.
> >
> When `--kv-overlap-score-weight` is set to 0 or `--no-kv-events` is set, no KvIndexer will be launched to drain and process KV events. It's recommended to disable your backend workers from relaying events through `KvEventPublisher` to avoid event accumulation in JetStream. WIP to enable disabling publishing of KV events completely in these cases. > When `--kv-overlap-score-weight` is set to 0 or `--no-kv-events` is set, no KvIndexer will be launched to drain and process KV events. It's recommended to disable your backend workers from relaying events through `KvEventPublisher` to avoid event accumulation in JetStream. WIP to enable disabling publishing of KV events completely in these cases.
> >
...@@ -110,9 +113,16 @@ await register_llm( ...@@ -110,9 +113,16 @@ await register_llm(
The KV-aware router operates on two key principles to optimize request routing: The KV-aware router operates on two key principles to optimize request routing:
### Global KV Cache State via JetStream ### Global KV Cache State Synchronization
First, KV events from engines are sent to a persistent NATS JetStream. Each KV router/indexer replica acts as a durable consumer, pulling messages from this shared stream to maintain a global view of cached blocks across all engines. This architecture ensures consistency across router replicas and persistence across restarts. KV events from engines are collected by the router to maintain a global view of cached blocks across all workers. The router supports two event transport modes:
#### Mode 1: JetStream (Default)
KV events are sent to a persistent NATS JetStream. Each KV router/indexer replica acts as a durable consumer, pulling messages from this shared stream. This architecture ensures consistency across router replicas and persistence across restarts.
- **Best for**: Production deployments requiring durability and multi-replica router consistency
- **Tradeoffs**: Requires JetStream setup; slightly higher latency due to persistence guarantees
```mermaid ```mermaid
graph TD graph TD
...@@ -152,6 +162,60 @@ graph TD ...@@ -152,6 +162,60 @@ graph TD
style R2 fill:#f3e5f5,color:#5a850f style R2 fill:#f3e5f5,color:#5a850f
``` ```
#### Mode 2: NATS Core with Local Indexer
When workers are started with `--enable-local-indexer`, each worker maintains its own local radix tree (local indexer) and publishes events over NATS Core (fire-and-forget pub/sub) instead of JetStream. Each worker assigns monotonically increasing event IDs to its events. The router detects gaps in event sequences and recovers missed events by querying the worker's local indexer directly.
- **Best for**: Lower-latency setups; simpler deployments without JetStream; single-router scenarios
- **Tradeoffs**: State persists on workers (not centralized); recovery depends on workers being available
- **Enable with**: `--enable-local-indexer` flag on workers (vLLM, mocker)
```mermaid
graph TD
subgraph Engines
E1[Engine 1<br/>LocalKvIndexer]
E2[Engine 2<br/>LocalKvIndexer]
E3[Engine 3<br/>LocalKvIndexer]
end
subgraph "NATS Core"
NC[KV Events Pub/Sub<br/>- Block created<br/>- Block removed]
end
subgraph "Router Replicas"
R1[Router 1<br/>KVIndexer]
R2[Router 2<br/>KVIndexer]
end
E1 -->|Publish Events| NC
E2 -->|Publish Events| NC
E3 -->|Publish Events| NC
NC -->|Subscribe| R1
NC -->|Subscribe| R2
style NC fill:#e1f5fe,color:#5a850f
style E1 fill:#fff3e0,color:#5a850f
style E2 fill:#fff3e0,color:#5a850f
style E3 fill:#fff3e0,color:#5a850f
style R1 fill:#f3e5f5,color:#5a850f
style R2 fill:#f3e5f5,color:#5a850f
```
**How gap detection works:**
1. Each worker assigns monotonically increasing event IDs starting from 0
2. The router tracks the last received event ID per worker
3. If an event arrives with `event_id > last_id + 1`, the router detects a gap
4. The router queries the worker's local indexer for the missing event range `[last_id+1, event_id-1]`
5. On worker discovery (Added event), the router dumps the worker's entire local indexer state
**Startup behavior:**
- When a worker is discovered, the router queries and ingests its full local indexer state
- When a worker is removed, the router removes all its blocks from the global radix tree
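The gap-detection steps above can be sketched as follows. This is a minimal illustration, not the router's implementation: `GapTracker`, `GapCheck`, and the `WorkerId` alias are hypothetical names, and the sketch assumes the full-state dump on worker discovery has already happened, so the first event seen from a worker is treated as in order.

```rust
use std::collections::HashMap;

// Assumption: worker IDs are plain integers in this sketch.
type WorkerId = u64;

#[derive(Debug, PartialEq)]
enum GapCheck {
    /// The event directly follows the last one seen (or is the first one).
    InOrder,
    /// Events in the inclusive range `[start, end]` were missed and should
    /// be requested from the worker's local indexer.
    Gap { start: u64, end: u64 },
    /// The event ID is not newer than the last one seen.
    Stale,
}

#[derive(Default)]
struct GapTracker {
    /// Last received event ID per worker.
    last_ids: HashMap<WorkerId, u64>,
}

impl GapTracker {
    fn observe(&mut self, worker: WorkerId, event_id: u64) -> GapCheck {
        match self.last_ids.get(&worker).copied() {
            // First event from this worker: assume its state was dumped on discovery.
            None => {
                self.last_ids.insert(worker, event_id);
                GapCheck::InOrder
            }
            // event_id > last_id + 1: a gap of [last_id + 1, event_id - 1].
            Some(last) if event_id > last + 1 => {
                self.last_ids.insert(worker, event_id);
                GapCheck::Gap { start: last + 1, end: event_id - 1 }
            }
            Some(last) if event_id <= last => GapCheck::Stale,
            Some(_) => {
                self.last_ids.insert(worker, event_id);
                GapCheck::InOrder
            }
        }
    }

    /// Forget a worker once it has been removed.
    fn remove_worker(&mut self, worker: WorkerId) {
        self.last_ids.remove(&worker);
    }
}
```

How the real router treats duplicate or out-of-order IDs is not stated in the text; the `Stale` variant is only one possible policy.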
>[!Note]
> The router automatically selects the transport mode based on worker configuration. If all connected workers have `enable_local_indexer=true`, the router uses NATS Core mode. Otherwise, it uses JetStream mode.
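The selection rule in the note can be sketched as a pure function. `Transport` and `select_transport` are hypothetical names for illustration, and each `bool` stands in for one worker's `enable_local_indexer` setting. The empty case is handled separately because `Iterator::all` returns `true` for an empty iterator, so a decision made before any worker has registered would always pick NATS Core.

```rust
#[derive(Debug, PartialEq)]
enum Transport {
    NatsCore,
    JetStream,
}

/// Hypothetical sketch: pick a transport from the workers' local-indexer flags.
/// Returns `None` while no worker configuration is known yet.
fn select_transport(local_indexer_flags: &[bool]) -> Option<Transport> {
    if local_indexer_flags.is_empty() {
        return None; // nothing to decide on yet; the caller should wait
    }
    if local_indexer_flags.iter().all(|&enabled| enabled) {
        Some(Transport::NatsCore)
    } else {
        Some(Transport::JetStream)
    }
}
```

A single worker without a local indexer is enough to fall back to JetStream under this rule.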
### Local Active Block Management with Replica Sync ### Local Active Block Management with Replica Sync
Second, in addition to cached blocks, each router replica needs to track active blocks (blocks being used for ongoing generation) as load metrics. Since this information is highly time-sensitive, it should be predicted immediately when: Second, in addition to cached blocks, each router replica needs to track active blocks (blocks being used for ongoing generation) as load metrics. Since this information is highly time-sensitive, it should be predicted immediately when:
...@@ -249,19 +313,26 @@ Without this flag, each replica maintains its own isolated view of active blocks ...@@ -249,19 +313,26 @@ Without this flag, each replica maintains its own isolated view of active blocks
### Persistence and Recovery ### Persistence and Recovery
**Prefix blocks persist by default:** Persistence behavior depends on which event transport mode is active:
- Stored in NATS JetStream with 1-hour retention
**JetStream Mode (default):**
- Prefix blocks are stored in NATS JetStream with 1-hour retention
- Snapshots saved to NATS object store at configurable thresholds - Snapshots saved to NATS object store at configurable thresholds
- New replicas automatically restore this state on startup - New replicas automatically restore this state on startup
- You can launch a third Router replica even if the first two are down, and it will recover the full prefix state
You can a launch a third Router replica even if the first two Router replicas are down, and it will recover the full prefix state. (As mentioned above, the tracking of active blocks will not persist, but will become eventually consistent through request handling.)
```bash ```bash
python -m dynamo.frontend --router-mode kv --port 8002 --router-replica-sync python -m dynamo.frontend --router-mode kv --port 8002 --router-replica-sync
``` ```
**NATS Core with Local Indexer Mode:**
- State persists on workers—events are fire-and-forget but workers retain their local indexer state
- On startup, the router queries each worker's local indexer to rebuild state
- Recovery depends on workers being available; if a worker is down, its blocks cannot be recovered
- Simpler infrastructure (no JetStream required) but less resilient
>[!Note] >[!Note]
> If you need to start with a fresh state, you have two options: > If you need to start with a fresh state in JetStream mode, you have two options:
> 1. **Recommended**: Use a different namespace/component (see [Distributed Runtime](/docs/design_docs/distributed_runtime.md)) which will start a new stream and NATS object store path > 1. **Recommended**: Use a different namespace/component (see [Distributed Runtime](/docs/design_docs/distributed_runtime.md)) which will start a new stream and NATS object store path
> 2. **Use with caution**: Launch a router with the `--router-reset-states` flag, which will purge the entire stream and radix snapshot. This should only be done when launching the first router replica in a component, as it can bring existing router replicas into an inconsistent state. > 2. **Use with caution**: Launch a router with the `--router-reset-states` flag, which will purge the entire stream and radix snapshot. This should only be done when launching the first router replica in a component, as it can bring existing router replicas into an inconsistent state.
...@@ -375,13 +446,6 @@ In distributed deployments with multiple routers, each router maintains visibili ...@@ -375,13 +446,6 @@ In distributed deployments with multiple routers, each router maintains visibili
Each event carries a unique router ID to prevent self-event processing. This asynchronous communication system ensures optimal routing decisions by maintaining consistent KV cache state across all routers, even as they handle different request streams. Each event carries a unique router ID to prevent self-event processing. This asynchronous communication system ensures optimal routing decisions by maintaining consistent KV cache state across all routers, even as they handle different request streams.
### Event Persistence and Recovery
KV cache events are persisted in NATS JetStream, allowing router replicas to maintain their global view of KV blocks across restarts. By default, routers persist their state - they download any available snapshot from NATS object store and continue consuming events from their last acknowledged position in the stream. This default behavior ensures KV cache awareness is maintained across router restarts without any additional configuration.
To manage stream growth, when the message count exceeds `--router-snapshot-threshold`, a router acquires an etcd-based distributed lock, purges acknowledged messages from the stream, and uploads the current radix tree state to NATS object store. This snapshot serves as a checkpoint for faster initialization of future router instances.
## Using KvPushRouter Python API ## Using KvPushRouter Python API
Instead of launching the KV Router via command line, you can create a `KvPushRouter` object directly in Python. This allows per-request routing configuration overrides. Instead of launching the KV Router via command line, you can create a `KvPushRouter` object directly in Python. This allows per-request routing configuration overrides.
......
...@@ -53,7 +53,7 @@ use crate::{ ...@@ -53,7 +53,7 @@ use crate::{
}, },
scheduler::{KvScheduler, KvSchedulerError, PotentialLoad, SchedulingRequest}, scheduler::{KvScheduler, KvSchedulerError, PotentialLoad, SchedulingRequest},
sequence::SequenceError, sequence::SequenceError,
subscriber::{recover_from_all_workers, start_kv_router_background}, subscriber::{start_kv_router_background, start_kv_router_background_nats_core},
}, },
local_model::runtime_config::ModelRuntimeConfig, local_model::runtime_config::ModelRuntimeConfig,
model_card::ModelDeploymentCard, model_card::ModelDeploymentCard,
...@@ -69,7 +69,7 @@ use crate::{ ...@@ -69,7 +69,7 @@ use crate::{
pub const KV_METRICS_ENDPOINT: &str = "load_metrics"; pub const KV_METRICS_ENDPOINT: &str = "load_metrics";
// for metric publishing (push-based) // for metric publishing (push-based)
pub const KV_EVENT_SUBJECT: &str = "kv_events"; pub const KV_EVENT_SUBJECT: &str = "kv-events";
pub const KV_HIT_RATE_SUBJECT: &str = "kv-hit-rate"; pub const KV_HIT_RATE_SUBJECT: &str = "kv-hit-rate";
pub const KV_METRICS_SUBJECT: &str = "kv_metrics"; pub const KV_METRICS_SUBJECT: &str = "kv_metrics";
...@@ -357,66 +357,90 @@ impl KvRouter { ...@@ -357,66 +357,90 @@ impl KvRouter {
tracing::info!("Worker query client initialized"); tracing::info!("Worker query client initialized");
// Start KV event subscriber background process (only when use_kv_events is enabled) // Start KV event subscriber background process (only when use_kv_events is enabled)
// This is spawned as a background task to avoid blocking router startup.
// The task waits for runtime_configs to determine whether to use NATS Core or JetStream.
if kv_router_config.use_kv_events if kv_router_config.use_kv_events
&& let Indexer::KvIndexer(ref kv_indexer) = indexer && let Indexer::KvIndexer(ref kv_indexer) = indexer
{ {
start_kv_router_background( // Clone everything needed for the background task
component.clone(), let component_clone = component.clone();
consumer_id, let kv_indexer_clone = kv_indexer.clone();
kv_indexer.event_sender(), let cancellation_token_clone = cancellation_token.clone();
kv_indexer.remove_worker_sender(), let mut runtime_configs_rx_clone = runtime_configs_rx.clone();
kv_router_config let worker_query_client_clone =
.router_snapshot_threshold worker_query::WorkerQueryClient::new(component.clone(), runtime_configs_rx.clone());
.map(|_| kv_indexer.get_workers_sender()),
kv_router_config tokio::spawn(async move {
.router_snapshot_threshold // Wait for runtime_configs to have at least one entry
.map(|_| kv_indexer.snapshot_event_sender()), let (all_local_indexer, count) = loop {
cancellation_token.clone(), {
kv_router_config.router_snapshot_threshold, let configs = runtime_configs_rx_clone.borrow();
kv_router_config.router_reset_states, if !configs.is_empty() {
) let all_local_indexer =
.await?; configs.values().all(|c| c.enable_local_indexer);
break (all_local_indexer, configs.len());
// Perform startup recovery from workers with local indexers }
// This catches up on any events missed while the router was offline }
let last_event_ids = kv_indexer
.get_last_received_event_ids()
.await
.unwrap_or_default();
let instances = client.instance_source.as_ref().borrow().clone();
let worker_ids: Vec<WorkerId> = instances.iter().map(|i| i.instance_id).collect();
if !worker_ids.is_empty() {
tracing::info!(
worker_count = worker_ids.len(),
"Starting recovery from workers with local indexers"
);
// NOTE: recover_from_all_workers() is a no-op if // Wait for changes to runtime_configs
// Worker with worker_id is not associated with a tokio::select! {
// local indexer instance. _ = cancellation_token_clone.cancelled() => {
let recovered = recover_from_all_workers( tracing::debug!("Subscriber selection task cancelled");
&worker_query_client, return;
&last_event_ids, }
&worker_ids, result = runtime_configs_rx_clone.changed() => {
&kv_indexer.event_sender(), if result.is_err() {
) tracing::debug!("Runtime configs channel closed");
.await; return;
}
}
}
};
if recovered > 0 { if all_local_indexer {
// All workers have local_indexer enabled - use NATS Core
tracing::info!( tracing::info!(
recovered_events = recovered, "All {count} workers have local_indexer enabled, using NATS Core subscription"
"KV Router startup: Recovered {} KV events from workers {:?}",
recovered,
worker_ids
); );
if let Err(e) = start_kv_router_background_nats_core(
component_clone.clone(),
kv_indexer_clone.event_sender(),
kv_indexer_clone.remove_worker_sender(),
cancellation_token_clone.clone(),
worker_query_client_clone,
)
.await
{
tracing::error!("Failed to start NATS Core subscriber: {e}");
}
} else { } else {
// Not all workers have local_indexer - use JetStream
tracing::info!( tracing::info!(
"KV Router startup: No KV events recovered from workers {:?}", "Not all workers have local_indexer enabled, using JetStream subscription"
worker_ids
); );
if let Err(e) = start_kv_router_background(
component_clone.clone(),
consumer_id,
kv_indexer_clone.event_sender(),
kv_indexer_clone.remove_worker_sender(),
kv_router_config
.router_snapshot_threshold
.map(|_| kv_indexer_clone.get_workers_sender()),
kv_router_config
.router_snapshot_threshold
.map(|_| kv_indexer_clone.snapshot_event_sender()),
cancellation_token_clone.clone(),
kv_router_config.router_snapshot_threshold,
kv_router_config.router_reset_states,
)
.await
{
tracing::error!("Failed to start JetStream subscriber: {e}");
}
} }
} });
} }
tracing::info!("KV Routing initialized"); tracing::info!("KV Routing initialized");
......
...@@ -178,9 +178,9 @@ pub fn compute_seq_hash_for_block(block_hashes: &[LocalBlockHash]) -> Vec<Sequen ...@@ -178,9 +178,9 @@ pub fn compute_seq_hash_for_block(block_hashes: &[LocalBlockHash]) -> Vec<Sequen
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RouterEvent { pub struct RouterEvent {
/// The ID of the worker emitting the event. /// The ID of the worker emitting the event.
worker_id: WorkerId, pub worker_id: WorkerId,
/// The cache event associated with the worker. /// The cache event associated with the worker.
event: KvCacheEvent, pub event: KvCacheEvent,
} }
impl RouterEvent { impl RouterEvent {
...@@ -209,19 +209,27 @@ pub struct WorkerKvQueryRequest { ...@@ -209,19 +209,27 @@ pub struct WorkerKvQueryRequest {
/// The worker ID of the worker to query. /// The worker ID of the worker to query.
pub worker_id: WorkerId, pub worker_id: WorkerId,
/// The query can specify the [start, end) range of event id's to return. /// Start event ID (inclusive). If `None`, dumps entire tree.
/// If neither is specified, the worker dumps all events.
/// If only one is specified, `start` is assumed to be the oldest logged event,
/// and `end` is assumed to be the newest logged event.
pub start_event_id: Option<u64>, pub start_event_id: Option<u64>,
/// End event ID (inclusive). If `None`, returns up to newest available.
pub end_event_id: Option<u64>, pub end_event_id: Option<u64>,
} }
/// Response from a worker's local KV indexer. /// Response from a worker's local KV indexer.
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkerKvQueryResponse { pub enum WorkerKvQueryResponse {
/// The events from the worker local KvIndexer. /// Events served from the circular buffer (with original event IDs)
pub events: Vec<RouterEvent>, Events(Vec<RouterEvent>),
/// Full tree dump (with synthetic 0-indexed event IDs)
TreeDump(Vec<RouterEvent>),
/// Requested range is newer than available data
TooNew {
requested_start: Option<u64>,
requested_end: Option<u64>,
newest_available: u64,
},
/// Invalid range: end_id < start_id
InvalidRange { start_id: u64, end_id: u64 },
} }
/// A block in the Radix Tree. /// A block in the Radix Tree.
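A caller of the new `WorkerKvQueryResponse` enum has to distinguish incremental events from a full tree dump, because the dump carries synthetic event IDs. The sketch below shows one plausible way to do that. `Event`, `QueryResponse`, `Recovery`, and `plan_recovery` are simplified stand-ins invented for illustration, and the policy chosen for each variant is an assumption, not the router's documented behavior.

```rust
/// Stand-in for `RouterEvent`, reduced to its event ID.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    event_id: u64,
}

/// Mirror of the response variants shown in the diff, using the stand-in event type.
#[derive(Debug)]
enum QueryResponse {
    Events(Vec<Event>),
    TreeDump(Vec<Event>),
    TooNew {
        requested_start: Option<u64>,
        requested_end: Option<u64>,
        newest_available: u64,
    },
    InvalidRange { start_id: u64, end_id: u64 },
}

/// Hypothetical recovery actions a router could take.
#[derive(Debug, PartialEq)]
enum Recovery {
    /// Apply the events on top of the current state (original IDs).
    Apply(Vec<Event>),
    /// Discard the worker's blocks, then apply the dump (synthetic IDs).
    Rebuild(Vec<Event>),
    /// The worker has nothing newer; no action needed.
    Nothing,
    /// The request itself was malformed.
    Error(String),
}

fn plan_recovery(resp: QueryResponse) -> Recovery {
    match resp {
        QueryResponse::Events(events) => Recovery::Apply(events),
        QueryResponse::TreeDump(events) => Recovery::Rebuild(events),
        // Assumption: a "too new" request means the router is already caught up.
        QueryResponse::TooNew { .. } => Recovery::Nothing,
        QueryResponse::InvalidRange { start_id, end_id } => {
            Recovery::Error(format!("invalid range: end_id {end_id} < start_id {start_id}"))
        }
    }
}
```

Turning the response into an enum forces callers to handle the error cases explicitly, where the previous `Vec<RouterEvent>` return type could only signal them with an empty vector.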
...@@ -902,6 +910,7 @@ struct RoutingDecisionRequest { ...@@ -902,6 +910,7 @@ struct RoutingDecisionRequest {
} }
/// The KV Indexer, managing the KV store and handling events and match requests. /// The KV Indexer, managing the KV store and handling events and match requests.
#[derive(Clone)]
pub struct KvIndexer { pub struct KvIndexer {
/// A `CancellationToken` for managing shutdown. /// A `CancellationToken` for managing shutdown.
cancel: CancellationToken, cancel: CancellationToken,
...@@ -919,10 +928,11 @@ pub struct KvIndexer { ...@@ -919,10 +928,11 @@ pub struct KvIndexer {
routing_tx: mpsc::Sender<RoutingDecisionRequest>, routing_tx: mpsc::Sender<RoutingDecisionRequest>,
/// A sender for getting last received event IDs (for fault tolerance recovery). /// A sender for getting last received event IDs (for fault tolerance recovery).
last_event_ids_tx: mpsc::Sender<GetLastReceivedEventIdsRequest>, last_event_ids_tx: mpsc::Sender<GetLastReceivedEventIdsRequest>,
/// A handle to the background task managing the KV store.
task: OnceLock<std::thread::JoinHandle<()>>,
/// The size of the KV block this indexer can handle. /// The size of the KV block this indexer can handle.
kv_block_size: u32, kv_block_size: u32,
/// Reference counter for Clone-aware Drop.
/// Only the last clone should cancel the token on drop.
_ref_count: Arc<()>,
} }
impl KvIndexer { impl KvIndexer {
...@@ -957,7 +967,7 @@ impl KvIndexer { ...@@ -957,7 +967,7 @@ impl KvIndexer {
let cancel_clone = token.clone(); let cancel_clone = token.clone();
let task = std::thread::spawn(move || { std::thread::spawn(move || {
// Create a single-threaded tokio runtime // Create a single-threaded tokio runtime
let runtime = tokio::runtime::Builder::new_current_thread() let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
...@@ -1182,9 +1192,6 @@ impl KvIndexer { ...@@ -1182,9 +1192,6 @@ impl KvIndexer {
tracing::debug!("KvCacheIndexer task completed"); tracing::debug!("KvCacheIndexer task completed");
}); });
let once = OnceLock::new();
once.set(task).unwrap();
Self { Self {
cancel: token, cancel: token,
event_tx, event_tx,
...@@ -1194,8 +1201,8 @@ impl KvIndexer { ...@@ -1194,8 +1201,8 @@ impl KvIndexer {
dump_tx, dump_tx,
routing_tx, routing_tx,
last_event_ids_tx, last_event_ids_tx,
task: once,
kv_block_size, kv_block_size,
_ref_count: Arc::new(()),
} }
} }
...@@ -1340,9 +1347,6 @@ impl KvIndexerInterface for KvIndexer { ...@@ -1340,9 +1347,6 @@ impl KvIndexerInterface for KvIndexer {
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.cancel.cancel(); self.cancel.cancel();
if let Some(task) = self.task.take() {
task.join().expect("Failed to join kv indexer task");
}
} }
async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> { async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
...@@ -1396,7 +1400,11 @@ impl KvIndexerInterface for KvIndexer { ...@@ -1396,7 +1400,11 @@ impl KvIndexerInterface for KvIndexer {
impl Drop for KvIndexer { impl Drop for KvIndexer {
fn drop(&mut self) { fn drop(&mut self) {
self.shutdown(); // Only cancel the token if we're the last reference.
// This allows clones to be dropped without killing the background task.
if Arc::strong_count(&self._ref_count) == 1 {
self.shutdown();
}
} }
} }
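The clone-aware `Drop` above can be reproduced in isolation. In this sketch an `AtomicBool` stands in for the `CancellationToken`, and `Handle` and `cancel_flag` are hypothetical names; only the `Arc<()>` reference-count check is taken from the diff.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct Handle {
    /// Stand-in for the cancellation token shared with the background task.
    cancelled: Arc<AtomicBool>,
    /// Cloned together with the handle; its strong count equals the number of live clones.
    _ref_count: Arc<()>,
}

impl Handle {
    fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            _ref_count: Arc::new(()),
        }
    }

    /// Expose the flag so an observer can check it after all handles are gone.
    fn cancel_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.cancelled)
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Fields are still alive inside `drop`, so the count includes this handle.
        // A count of 1 means no other clone remains.
        if Arc::strong_count(&self._ref_count) == 1 {
            self.cancelled.store(true, Ordering::SeqCst);
        }
    }
}
```

One caveat of this pattern: `Arc::strong_count` is only a snapshot, so if the last two clones are dropped concurrently on different threads, both may observe a count of 2 and neither would cancel. Whether that matters depends on how the clones are used.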
...@@ -1439,167 +1447,112 @@ impl LocalKvIndexer { ...@@ -1439,167 +1447,112 @@ impl LocalKvIndexer {
/// Query events by ID range, returning events in `[start_id, end_id]` (both inclusive). /// Query events by ID range, returning events in `[start_id, end_id]` (both inclusive).
/// ///
/// This method attempts to serve the request from the in-memory event buffer when possible.
/// If the requested range extends beyond what's available in the buffer, a full tree dump
/// is performed instead.
///
/// ### Arguments /// ### Arguments
/// ///
/// * `start_id` - Starting event ID (inclusive). If `None`, returns from oldest available. /// * `start_id` - Starting event ID (inclusive). If `None`, dumps entire tree.
/// * `end_id` - Ending event ID (inclusive). If `None`, returns up to newest available. /// * `end_id` - Ending event ID (inclusive). If `None`, returns up to newest available.
/// ///
/// ### Behavior
///
/// - **Buffer path**: If `start_id >= first_buffered_id`, events are retrieved directly
/// from the buffer with their original event IDs.
///
/// - **Tree dump path**: If the range extends before the buffer or no range is specified,
/// a full tree dump is performed. **Note**: Tree dumps generate synthetic 0-indexed
/// event IDs that do NOT correspond to the original event IDs. The entire tree state
/// is returned regardless of the requested range.
///
/// ### Returns /// ### Returns
/// ///
/// A vector of `RouterEvent`s. When served from buffer, events have their original IDs. /// - `Events`: Buffered events with original IDs (when range is within buffer)
/// When served from tree dump, events have synthetic sequential IDs starting from 0. /// - `TreeDump`: Full tree dump with synthetic IDs (when range is too old or unspecified)
/// - `TooNew`: Error when requested range is newer than available data
/// - `InvalidRange`: Error when end_id < start_id
pub async fn get_events_in_id_range( pub async fn get_events_in_id_range(
&self, &self,
start_id: Option<u64>, start_id: Option<u64>,
end_id: Option<u64>, end_id: Option<u64>,
) -> Vec<RouterEvent> { ) -> WorkerKvQueryResponse {
// Validate range if both specified // Validate range if both specified
if let (Some(s), Some(e)) = (start_id, end_id) if let (Some(s), Some(e)) = (start_id, end_id)
&& s > e && e < s
{ {
tracing::warn!( tracing::warn!(start_id = s, end_id = e, "Invalid range: end_id < start_id");
start_id = s, return WorkerKvQueryResponse::InvalidRange {
end_id = e, start_id: s,
"Requested start_id > end_id; returning empty result." end_id: e,
); };
return Vec::new();
} }
// Check if we can serve from buffer // Get buffer state
let buffer_range = { let (first_id, last_id) = {
let buffer = self.event_buffer.lock().unwrap(); let buffer = self.event_buffer.lock().unwrap();
if buffer.is_empty() { if buffer.is_empty() {
None (None, None)
} else { } else {
Some(( (
buffer.front().unwrap().event.event_id, Some(buffer.front().unwrap().event.event_id),
buffer.back().unwrap().event.event_id, Some(buffer.back().unwrap().event.event_id),
)) )
} }
}; };
// Determine if request can be served from buffer // If no start_id specified, dump entire tree
let can_use_buffer = match (start_id, buffer_range) { if start_id.is_none() {
// No start specified means we need everything from the beginning -> tree dump tracing::debug!("No start_id specified, dumping entire tree");
(None, _) => false, let events = self.dump_events().await.unwrap_or_default();
// Buffer is empty -> tree dump return WorkerKvQueryResponse::TreeDump(events);
(_, None) => false,
// start_id is within or after buffer range -> can use buffer
(Some(s), Some((first_buffered, _))) => s >= first_buffered,
};
if can_use_buffer {
// Serve from buffer - these have real event IDs
self.get_buffer_events_in_id_range(start_id, end_id)
} else {
// Must dump entire tree
if let (Some(s), Some(e)) = (start_id, end_id) {
tracing::warn!(
requested_start_id = s,
requested_end_id = e,
buffer_range = ?buffer_range,
"Requested event ID range extends before buffer; dumping entire tree. \
Note: Tree dump returns synthetic 0-indexed event IDs, not original IDs."
);
} else if start_id.is_some() || end_id.is_some() {
tracing::warn!(
requested_start_id = ?start_id,
requested_end_id = ?end_id,
buffer_range = ?buffer_range,
"Partial range specified but cannot serve from buffer; dumping entire tree. \
Note: Tree dump returns synthetic 0-indexed event IDs, not original IDs."
);
}
// Return full tree dump - no filtering since IDs are synthetic
self.dump_events().await.unwrap_or_default()
}
}
/// Get events from the buffer in the range `[start_id, end_id]` (both inclusive).
pub fn get_buffer_events_in_id_range(
&self,
start_id: Option<u64>,
end_id: Option<u64>,
) -> Vec<RouterEvent> {
let buffer = self.event_buffer.lock().unwrap();
if buffer.is_empty() {
tracing::warn!("No events in buffer yet; returning empty result.");
return Vec::new();
} }
let first_id = buffer.front().map(|e| e.event.event_id).unwrap(); let start_id = start_id.unwrap();
let last_id = buffer.back().map(|e| e.event.event_id).unwrap(); let end_id = end_id.unwrap_or_else(|| last_id.unwrap_or(start_id));
let start_id = start_id.unwrap_or(first_id); // Check for empty buffer
let end_id = end_id.unwrap_or(last_id); let Some(first_buffered) = first_id else {
tracing::debug!("Buffer empty, dumping entire tree");
let events = self.dump_events().await.unwrap_or_default();
return WorkerKvQueryResponse::TreeDump(events);
};
let last_buffered = last_id.unwrap();
if start_id > end_id { // Check if request is too new
if start_id > last_buffered {
tracing::warn!( tracing::warn!(
start_id, start_id,
end_id, last_buffered,
"Requested start_id > end_id; returning empty result." "Requested start_id is newer than buffer"
); );
return Vec::new(); return WorkerKvQueryResponse::TooNew {
requested_start: Some(start_id),
requested_end: Some(end_id),
newest_available: last_buffered,
};
}
// Check if start_id is too old (before buffer) -> tree dump
if start_id < first_buffered {
tracing::info!(
start_id,
first_buffered,
"Requested start_id is older than buffer, dumping entire tree"
);
let events = self.dump_events().await.unwrap_or_default();
return WorkerKvQueryResponse::TreeDump(events);
} }
// Serve from buffer
let buffer = self.event_buffer.lock().unwrap();
let start_idx = match buffer.binary_search_by_key(&start_id, |e| e.event.event_id) { let start_idx = match buffer.binary_search_by_key(&start_id, |e| e.event.event_id) {
Ok(idx) => idx, Ok(idx) => idx,
Err(_) if start_id < first_id => {
tracing::warn!(
start_id,
first_id,
"Requested start_id precedes buffer; clamping to oldest."
);
0
}
Err(_) if start_id > last_id => {
tracing::error!(
start_id,
last_id,
"Requested start_id is newer than buffer; returning empty."
);
return Vec::new();
}
Err(insertion_point) => insertion_point, Err(insertion_point) => insertion_point,
}; };
// For inclusive end, we need idx + 1 when we find an exact match // Clamp end_id to buffer bounds
let end_idx = match buffer.binary_search_by_key(&end_id, |e| e.event.event_id) { let clamped_end_id = end_id.min(last_buffered);
let end_idx = match buffer.binary_search_by_key(&clamped_end_id, |e| e.event.event_id) {
Ok(idx) => idx + 1, // Include the matched element Ok(idx) => idx + 1, // Include the matched element
Err(_) if end_id < first_id => {
return Vec::new();
}
Err(_) if end_id > last_id => {
tracing::warn!(
end_id,
last_id,
"Requested end_id exceeds buffer; clamping to newest."
);
buffer.len()
}
Err(insertion_point) => insertion_point, Err(insertion_point) => insertion_point,
}; };
buffer let events: Vec<RouterEvent> = buffer
.iter() .iter()
.skip(start_idx) .skip(start_idx)
.take(end_idx.saturating_sub(start_idx)) .take(end_idx.saturating_sub(start_idx))
.cloned() .cloned()
.collect() .collect();
WorkerKvQueryResponse::Events(events)
} }
/// Record an event in the buffer /// Record an event in the buffer
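The decision order in the rewritten `get_events_in_id_range` can be sketched synchronously over a buffer of bare event IDs. `Outcome` and `query` are hypothetical names; the real method returns `WorkerKvQueryResponse` and dumps the radix tree where this sketch returns a plain `TreeDump` marker. The buffer is assumed to be sorted by ascending event ID.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Outcome {
    Events(Vec<u64>),
    TreeDump,
    TooNew { newest_available: u64 },
    InvalidRange { start_id: u64, end_id: u64 },
}

/// Sketch of the range query; `buffer` holds event IDs in ascending order.
fn query(buffer: &VecDeque<u64>, start_id: Option<u64>, end_id: Option<u64>) -> Outcome {
    // 1. Reject ranges where the end precedes the start.
    if let (Some(s), Some(e)) = (start_id, end_id) {
        if e < s {
            return Outcome::InvalidRange { start_id: s, end_id: e };
        }
    }
    // 2. No start ID means "everything": fall back to a full dump.
    let Some(start) = start_id else {
        return Outcome::TreeDump;
    };
    // 3. An empty buffer cannot serve any range.
    let (Some(&first), Some(&last)) = (buffer.front(), buffer.back()) else {
        return Outcome::TreeDump;
    };
    // 4. The start is newer than anything buffered.
    if start > last {
        return Outcome::TooNew { newest_available: last };
    }
    // 5. The start is older than the buffer: only a full dump can cover it.
    if start < first {
        return Outcome::TreeDump;
    }
    // 6. Serve from the buffer, clamping the end to the newest buffered ID.
    let end = end_id.unwrap_or(last).min(last);
    let lo = buffer.partition_point(|&id| id < start);
    let hi = buffer.partition_point(|&id| id <= end);
    Outcome::Events(buffer.range(lo..hi).copied().collect())
}
```

The sketch uses `partition_point` instead of the `binary_search_by_key` calls in the diff; both locate the same inclusive bounds on a sorted buffer.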
@@ -1714,46 +1667,44 @@ mod local_kv_indexer_tests {
         indexer
     }
-    #[test]
-    fn returns_slice_within_range() {
+    #[tokio::test]
+    async fn returns_slice_within_range() {
         let indexer = make_indexer_with_events(&[1, 2, 3, 4, 5]);
-        // Test get_buffer_events_in_id_range (buffer-only queries)
+        // Helper to extract events from response
+        let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> {
+            match resp {
+                WorkerKvQueryResponse::Events(e) => e,
+                WorkerKvQueryResponse::TreeDump(e) => e,
+                _ => panic!("Unexpected response type"),
+            }
+        };
+        let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> {
+            events.iter().map(|e| e.event.event_id).collect()
+        };
+        // Test get_events_in_id_range (buffer queries)
         // Range is [start, end] inclusive
-        let mut result = indexer.get_buffer_events_in_id_range(Some(2), Some(4));
-        let mut ids: Vec<u64> = result
-            .iter()
-            .map(|router_event| router_event.event.event_id)
-            .collect();
+        let result = indexer.get_events_in_id_range(Some(2), Some(4)).await;
+        let ids = get_ids(extract_events(result));
         assert_eq!(ids, vec![2, 3, 4]); // inclusive range [2, 4]
-        result = indexer.get_buffer_events_in_id_range(Some(2), Some(6));
-        ids = result
-            .iter()
-            .map(|router_event| router_event.event.event_id)
-            .collect();
+        let result = indexer.get_events_in_id_range(Some(2), Some(6)).await;
+        let ids = get_ids(extract_events(result));
         assert_eq!(ids, vec![2, 3, 4, 5]); // clamp end to buffer max
-        result = indexer.get_buffer_events_in_id_range(Some(0), Some(4));
-        ids = result
-            .iter()
-            .map(|router_event| router_event.event.event_id)
-            .collect();
-        assert_eq!(ids, vec![1, 2, 3, 4]); // clamp start to buffer min, inclusive end
+        // start_id=0 is before buffer (first is 1), so should trigger tree dump
+        let result = indexer.get_events_in_id_range(Some(0), Some(4)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::TreeDump(_)));
-        result = indexer.get_buffer_events_in_id_range(Some(3), Some(3));
-        ids = result
-            .iter()
-            .map(|router_event| router_event.event.event_id)
-            .collect();
+        let result = indexer.get_events_in_id_range(Some(3), Some(3)).await;
+        let ids = get_ids(extract_events(result));
         assert_eq!(ids, vec![3]); // single element when start == end
-        result = indexer.get_buffer_events_in_id_range(Some(5), Some(2));
-        ids = result
-            .iter()
-            .map(|router_event| router_event.event.event_id)
-            .collect();
-        assert!(ids.is_empty()); // return empty when start > end
+        // Invalid range: end < start
+        let result = indexer.get_events_in_id_range(Some(5), Some(2)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::InvalidRange { .. }));
     }
     #[tokio::test]
@@ -1800,6 +1751,15 @@ mod local_kv_indexer_tests {
         // Wait for events to be processed by the tree
         tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+        // Helper to extract events from response
+        let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> {
+            match resp {
+                WorkerKvQueryResponse::Events(e) => e,
+                WorkerKvQueryResponse::TreeDump(e) => e,
+                _ => panic!("Unexpected response type: {:?}", resp),
+            }
+        };
         // Helper to extract event IDs from result
         let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> {
             events.iter().map(|e| e.event.event_id).collect()
@@ -1818,31 +1778,35 @@ mod local_kv_indexer_tests {
         // Test: start_id within buffer, no end
         let result = indexer.get_events_in_id_range(Some(11), None).await;
+        assert!(matches!(result, WorkerKvQueryResponse::Events(_)));
         assert_eq!(
-            get_ids(result),
+            get_ids(extract_events(result)),
             vec![11, 12, 13, 14],
             "start_id=11 (in buffer) should return [11, 14]"
         );
         // Test: start_id at buffer boundary
         let result = indexer.get_events_in_id_range(Some(10), None).await;
+        assert!(matches!(result, WorkerKvQueryResponse::Events(_)));
         assert_eq!(
-            get_ids(result),
+            get_ids(extract_events(result)),
             vec![10, 11, 12, 13, 14],
             "start_id=10 (buffer start) should return [10, 14]"
         );
         // Test: both start and end within buffer (inclusive)
         let result = indexer.get_events_in_id_range(Some(11), Some(13)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::Events(_)));
         assert_eq!(
-            get_ids(result),
+            get_ids(extract_events(result)),
             vec![11, 12, 13],
             "range [11, 13] inclusive should return 3 events"
         );
         let result = indexer.get_events_in_id_range(Some(10), Some(14)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::Events(_)));
         assert_eq!(
-            get_ids(result),
+            get_ids(extract_events(result)),
             vec![10, 11, 12, 13, 14],
             "range [10, 14] should return all buffer events"
         );
@@ -1853,31 +1817,35 @@ mod local_kv_indexer_tests {
         // Test: (None, None) dumps entire tree
         let result = indexer.get_events_in_id_range(None, None).await;
+        assert!(matches!(result, WorkerKvQueryResponse::TreeDump(_)));
         assert_eq!(
-            result.len(),
+            extract_events(result).len(),
             10,
             "(None, None) should dump entire tree (10 events)"
         );
         // Test: (None, Some(_)) dumps entire tree
         let result = indexer.get_events_in_id_range(None, Some(8)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::TreeDump(_)));
         assert_eq!(
-            result.len(),
+            extract_events(result).len(),
             10,
             "(None, Some(_)) dumps entire tree - end_id is ignored for tree dumps"
         );
         // Test: start_id before buffer triggers tree dump
         let result = indexer.get_events_in_id_range(Some(7), None).await;
+        assert!(matches!(result, WorkerKvQueryResponse::TreeDump(_)));
         assert_eq!(
-            result.len(),
+            extract_events(result).len(),
             10,
             "start_id=7 (before buffer) should dump entire tree"
         );
         let result = indexer.get_events_in_id_range(Some(5), Some(12)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::TreeDump(_)));
         assert_eq!(
-            result.len(),
+            extract_events(result).len(),
             10,
             "range [5, 12] extending before buffer should dump entire tree"
         );
@@ -1886,20 +1854,32 @@ mod local_kv_indexer_tests {
         // Single element when start == end (inclusive range)
         let result = indexer.get_events_in_id_range(Some(12), Some(12)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::Events(_)));
         assert_eq!(
-            get_ids(result),
+            get_ids(extract_events(result)),
             vec![12],
             "start == end should return single event"
         );
-        // Empty when start > end
+        // InvalidRange when start > end
         let result = indexer.get_events_in_id_range(Some(15), Some(10)).await;
-        assert!(result.is_empty(), "start > end should return empty");
+        assert!(
+            matches!(result, WorkerKvQueryResponse::InvalidRange { .. }),
+            "start > end should return InvalidRange"
+        );
-        // Request beyond buffer but valid range -> buffer returns what it has
+        // TooNew when start_id is beyond buffer
+        let result = indexer.get_events_in_id_range(Some(100), Some(200)).await;
+        assert!(
+            matches!(result, WorkerKvQueryResponse::TooNew { .. }),
+            "start_id beyond buffer should return TooNew"
+        );
+        // Request with end beyond buffer but valid start -> buffer returns what it has
         let result = indexer.get_events_in_id_range(Some(12), Some(100)).await;
+        assert!(matches!(result, WorkerKvQueryResponse::Events(_)));
         assert_eq!(
-            get_ids(result),
+            get_ids(extract_events(result)),
             vec![12, 13, 14],
             "range with end beyond buffer should return available buffer events"
         );
@@ -3746,22 +3726,24 @@ mod tests_local_indexer {
         assert_eq!(buffered_events[0].worker_id, worker_id);
         assert_eq!(buffered_events[0].event.event_id, 1);
-        // Build the response that would be sent
-        let response = WorkerKvQueryResponse {
-            events: buffered_events.clone(),
-        };
+        // Build the response that would be sent (Events variant)
+        let response = WorkerKvQueryResponse::Events(buffered_events.clone());
         // Test serialization/deserialization (simulating NATS round-trip)
         let serialized = serde_json::to_vec(&response).unwrap();
         let deserialized: WorkerKvQueryResponse = serde_json::from_slice(&serialized).unwrap();
         // Verify response correctness
-        assert_eq!(deserialized.events.len(), 1);
-        assert_eq!(deserialized.events[0].worker_id, worker_id);
-        assert_eq!(deserialized.events[0].event.event_id, 1);
+        let events = match deserialized {
+            WorkerKvQueryResponse::Events(e) => e,
+            _ => panic!("Expected Events variant"),
+        };
+        assert_eq!(events.len(), 1);
+        assert_eq!(events[0].worker_id, worker_id);
+        assert_eq!(events[0].event.event_id, 1);
         // Verify event data
-        match &deserialized.events[0].event.data {
+        match &events[0].event.data {
             KvCacheEventData::Stored(store_data) => {
                 assert_eq!(store_data.blocks.len(), 1);
                 assert_eq!(store_data.blocks[0].block_hash.0, 100);
......
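The range-query behaviour exercised by the `local_kv_indexer_tests` hunks above can be summarised in a small standalone sketch. This is not the code from the commit: it is a simplified model that uses bare `u64` event IDs in place of `RouterEvent`, a sorted slice in place of the indexer's buffer, and a second slice standing in for the tree dump. The names `QueryResponse` and `query_range` are hypothetical. The order of the checks (invalid range first, then tree dump, then too new) is inferred from the test expectations, and treating an empty buffer as a tree dump is an assumption.

```rust
/// Simplified stand-in for `WorkerKvQueryResponse`, carrying event IDs only.
#[derive(Debug, PartialEq)]
enum QueryResponse {
    Events(Vec<u64>),
    TreeDump(Vec<u64>),
    TooNew {
        requested_start: Option<u64>,
        requested_end: Option<u64>,
        newest_available: u64,
    },
    InvalidRange { start_id: u64, end_id: u64 },
}

/// `buffer` must hold event IDs sorted in ascending order; `tree` stands in
/// for whatever a full tree dump would return.
fn query_range(
    buffer: &[u64],
    tree: &[u64],
    start: Option<u64>,
    end: Option<u64>,
) -> QueryResponse {
    // An inverted range is rejected before anything else is considered.
    if let (Some(start_id), Some(end_id)) = (start, end) {
        if end_id < start_id {
            return QueryResponse::InvalidRange { start_id, end_id };
        }
    }

    // No start ID (or, by assumption, an empty buffer) falls back to a tree dump.
    let (Some(start_id), Some(&oldest), Some(&newest)) =
        (start, buffer.first(), buffer.last())
    else {
        return QueryResponse::TreeDump(tree.to_vec());
    };

    // A start older than the buffer cannot be served from it.
    if start_id < oldest {
        return QueryResponse::TreeDump(tree.to_vec());
    }
    // A start newer than anything buffered has nothing to return.
    if start_id > newest {
        return QueryResponse::TooNew {
            requested_start: start,
            requested_end: end,
            newest_available: newest,
        };
    }

    // Binary search for the inclusive bounds; an end past the buffer is clamped.
    let start_idx = match buffer.binary_search(&start_id) {
        Ok(i) | Err(i) => i,
    };
    let end_idx = match end {
        None => buffer.len(),
        Some(end_id) => match buffer.binary_search(&end_id) {
            Ok(i) => i + 1, // inclusive end
            Err(i) => i,    // insertion point, i.e. clamped
        },
    };
    QueryResponse::Events(buffer[start_idx..end_idx].to_vec())
}
```

Returning a tagged variant rather than a bare `Vec` lets the caller distinguish "nothing to recover" from "your range was invalid", which is what the updated tests assert on.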
@@ -21,7 +21,7 @@ use dynamo_runtime::traits::{
 };
 use dynamo_runtime::{
     component::{Component, Namespace},
-    transports::nats::{NatsQueue, QUEUE_NAME, Slug},
+    transports::nats::{NatsQueue, Slug},
 };
 use futures::StreamExt;
@@ -29,8 +29,8 @@ use crate::kv_router::{
     KV_EVENT_SUBJECT, KV_METRICS_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE,
     WORKER_KV_INDEXER_QUERY_SUBJECT,
     indexer::{
-        KvIndexerInterface, KvIndexerMetrics, LocalKvIndexer, RouterEvent, WorkerKvQueryRequest,
-        WorkerKvQueryResponse, compute_block_hash_for_seq,
+        KvIndexerMetrics, LocalKvIndexer, RouterEvent, WorkerKvQueryRequest,
+        compute_block_hash_for_seq,
     },
     protocols::*,
 };
@@ -179,35 +179,55 @@ impl KvEventPublisher {
             ))
         });
-        let stream_name = Slug::slugify(&format!("{}.{}", component.subject(), KV_EVENT_SUBJECT))
-            .to_string()
-            .replace("_", "-");
-        let nats_server = std::env::var(env_nats::NATS_SERVER)
-            .unwrap_or_else(|_| "nats://localhost:4222".to_string());
-        // Create NatsQueue without consumer since we're only publishing
-        let mut nats_queue = NatsQueue::new_without_consumer(
-            stream_name,
-            nats_server,
-            std::time::Duration::from_secs(60), // 1 minute timeout
-        );
         // Connect the NatsQueue before passing it to the event processor
         let cancellation_token_clone = cancellation_token.clone();
         let local_indexer_clone = local_indexer.clone();
-        component.drt().runtime().secondary().spawn(async move {
-            if let Err(e) = nats_queue.connect().await {
-                tracing::error!("Failed to connect NatsQueue: {}", e);
-                return;
-            }
-            start_event_processor(
-                nats_queue,
-                worker_id,
-                cancellation_token_clone,
-                rx,
-                local_indexer_clone,
-            )
-            .await
-        });
+        if enable_local_indexer {
+            // When local indexer is enabled, use NATS Core (Component) for publishing.
+            // This is simpler and doesn't require JetStream durability since recovery
+            // is handled via the local indexer's event buffer.
+            tracing::info!("Using NATS Core for KV event publishing (local_indexer mode)");
+            let component_clone = component.clone();
+            component.drt().runtime().secondary().spawn(async move {
+                start_event_processor(
+                    component_clone,
+                    worker_id,
+                    cancellation_token_clone,
+                    rx,
+                    local_indexer_clone,
+                )
+                .await
+            });
+        } else {
+            // When local indexer is disabled, use JetStream (NatsQueue) for durability.
+            let stream_name =
+                Slug::slugify(&format!("{}.{}", component.subject(), KV_EVENT_SUBJECT))
+                    .to_string()
+                    .replace("_", "-");
+            let nats_server = std::env::var(env_nats::NATS_SERVER)
+                .unwrap_or_else(|_| "nats://localhost:4222".to_string());
+            let mut nats_queue = NatsQueue::new_without_consumer(
+                stream_name,
+                nats_server,
+                std::time::Duration::from_secs(60), // 1 minute timeout
+            );
+            component.drt().runtime().secondary().spawn(async move {
+                if let Err(e) = nats_queue.connect().await {
+                    tracing::error!("Failed to connect NatsQueue: {e}");
+                    return;
+                }
+                start_event_processor(
+                    nats_queue,
+                    worker_id,
+                    cancellation_token_clone,
+                    rx,
+                    local_indexer_clone,
+                )
+                .await
+            });
+        }
         Ok(Self {
             kv_block_size,
@@ -278,7 +298,9 @@ async fn start_event_processor<P: EventPublisher + Send + Sync + 'static>(
                 }
                 // Then publish to NATS for global distribution
-                if let Err(e) = publisher.publish(QUEUE_NAME, &router_event).await {
+                // Use KV_EVENT_SUBJECT so both JetStream and NATS Core subscribers
+                // can receive events on the expected subject.
+                if let Err(e) = publisher.publish(KV_EVENT_SUBJECT, &router_event).await {
                     tracing::error!("Failed to publish event to NATS: {}", e);
                 }
@@ -332,31 +354,12 @@ async fn start_worker_kv_query_service(
             }
         };
-        // TODO extract request event id range. For now, just debug print
         tracing::debug!("Received WorkerKvQueryRequest: {:?}", request);
-        // Resolve which events to return based on optional start/end ids
-        let events = match (request.start_event_id, request.end_event_id) {
-            (None, None) => {
-                match local_indexer.dump_events().await {
-                    Ok(events) => events,
-                    Err(err) => {
-                        tracing::error!(
-                            error = %err,
-                            worker_id,
-                            "Failed to dump events for WorkerKvQueryRequest; returning buffered events instead"
-                        );
-                        local_indexer.get_all_events_in_buffer()
-                    }
-                }
-            }
-            _ => {
-                local_indexer.get_events_in_id_range(request.start_event_id, request.end_event_id).await
-            }
-        };
-        // Build WorkerKvQueryResponse
-        let response = WorkerKvQueryResponse { events };
+        // Query events based on optional start/end ids
+        let response = local_indexer
+            .get_events_in_id_range(request.start_event_id, request.end_event_id)
+            .await;
         // Send reply back (if reply subject exists)
         if let Some(reply_subject) = msg.reply {
@@ -1281,7 +1284,7 @@ mod tests_startup_helpers {
         let published = published.lock().unwrap();
         assert_eq!(published.len(), 1);
         let (subject, _) = &published[0];
-        assert_eq!(subject, QUEUE_NAME);
+        assert_eq!(subject, KV_EVENT_SUBJECT);
     }
     //--------------------------------------------------------------------
@@ -1339,7 +1342,7 @@ mod tests_startup_helpers {
             let published_events = published.lock().unwrap();
             assert_eq!(published_events.len(), 1);
             let (subject, _) = &published_events[0];
-            assert_eq!(subject, QUEUE_NAME);
+            assert_eq!(subject, KV_EVENT_SUBJECT);
         } // drop lock
         // Verify event was applied to local indexer
@@ -1726,7 +1729,7 @@ mod tests_startup_helpers {
             assert_eq!(published.len(), 1, "Worker should have published 1 event");
             (published[0].0.clone(), published[0].1.clone())
         }; // drop worker_published before await
-        assert_eq!(subject, QUEUE_NAME);
+        assert_eq!(subject, KV_EVENT_SUBJECT);
         let router_event: RouterEvent = rmp_serde::from_slice(&bytes).unwrap();
         router_indexer
@@ -1828,9 +1831,14 @@ mod tests_startup_helpers {
         );
         // Step 4b: Query worker's local indexer for events after last_known_id
-        let missed_events = local_indexer_1
+        let response = local_indexer_1
             .get_events_in_id_range(Some(last_known_id + 1), None)
             .await;
+        let missed_events = match response {
+            crate::kv_router::indexer::WorkerKvQueryResponse::Events(e) => e,
+            crate::kv_router::indexer::WorkerKvQueryResponse::TreeDump(e) => e,
+            other => panic!("Unexpected response: {:?}", other),
+        };
         assert_eq!(
             missed_events.len(),
             1,
@@ -2293,11 +2301,16 @@ mod test_integration_publisher_with_kvindexer {
             let response = kv_router
                 .query_worker_local_kv(worker_id, None, None)
                 .await?;
-            if response.events.is_empty() {
+            let events = match response {
+                crate::kv_router::indexer::WorkerKvQueryResponse::Events(e) => e,
+                crate::kv_router::indexer::WorkerKvQueryResponse::TreeDump(e) => e,
+                _ => vec![],
+            };
+            if events.is_empty() {
                 continue;
             }
-            let event_count = response.events.len();
+            let event_count = events.len();
             tracing::info!(
                 worker_id,
                 events = event_count,
@@ -2323,8 +2336,13 @@ mod test_integration_publisher_with_kvindexer {
             let response = kv_router
                 .query_worker_local_kv(worker_id, None, None)
                 .await?;
+            let events = match response {
+                crate::kv_router::indexer::WorkerKvQueryResponse::Events(e) => e,
+                crate::kv_router::indexer::WorkerKvQueryResponse::TreeDump(e) => e,
+                _ => vec![],
+            };
             assert!(
-                response.events.is_empty(),
+                events.is_empty(),
                 "Worker {worker_id} should not report buffered KV events; best worker {best_worker_id} reported {best_worker_event_count}"
             );
         }
@@ -2437,6 +2455,24 @@ mod test_integration_publisher_with_kvindexer {
             .endpoint("generate");
         let pre_client = pre_backend_endpoint.client().await?;
+        // Wait for the client to discover both workers
+        let discovery_timeout = Duration::from_secs(5);
+        let discovery_start = std::time::Instant::now();
+        loop {
+            let instances = pre_client.instance_source.as_ref().borrow().clone();
+            if instances.len() >= 2 {
+                tracing::info!("Discovered {} workers", instances.len());
+                break;
+            }
+            if discovery_start.elapsed() > discovery_timeout {
+                anyhow::bail!(
+                    "Timed out waiting for worker discovery: expected 2, found {}",
+                    instances.len()
+                );
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
         // Create a PushRouter to send requests directly to a specific worker
         let pre_push_router =
             PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold(
@@ -2495,13 +2531,32 @@ mod test_integration_publisher_with_kvindexer {
             .await?,
         );
-        // At this point kvrouter's indexer should already have the
-        // events stored in the workers, due to the catch-up built into KvRouter::new.
+        // The KvRouter now starts its subscriber asynchronously in a background task
+        // that waits for runtime_configs. Poll until events appear or timeout.
         // Each request generates 2 events: input block (parent_hash: None) + output block (parent_hash: Some)
         // With 2 workers, that's 4 events total.
-        let global_kv_events = kv_router.indexer.dump_events().await?;
-        tracing::debug!("Global KV events: {:?}", global_kv_events);
-        assert_eq!(global_kv_events.len(), 4); // 2 workers × 2 events per request (input + output)
+        let expected_events = 4;
+        let max_wait = Duration::from_secs(10);
+        let poll_interval = Duration::from_millis(100);
+        let start = std::time::Instant::now();
+        let global_kv_events = loop {
+            let events = kv_router.indexer.dump_events().await?;
+            tracing::debug!("Global KV events ({}): {:?}", events.len(), events);
+            if events.len() >= expected_events {
+                break events;
+            }
+            if start.elapsed() > max_wait {
+                anyhow::bail!(
+                    "Timed out waiting for KV events: expected {}, got {}",
+                    expected_events,
+                    events.len()
+                );
+            }
+            tokio::time::sleep(poll_interval).await;
+        };
+        assert_eq!(global_kv_events.len(), expected_events); // 2 workers × 2 events per request (input + output)
         // === Cleanup ===
         for handle in server_handles {
......
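Both new test loops above (worker discovery and waiting for KV events) follow the same poll-until-deadline pattern. The sketch below isolates that pattern. It is an illustration rather than code from the commit: it swaps the async `tokio::time::sleep` for a blocking `std::thread::sleep` so that it compiles with the standard library alone, and the helper name `poll_until` and its `String` error type are hypothetical.

```rust
use std::time::{Duration, Instant};

/// Calls `probe` repeatedly until it yields a value or `max_wait` has elapsed.
/// The probe always runs at least once, even with a zero `max_wait`.
fn poll_until<T>(
    max_wait: Duration,
    poll_interval: Duration,
    mut probe: impl FnMut() -> Option<T>,
) -> Result<T, String> {
    let start = Instant::now();
    loop {
        // Check the condition first so a ready result is never delayed.
        if let Some(value) = probe() {
            return Ok(value);
        }
        // Give up once the deadline has passed.
        if start.elapsed() > max_wait {
            return Err(format!("timed out after {:?}", max_wait));
        }
        // Blocking sleep; an async test would await a timer here instead.
        std::thread::sleep(poll_interval);
    }
}
```

Polling against a deadline replaces the earlier assumption that the router's indexer is already caught up when `KvRouter::new` returns, so the test tolerates the asynchronous subscriber start without hanging indefinitely on failure.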
@@ -9,7 +9,7 @@ use dynamo_runtime::{
     config::environment_names::nats as env_nats,
     discovery::{DiscoveryEvent, DiscoveryQuery},
     prelude::*,
-    traits::events::EventPublisher,
+    traits::events::{EventPublisher, EventSubscriber},
     transports::nats::{NatsQueue, Slug},
 };
 use futures::StreamExt;
@@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken;
 use crate::kv_router::{
     KV_EVENT_SUBJECT, RADIX_STATE_BUCKET, RADIX_STATE_FILE,
-    indexer::{DumpRequest, GetWorkersRequest, RouterEvent},
+    indexer::{DumpRequest, GetWorkersRequest, RouterEvent, WorkerKvQueryResponse},
     protocols::WorkerId,
     router_discovery_query,
     worker_query::WorkerQueryClient,
@@ -140,8 +140,8 @@ pub async fn recover_from_worker(
         );
     } else {
         tracing::warn!(
-            "Worker {} does not have local indexer enabled, skipping recovery",
-            worker_id
+            worker_id,
+            "Worker does not have local indexer enabled, skipping recovery"
         );
         return Ok(0);
     }
@@ -151,13 +151,46 @@ pub async fn recover_from_worker(
         .query_worker(worker_id, start_event_id, end_event_id)
         .await?;
-    let events_count = response.events.len();
+    // Handle response variants
+    let events = match response {
+        WorkerKvQueryResponse::Events(events) => {
+            tracing::debug!(worker_id, count = events.len(), "Got buffered events");
+            events
+        }
+        WorkerKvQueryResponse::TreeDump(events) => {
+            tracing::info!(
+                worker_id,
+                count = events.len(),
+                "Got tree dump (range too old or unspecified)"
+            );
+            events
+        }
+        WorkerKvQueryResponse::TooNew {
+            requested_start,
+            requested_end,
+            newest_available,
+        } => {
+            tracing::warn!(
+                worker_id,
+                ?requested_start,
+                ?requested_end,
+                newest_available,
+                "Requested range is newer than available data"
+            );
+            return Ok(0);
+        }
+        WorkerKvQueryResponse::InvalidRange { start_id, end_id } => {
+            anyhow::bail!("Invalid range: end_id ({end_id}) < start_id ({start_id})");
+        }
+    };
+    let events_count = events.len();
     if events_count == 0 {
         tracing::debug!(
             worker_id,
             start_event_id = ?start_event_id,
-            "No missed events to recover from worker"
+            "No events to recover from worker"
         );
         return Ok(0);
     }
@@ -166,19 +199,14 @@ pub async fn recover_from_worker(
         worker_id,
         start_event_id = ?start_event_id,
         events_count,
-        "Recovered {} missed events from worker",
-        events_count
+        "Recovered {events_count} events from worker"
     );
     // Apply recovered events to the indexer
-    for event in response.events {
+    for event in events {
         if let Err(e) = event_tx.send(event).await {
-            tracing::error!(
-                worker_id,
-                error = %e,
-                "Failed to send recovered event to indexer"
-            );
-            anyhow::bail!("Failed to send recovered event: {}", e);
+            tracing::error!(worker_id, error = %e, "Failed to send recovered event to indexer");
+            anyhow::bail!("Failed to send recovered event: {e}");
         }
     }
@@ -606,6 +634,187 @@ pub async fn start_kv_router_background(
     Ok(())
 }
/// Start a simplified background task for event consumption using NATS Core.
///
/// This is used when local indexer mode is enabled. Unlike `start_kv_router_background`,
/// this function:
/// - Uses NATS Core pub/sub instead of JetStream
/// - Does not support snapshots, purging, or durable consumers
/// - On worker Added: dumps worker's local indexer into router
/// - On worker Removed: removes worker from router indexer
///
/// This is appropriate when workers have local indexers enabled.
pub async fn start_kv_router_background_nats_core(
component: Component,
kv_events_tx: mpsc::Sender<RouterEvent>,
remove_worker_tx: mpsc::Sender<WorkerId>,
cancellation_token: CancellationToken,
worker_query_client: WorkerQueryClient,
) -> Result<()> {
// Subscribe to KV events using NATS Core
let mut subscriber = component.subscribe(KV_EVENT_SUBJECT).await?;
tracing::info!(
"KV Router using NATS Core subscription on subject: {}.{} (local_indexer mode)",
component.subject(),
KV_EVENT_SUBJECT
);
// Get the generate endpoint and watch for instance events (add/remove)
let discovery_client = component.drt().discovery();
let generate_discovery_key = DiscoveryQuery::Endpoint {
namespace: component.namespace().name().to_string(),
component: component.name().to_string(),
endpoint: "generate".to_string(),
};
let mut instance_event_stream = discovery_client
.list_and_watch(generate_discovery_key, Some(cancellation_token.clone()))
.await?;
tokio::spawn(async move {
// Track last received event ID per worker for gap detection
let mut last_event_ids: HashMap<WorkerId, u64> = HashMap::new();
loop {
tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
tracing::debug!("KV Router NATS Core background task received cancellation signal");
break;
}
// Handle generate endpoint instance add/remove events
Some(discovery_event_result) = instance_event_stream.next() => {
let Ok(discovery_event) = discovery_event_result else {
continue;
};
match discovery_event {
DiscoveryEvent::Added(_instance) => {
// Extract worker_id from the instance
let worker_id = _instance.instance_id();
tracing::info!(
worker_id = worker_id,
"DISCOVERY: Worker added, dumping local indexer into router"
);
// Query worker's local indexer and dump all events
match recover_from_worker(
&worker_query_client,
worker_id,
None, // Start from beginning
None, // Get all events
&kv_events_tx,
)
.await
{
Ok(count) => {
tracing::info!(
worker_id = worker_id,
events_recovered = count,
"Successfully dumped worker's local indexer"
);
}
Err(e) => {
tracing::warn!(
worker_id = worker_id,
error = %e,
"Failed to dump worker's local indexer (may not have local indexer enabled)"
);
}
}
}
DiscoveryEvent::Removed(worker_id) => {
tracing::warn!(
worker_id = worker_id,
"DISCOVERY: Worker removed, removing from router indexer"
);
if let Err(e) = remove_worker_tx.send(worker_id).await {
tracing::warn!("Failed to send worker removal for worker {worker_id}: {e}");
}
}
}
}
// Handle event consumption from NATS Core subscription
Some(msg) = subscriber.next() => {
let event: RouterEvent = match serde_json::from_slice(&msg.payload) {
Ok(event) => event,
Err(e) => {
tracing::warn!("Failed to deserialize RouterEvent from NATS Core: {e:?}");
continue;
}
};
let worker_id = event.worker_id;
let event_id = event.event.event_id;
// Gap detection: check if event ID is monotonically increasing per worker
// Note: event_id <= last_id is duplicate/out-of-order, apply anyway (idempotent)
if let Some(&last_id) = last_event_ids.get(&worker_id)
&& event_id > last_id + 1
{
// Gap detected - recover missing events before processing current
let gap_start = last_id + 1;
let gap_end = event_id - 1;
tracing::warn!(
worker_id,
gap_start,
gap_end,
gap_size = gap_end - gap_start + 1,
"Event ID gap detected, recovering events [{gap_start}, {gap_end}]"
);
// Note: While recovering, new events may queue in the NATS subscriber's
// internal buffer. We don't explicitly buffer them here for simplicity.
// The subscriber will process them in order after recovery completes.
if let Err(e) = recover_from_worker(
&worker_query_client,
worker_id,
Some(gap_start),
Some(gap_end),
&kv_events_tx,
).await {
tracing::error!(
worker_id,
gap_start,
gap_end,
error = %e,
"Failed to recover gap events; proceeding with current event anyway"
);
// Note: If recovery fails, we still apply the current event.
// The tree will have a gap, but it's better than dropping the event.
}
}
// First event from this worker is always valid - we accept whatever ID it has.
// This handles initial startup and worker restarts without requiring event 0.
// Update last seen event ID (use max to handle out-of-order)
last_event_ids
.entry(worker_id)
.and_modify(|id| *id = (*id).max(event_id))
.or_insert(event_id);
// Forward the RouterEvent to the indexer
if let Err(e) = kv_events_tx.send(event).await {
tracing::warn!(
"failed to send kv event to indexer; shutting down: {e:?}"
);
break;
}
}
}
}
tracing::debug!("KV Router NATS Core background task exiting");
});
Ok(())
}
 /// Cleanup orphaned NATS consumers that no longer have corresponding router entries
 async fn cleanup_orphaned_consumers(
     nats_queue: &mut NatsQueue,
......
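The per-worker gap detection inside `start_kv_router_background_nats_core` can be looked at in isolation. The sketch below is a simplified model rather than the commit's code: `observe_event` is a hypothetical helper, worker and event IDs are plain `u64` values, and the actual recovery call (`recover_from_worker` in the diff) is left to the caller. It mirrors the rules stated in the comments above: the first event from a worker is always accepted, a jump of more than one past the last seen ID reports the missing inclusive range, and duplicates or out-of-order IDs never move the high-water mark backwards.

```rust
use std::collections::HashMap;

/// Records `event_id` for `worker_id` and returns the inclusive range of
/// missing IDs, if any, that should be recovered before applying the event.
fn observe_event(
    last_event_ids: &mut HashMap<u64, u64>,
    worker_id: u64,
    event_id: u64,
) -> Option<(u64, u64)> {
    // A gap exists only when a previous ID is known and the new ID skips ahead.
    let gap = match last_event_ids.get(&worker_id) {
        Some(&last_id) if event_id > last_id + 1 => Some((last_id + 1, event_id - 1)),
        _ => None,
    };

    // Keep the maximum seen ID so late or duplicate events do not rewind it.
    last_event_ids
        .entry(worker_id)
        .and_modify(|id| *id = (*id).max(event_id))
        .or_insert(event_id);

    gap
}
```

For example, after seeing events 5 and 6 from one worker, an event with ID 9 yields the gap `(7, 8)`; a late event 8 afterwards yields no gap and leaves the high-water mark at 9.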
@@ -57,19 +57,16 @@ impl WorkerQueryClient {
         // Check if worker has local indexer enabled
         if !self.has_local_indexer(worker_id) {
             anyhow::bail!(
-                "Worker {} does not have local indexer enabled (enable_local_indexer=false or not set in MDC user_data)",
-                worker_id
+                "Worker {worker_id} does not have local indexer enabled (enable_local_indexer=false or not set in MDC user_data)"
             );
         }
         // Match worker's subscribe format
-        let subject_str = format!("{}.{}", WORKER_KV_INDEXER_QUERY_SUBJECT, worker_id); // see publisher.rs/start_worker_kv_query_service()
-        let subject = format!("{}.{}", self.component.subject(), subject_str);
+        let subject_str = format!("{}.{worker_id}", WORKER_KV_INDEXER_QUERY_SUBJECT); // see publisher.rs/start_worker_kv_query_service()
+        let subject = format!("{}.{subject_str}", self.component.subject());
         tracing::debug!(
-            "Router sending query request to worker {} on NATS subject: {}",
-            worker_id,
-            subject
+            "Router sending query request to worker {worker_id} on NATS subject: {subject}"
         );
         // Create and serialize request
@@ -89,10 +86,7 @@ impl WorkerQueryClient {
             .kv_router_nats_request(subject.clone(), request_bytes.into(), timeout)
             .await
             .with_context(|| {
-                format!(
-                    "Failed to send request to worker {} on subject {}",
-                    worker_id, subject
-                )
+                format!("Failed to send request to worker {worker_id} on subject {subject}")
             })?;
         // Deserialize response
......
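The subject layout built in the `WorkerQueryClient` hunks above (component subject, then the query subject constant, then the worker ID) can be sketched in Python as below. The constant's value and the example component subject are hypothetical placeholders, since the diff shows neither.

```python
# Hypothetical value; the real constant is defined elsewhere in the codebase.
WORKER_KV_INDEXER_QUERY_SUBJECT = "worker_kv_indexer_query"


def worker_query_subject(component_subject: str, worker_id: int) -> str:
    """Compose '<component subject>.<query subject>.<worker id>'."""
    subject_str = f"{WORKER_KV_INDEXER_QUERY_SUBJECT}.{worker_id}"
    return f"{component_subject}.{subject_str}"
```

Because the worker subscribes on the same pattern, both sides have to agree on the ordering of these three segments, which is what the `// Match worker's subscribe format` comment points at.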
@@ -173,6 +173,8 @@ def _build_mocker_command(
         command.extend(["--watermark", str(mocker_args["watermark"])])
     if "dp_size" in mocker_args:
         command.extend(["--data-parallel-size", str(mocker_args["dp_size"])])
+    if mocker_args.get("enable_local_indexer"):
+        command.append("--enable-local-indexer")
     return command
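The flag handling added to `_build_mocker_command` can be exercised on its own with a reduced sketch. The helper name `build_flags` is hypothetical, and only the two options visible in this hunk are reproduced.

```python
from typing import Any, Dict, List


def build_flags(mocker_args: Dict[str, Any]) -> List[str]:
    """Reduced stand-in for _build_mocker_command covering two options."""
    command: List[str] = []
    if "dp_size" in mocker_args:
        command.extend(["--data-parallel-size", str(mocker_args["dp_size"])])
    # Value-less switch: appended only when the entry is present and truthy.
    if mocker_args.get("enable_local_indexer"):
        command.append("--enable-local-indexer")
    return command
```

Using `.get()` with a truthiness check means a missing key and an explicit `False` both omit the switch, so the parametrized test can pass `use_nats_core` straight through as the dictionary value.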
@@ -559,22 +561,32 @@ def test_query_instance_id_returns_worker_and_tokens(
 @pytest.mark.parallel
-def test_router_decisions(request, runtime_services_session, predownload_tokenizers):
-    """Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes."""
+@pytest.mark.parametrize("use_nats_core", [False, True], ids=["jetstream", "nats_core"])
+def test_router_decisions(
+    request, runtime_services_session, predownload_tokenizers, use_nats_core
+):
+    """Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes.
+    Parameterized to test both JetStream (default) and NATS Core (local indexer) modes.
+    """
     # runtime_services starts etcd and nats
-    logger.info("Starting test router prefix reuse and KV events synchronization")
+    mode = "NATS Core (local indexer)" if use_nats_core else "JetStream"
+    logger.info(
+        f"Starting test router prefix reuse and KV events synchronization ({mode})"
+    )
     # Create mocker args dictionary with dp_size=4
     mocker_args = {
         "speedup_ratio": SPEEDUP_RATIO,
         "block_size": BLOCK_SIZE,
         "dp_size": 4,
+        "enable_local_indexer": use_nats_core,
     }
     try:
         logger.info(
-            "Starting 2 mocker instances with dp_size=4 each (8 total dp ranks)"
+            f"Starting 2 mocker instances with dp_size=4 each (8 total dp ranks), {mode}"
         )
         mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=2)
         logger.info(f"All mockers using endpoint: {mockers.endpoint}")
......