Commit 7ebd5f82 (unverified), authored by Biswa Panda, committed by GitHub
Browse files

feat: [router] serve worker KV query over dynamo endpoint instead of nats (#5451)

parent 283b20c9
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Launches two KV-mode frontends (routers) and two vLLM workers with the
# worker-local KV indexer enabled. The first three processes run in the
# background; the last worker runs in the foreground and keeps the script alive.
#
# Optional port overrides (defaults in parentheses):
#   DYN_SYSTEM_PORT_R1 (8091), DYN_HTTP_PORT_R1 (8000)  - router 1
#   DYN_SYSTEM_PORT_R2 (8092), DYN_HTTP_PORT_R2 (8001)  - router 2
#   DYN_SYSTEM_PORT1   (8081)                           - worker 1
#   DYN_SYSTEM_PORT2   (8082)                           - worker 2
set -e
# On any exit (including a failure caught by `set -e`), signal the whole
# process group so the background routers/workers do not outlive the script.
trap 'echo Cleaning up...; kill 0' EXIT

# Set deterministic hash for KV event IDs
export PYTHONHASHSEED=0

# Common configuration
MODEL="Qwen/Qwen3-0.6B"
BLOCK_SIZE=64

# run two routers (different HTTP + system ports)
# Note: use --router-reset-states only on one router to avoid wiping shared state twice.
DYN_SYSTEM_PORT="${DYN_SYSTEM_PORT_R1:-8091}" \
python -m dynamo.frontend \
    --router-mode kv \
    --router-reset-states \
    --http-port "${DYN_HTTP_PORT_R1:-8000}" &

DYN_SYSTEM_PORT="${DYN_SYSTEM_PORT_R2:-8092}" \
python -m dynamo.frontend \
    --router-mode kv \
    --http-port "${DYN_HTTP_PORT_R2:-8001}" &

# run workers (enable local indexer so routers can query on restart)
# Worker 1: GPU 0, KV events published over ZMQ on tcp://*:20080.
DYN_LOCAL_INDEXER=true \
DYN_SYSTEM_PORT="${DYN_SYSTEM_PORT1:-8081}" \
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
    --model "$MODEL" \
    --block-size "$BLOCK_SIZE" \
    --enforce-eager \
    --connector none \
    --enable-local-indexer true \
    --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080","enable_kv_cache_events":true}' &

# Worker 2: GPU 1, KV events published over ZMQ on tcp://*:20081.
# Runs in the foreground; when it exits, the EXIT trap tears everything down.
DYN_LOCAL_INDEXER=true \
DYN_SYSTEM_PORT="${DYN_SYSTEM_PORT2:-8082}" \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
    --model "$MODEL" \
    --block-size "$BLOCK_SIZE" \
    --enforce-eager \
    --connector none \
    --enable-local-indexer true \
    --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081","enable_kv_cache_events":true}'
\ No newline at end of file
...@@ -79,7 +79,7 @@ pub const RADIX_STATE_BUCKET: &str = "radix-bucket"; ...@@ -79,7 +79,7 @@ pub const RADIX_STATE_BUCKET: &str = "radix-bucket";
pub const RADIX_STATE_FILE: &str = "radix-state"; pub const RADIX_STATE_FILE: &str = "radix-state";
// for worker-local kvindexer query // for worker-local kvindexer query
pub const WORKER_KV_INDEXER_QUERY_SUBJECT: &str = "worker_kv_indexer_query"; pub const WORKER_KV_INDEXER_QUERY_ENDPOINT: &str = "worker_kv_indexer_query";
pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024; // store 1024 most recent events in worker buffer pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024; // store 1024 most recent events in worker buffer
// for router discovery registration // for router discovery registration
......
...@@ -35,6 +35,7 @@ use async_trait::async_trait; ...@@ -35,6 +35,7 @@ use async_trait::async_trait;
use dynamo_runtime::{ use dynamo_runtime::{
component::Component, component::Component,
metrics::{MetricsHierarchy, prometheus_names::kvrouter}, metrics::{MetricsHierarchy, prometheus_names::kvrouter},
protocols::maybe_error::MaybeError,
}; };
use prometheus::{IntCounterVec, Opts}; use prometheus::{IntCounterVec, Opts};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
...@@ -142,6 +143,21 @@ pub enum WorkerKvQueryResponse { ...@@ -142,6 +143,21 @@ pub enum WorkerKvQueryResponse {
}, },
/// Invalid range: end_id < start_id /// Invalid range: end_id < start_id
InvalidRange { start_id: u64, end_id: u64 }, InvalidRange { start_id: u64, end_id: u64 },
/// Query failed on worker (serialized error)
Error(String),
}
impl MaybeError for WorkerKvQueryResponse {
fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self {
WorkerKvQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<anyhow::Error> {
match self {
WorkerKvQueryResponse::Error(msg) => Some(anyhow::Error::msg(msg.clone())),
_ => None,
}
}
} }
/// A block in the Radix Tree. /// A block in the Radix Tree.
......
...@@ -16,20 +16,17 @@ use tokio_util::sync::CancellationToken; ...@@ -16,20 +16,17 @@ use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket}; use zeromq::{Socket, SocketRecv, SubSocket};
use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats}; use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats};
use dynamo_runtime::traits::{ use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
DistributedRuntimeProvider, events::EventPublisher, events::EventSubscriber,
};
use dynamo_runtime::{ use dynamo_runtime::{
component::{Component, Namespace}, component::{Component, Namespace},
transports::nats::{NatsQueue, Slug}, transports::nats::{NatsQueue, Slug},
}; };
use futures::StreamExt;
use crate::kv_router::{ use crate::kv_router::{
KV_EVENT_SUBJECT, KV_METRICS_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE, KV_EVENT_SUBJECT, KV_METRICS_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE,
WORKER_KV_INDEXER_QUERY_SUBJECT, indexer::{KvIndexerMetrics, LocalKvIndexer, RouterEvent},
indexer::{KvIndexerMetrics, LocalKvIndexer, RouterEvent, WorkerKvQueryRequest},
protocols::*, protocols::*,
worker_query::start_worker_kv_query_endpoint,
}; };
use dynamo_runtime::config::environment_names::nats as env_nats; use dynamo_runtime::config::environment_names::nats as env_nats;
...@@ -173,11 +170,10 @@ impl KvEventPublisher { ...@@ -173,11 +170,10 @@ impl KvEventPublisher {
.drt() .drt()
.runtime() .runtime()
.secondary() .secondary()
.spawn(start_worker_kv_query_service( .spawn(start_worker_kv_query_endpoint(
component, component,
worker_id, worker_id,
local_indexer, local_indexer,
cancellation_token.clone(),
)) ))
}); });
...@@ -311,80 +307,6 @@ async fn start_event_processor<P: EventPublisher + Send + Sync + 'static>( ...@@ -311,80 +307,6 @@ async fn start_event_processor<P: EventPublisher + Send + Sync + 'static>(
} }
} }
/// Processor for Router -> LocalKvIndexer query service.
///
/// Subscribes to the subject `"{WORKER_KV_INDEXER_QUERY_SUBJECT}.{worker_id}"`
/// on `component`, then answers each incoming `WorkerKvQueryRequest` with the
/// result of `LocalKvIndexer::get_events_in_id_range`.
///
/// The function returns when the subscription cannot be created, when
/// `cancellation_token` is cancelled, or when the subscriber stream ends.
/// Failures are logged; nothing is returned to the caller.
async fn start_worker_kv_query_service(
component: Component,
worker_id: u64,
local_indexer: Arc<LocalKvIndexer>,
cancellation_token: CancellationToken,
) {
// Create NATS subscriber on a subject specific to worker's id
let subject = format!("{}.{}", WORKER_KV_INDEXER_QUERY_SUBJECT, worker_id);
let mut subscriber = match component.subscribe(&subject).await {
Ok(sub) => sub,
Err(e) => {
// Without a subscription there is nothing to serve: log and give up.
tracing::error!(
"Query service failed to subscribe for worker {worker_id} on subject {subject}: {e}"
);
return;
}
};
tracing::info!("Query service listening on NATS for worker {worker_id} on subject {subject}");
// Receive query request from router, retrieve event(s) from LocalKvIndexer, return response
loop {
tokio::select! {
// Shutdown path: stop serving as soon as cancellation is signalled.
_ = cancellation_token.cancelled() => {
tracing::info!("Query service received cancellation signal for worker {worker_id}");
break;
}
msg = subscriber.next() => {
// `None` means the subscriber stream is finished; leave the loop.
let Some(msg) = msg else {
tracing::warn!("Query service NATS stream ended for worker {worker_id}");
break;
};
// deserialize from msg (async_nats::Message)
// NOTE: on a malformed payload the request is dropped and no reply is sent.
let request: WorkerKvQueryRequest = match serde_json::from_slice(&msg.payload) {
Ok(request) => request,
Err(e) => {
tracing::error!("Failed to deserialize WorkerKvQueryRequest for worker {worker_id}: {e}");
continue;
}
};
tracing::debug!("Received query request for worker {worker_id}: {request:?}");
// Query events based on optional start/end ids
let response = local_indexer
.get_events_in_id_range(request.start_event_id, request.end_event_id)
.await;
// Send reply back (if reply subject exists)
if let Some(reply_subject) = msg.reply {
// NOTE: a serialization failure is logged and the request goes unanswered.
let payload = match serde_json::to_vec(&response) {
Ok(p) => p,
Err(e) => {
tracing::error!("Failed to serialize response for worker {worker_id}: {e}");
continue;
}
};
// Publish through DRT/NATS directly instead of namespace (adds a prefix)
if let Err(e) = component
.drt()
.kv_router_nats_publish(reply_subject.to_string(), payload.into())
.await
{
// A failed publish is only logged; the loop keeps serving later requests.
tracing::error!("Failed to send reply for worker {worker_id}: {e}");
}
}
}
}
}
}
/// Calculate exponential backoff duration based on consecutive error count /// Calculate exponential backoff duration based on consecutive error count
fn calculate_backoff_ms(consecutive_errors: u32) -> u64 { fn calculate_backoff_ms(consecutive_errors: u32) -> u64 {
std::cmp::min( std::cmp::min(
...@@ -1864,6 +1786,9 @@ mod tests_startup_helpers { ...@@ -1864,6 +1786,9 @@ mod tests_startup_helpers {
let missed_events = match response { let missed_events = match response {
crate::kv_router::indexer::WorkerKvQueryResponse::Events(e) => e, crate::kv_router::indexer::WorkerKvQueryResponse::Events(e) => e,
crate::kv_router::indexer::WorkerKvQueryResponse::TreeDump(e) => e, crate::kv_router::indexer::WorkerKvQueryResponse::TreeDump(e) => e,
crate::kv_router::indexer::WorkerKvQueryResponse::Error(message) => {
panic!("Unexpected error response: {message}")
}
other => panic!("Unexpected response: {:?}", other), other => panic!("Unexpected response: {:?}", other),
}; };
assert_eq!( assert_eq!(
......
...@@ -232,6 +232,9 @@ pub async fn recover_from_worker( ...@@ -232,6 +232,9 @@ pub async fn recover_from_worker(
WorkerKvQueryResponse::InvalidRange { start_id, end_id } => { WorkerKvQueryResponse::InvalidRange { start_id, end_id } => {
anyhow::bail!("Invalid range: end_id ({end_id}) < start_id ({start_id})"); anyhow::bail!("Invalid range: end_id ({end_id}) < start_id ({start_id})");
} }
WorkerKvQueryResponse::Error(message) => {
anyhow::bail!("Worker {worker_id} query failed: {message}");
}
}; };
let events_count = events.len(); let events_count = events.len();
......
...@@ -2,27 +2,34 @@ ...@@ -2,27 +2,34 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use dynamo_runtime::component::Component; use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider; use dynamo_runtime::pipeline::{
use dynamo_runtime::traits::events::EventPublisher; AsyncEngine, AsyncEngineContextProvider, ManyOut, PushRouter, ResponseStream, RouterMode,
use tokio::sync::watch; SingleIn, async_trait, network::Ingress,
};
use dynamo_runtime::protocols::maybe_error::MaybeError;
use tokio::sync::{OnceCell, watch};
use tokio_stream::StreamExt;
use crate::kv_router::WORKER_KV_INDEXER_QUERY_SUBJECT; use crate::kv_router::WORKER_KV_INDEXER_QUERY_ENDPOINT;
use crate::kv_router::indexer::{WorkerKvQueryRequest, WorkerKvQueryResponse}; use crate::kv_router::indexer::{LocalKvIndexer, WorkerKvQueryRequest, WorkerKvQueryResponse};
use crate::kv_router::protocols::WorkerId; use crate::kv_router::protocols::WorkerId;
use crate::local_model::runtime_config::ModelRuntimeConfig; use crate::local_model::runtime_config::ModelRuntimeConfig;
use dynamo_runtime::stream;
/// Router-side client for querying worker local KV indexers /// Router-side client for querying worker local KV indexers
/// ///
/// Performs request/reply communication with workers via NATS. /// Performs request/reply communication with workers via request plane endpoint routing.
/// (Only queries workers that have `enable_local_indexer=true` in their MDC user_data) /// (Only queries workers that have `enable_local_indexer=true` in their MDC user_data)
/// The client is spawned by KvRouter; it watches same discovery stream as the router. /// The client is spawned by KvRouter; it watches same discovery stream as the router.
pub struct WorkerQueryClient { pub struct WorkerQueryClient {
component: Component, component: Component,
/// Watch receiver for enable_local_indexer state per worker /// Watch receiver for enable_local_indexer state per worker
model_runtime_config_rx: watch::Receiver<HashMap<WorkerId, ModelRuntimeConfig>>, model_runtime_config_rx: watch::Receiver<HashMap<WorkerId, ModelRuntimeConfig>>,
router: OnceCell<Arc<PushRouter<WorkerKvQueryRequest, WorkerKvQueryResponse>>>,
} }
impl WorkerQueryClient { impl WorkerQueryClient {
...@@ -34,6 +41,7 @@ impl WorkerQueryClient { ...@@ -34,6 +41,7 @@ impl WorkerQueryClient {
Self { Self {
component, component,
model_runtime_config_rx, model_runtime_config_rx,
router: OnceCell::new(),
} }
} }
...@@ -61,38 +69,183 @@ impl WorkerQueryClient { ...@@ -61,38 +69,183 @@ impl WorkerQueryClient {
); );
} }
// Match worker's subscribe format let router = self
let subject_str = format!("{}.{worker_id}", WORKER_KV_INDEXER_QUERY_SUBJECT); // see publisher.rs/start_worker_kv_query_service() .router
let subject = format!("{}.{subject_str}", self.component.subject()); .get_or_try_init(|| async {
let endpoint = self.component.endpoint(WORKER_KV_INDEXER_QUERY_ENDPOINT);
let client = endpoint.client().await?;
let router = PushRouter::from_client(client, RouterMode::RoundRobin).await?;
Ok::<_, anyhow::Error>(Arc::new(router))
})
.await?;
tracing::debug!(
"Router sending query request to worker {worker_id} on NATS subject: {subject}"
);
// Create and serialize request
let request = WorkerKvQueryRequest { let request = WorkerKvQueryRequest {
worker_id, worker_id,
start_event_id, start_event_id,
end_event_id, end_event_id,
}; };
let request_bytes = let mut stream = router
serde_json::to_vec(&request).context("Failed to serialize WorkerKvQueryRequest")?; .direct(SingleIn::new(request), worker_id)
// Send NATS request with timeout using DRT helper
let timeout = tokio::time::Duration::from_secs(1);
let response_msg = self
.component
.drt()
.kv_router_nats_request(subject.clone(), request_bytes.into(), timeout)
.await .await
.with_context(|| { .with_context(|| {
format!("Failed to send request to worker {worker_id} on subject {subject}") format!("Failed to send worker KV query request to worker {worker_id} via endpoint")
})?; })?;
// Deserialize response let response = stream
let response: WorkerKvQueryResponse = serde_json::from_slice(&response_msg.payload) .next()
.context("Failed to deserialize WorkerKvQueryResponse")?; .await
.context("Worker KV query returned an empty response stream")?;
if let Some(err) = response.err() {
return Err(err).context("Worker KV query response error");
}
Ok(response) Ok(response)
} }
} }
/// Worker-side endpoint registration for the Router -> LocalKvIndexer query service.
///
/// Wraps `local_indexer` in a `WorkerKvQueryEngine`, builds an ingress handler
/// for it, and starts the `WORKER_KV_INDEXER_QUERY_ENDPOINT` endpoint on
/// `component` with graceful shutdown enabled. Failures at either step are
/// logged and the function simply returns; no error is propagated.
pub(crate) async fn start_worker_kv_query_endpoint(
    component: Component,
    worker_id: u64,
    local_indexer: Arc<LocalKvIndexer>,
) {
    let engine = Arc::new(WorkerKvQueryEngine {
        worker_id,
        local_indexer,
    });

    // Building the handler can fail; in that case the endpoint is never started.
    let ingress = match Ingress::for_engine(engine) {
        Err(e) => {
            tracing::error!(
                "Failed to build WorkerKvQuery endpoint handler for worker {worker_id}: {e}"
            );
            return;
        }
        Ok(handler) => handler,
    };

    tracing::info!(
        "WorkerKvQuery endpoint starting for worker {worker_id} on endpoint '{}'",
        WORKER_KV_INDEXER_QUERY_ENDPOINT
    );

    // Awaiting `start()` drives the endpoint; only its failure is reported.
    let outcome = component
        .endpoint(WORKER_KV_INDEXER_QUERY_ENDPOINT)
        .endpoint_builder()
        .handler(ingress)
        .graceful_shutdown(true)
        .start()
        .await;

    if let Err(e) = outcome {
        tracing::error!("WorkerKvQuery endpoint failed for worker {worker_id}: {e}");
    }
}
/// Request handler that answers worker KV queries from a worker's local indexer.
struct WorkerKvQueryEngine {
/// Id of the worker this engine serves; `generate` answers with an `Error`
/// response when a request's `worker_id` does not match it.
worker_id: u64,
/// Shared local indexer queried via `get_events_in_id_range`.
local_indexer: Arc<LocalKvIndexer>,
}
#[async_trait]
impl AsyncEngine<SingleIn<WorkerKvQueryRequest>, ManyOut<WorkerKvQueryResponse>, anyhow::Error>
for WorkerKvQueryEngine
{
async fn generate(
&self,
request: SingleIn<WorkerKvQueryRequest>,
) -> anyhow::Result<ManyOut<WorkerKvQueryResponse>> {
let (request, ctx) = request.into_parts();
tracing::debug!(
"Received query request for worker {}: {:?}",
self.worker_id,
request
);
// This is a sanity check to ensure the request is for the correct worker.
// In production, this should never happen since the router should only
// send requests to the worker it is associated with.
if request.worker_id != self.worker_id {
let error_message = format!(
"WorkerKvQueryEngine::generate worker_id mismatch: request.worker_id={} this.worker_id={}",
request.worker_id, self.worker_id
);
let response = WorkerKvQueryResponse::Error(error_message);
return Ok(ResponseStream::new(
Box::pin(stream::iter(vec![response])),
ctx.context(),
));
}
let response = self
.local_indexer
.get_events_in_id_range(request.start_event_id, request.end_event_id)
.await;
Ok(ResponseStream::new(
Box::pin(stream::iter(vec![response])),
ctx.context(),
))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_router::RouterEvent;
    use crate::kv_router::indexer::KvIndexerMetrics;
    use crate::kv_router::protocols::{KvCacheEvent, KvCacheEventData};
    use tokio_stream::StreamExt;
    use tokio_util::sync::CancellationToken;

    /// Buffers one event in a local indexer, queries exactly that event id
    /// through the engine, and expects a single `Events` response containing it.
    #[tokio::test]
    async fn test_worker_kv_query_engine_returns_buffered_events() {
        let worker_id = 7u64;

        // Arrange: a local indexer holding a single `Cleared` event with id 1.
        let local_indexer = Arc::new(LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            32,
        ));
        let cleared = KvCacheEvent {
            event_id: 1,
            data: KvCacheEventData::Cleared,
            dp_rank: 0,
        };
        local_indexer
            .apply_event_with_buffer(RouterEvent::new(worker_id, cleared))
            .await
            .expect("apply_event_with_buffer should succeed");

        // Act: ask the engine for the id range [1, 1].
        let engine = WorkerKvQueryEngine {
            worker_id,
            local_indexer,
        };
        let query = WorkerKvQueryRequest {
            worker_id,
            start_event_id: Some(1),
            end_event_id: Some(1),
        };
        let mut replies = engine
            .generate(SingleIn::new(query))
            .await
            .expect("generate should succeed");
        let first = replies
            .next()
            .await
            .expect("response stream should yield one item");

        // Assert: exactly the buffered event comes back.
        let events = match first {
            WorkerKvQueryResponse::Events(events) => events,
            other => panic!("Unexpected response: {other:?}"),
        };
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event.event_id, 1);
    }
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment