Unverified Commit bba70a41 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat: standalone KV indexer runtime integration (#7295)

parent 3718da8c
......@@ -35,6 +35,7 @@ _KV_ROUTER_FIELDS: tuple[str, ...] = (
"router_event_threads",
"router_enable_cache_control",
"router_queue_policy",
"remote_indexer_component",
)
......@@ -58,6 +59,7 @@ class KvRouterConfigBase(ConfigBase):
router_event_threads: int
router_enable_cache_control: bool
router_queue_policy: str
remote_indexer_component: Optional[str]
def kv_router_kwargs(self) -> dict:
"""Return a dict suitable for ``KvRouterConfig(**kwargs)``."""
......@@ -269,3 +271,15 @@ class KvRouterArgGroup(ArgGroup):
arg_type=str,
choices=["fcfs", "wspt"],
)
add_argument(
g,
flag_name="--remote-indexer-component",
env_var="DYN_REMOTE_INDEXER_COMPONENT",
default=None,
help=(
"[EXPERIMENTAL] KV Router: Component name of a standalone KV indexer to use for overlap scoring. "
"When set, the router queries the standalone indexer via the request plane instead "
"of maintaining a local radix tree (e.g. 'kv-indexer')."
),
arg_type=str,
)
......@@ -471,6 +471,13 @@ def parse_args() -> argparse.Namespace:
default=os.environ.get("DYN_REQUEST_PLANE", "tcp"),
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
)
parser.add_argument(
"--event-plane",
type=str,
choices=["nats", "zmq"],
default=os.environ.get("DYN_EVENT_PLANE", "nats"),
help="Determines how events are published [nats|zmq]",
)
args = parser.parse_args()
validate_worker_type_args(args)
......
......@@ -18,6 +18,7 @@ import uvloop
os.environ.setdefault("DYN_COMPUTE_THREADS", "0")
from dynamo.common.utils.runtime import create_runtime
from dynamo.llm import (
EngineType,
EntrypointArgs,
......@@ -26,7 +27,6 @@ from dynamo.llm import (
make_engine,
run_input,
)
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from .args import create_temp_engine_args_file, parse_args, resolve_planner_profile_data
......@@ -193,7 +193,6 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
- Independent service registration and stats scraping
- But still sharing the same tokio runtime (efficient)
"""
loop = asyncio.get_running_loop()
futures = []
runtimes = []
per_worker_temp_files: list[Path] = []
......@@ -227,10 +226,12 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}")
# Create a separate DistributedRuntime for this worker (on same event loop)
runtime = DistributedRuntime(
loop,
runtime, loop = create_runtime(
args.discovery_backend,
args.request_plane,
args.event_plane,
True, # statically set to True, just determines to enable_nats if event_plane is nats
)
runtimes.append(runtime)
......
......@@ -7,7 +7,10 @@ subtitle: Run the KV cache indexer as an independent HTTP service for querying b
## Overview
The standalone KV indexer (`dynamo-kv-indexer`) is a lightweight HTTP binary that subscribes to ZMQ KV event streams from workers, maintains a radix tree of cached blocks, and exposes HTTP endpoints for querying and managing workers.
The standalone KV indexer (`dynamo-kv-indexer`) is a lightweight binary that maintains a radix tree of cached blocks and exposes HTTP endpoints for querying and managing workers. It supports two operational modes:
- **Standalone mode** (default): Subscribes to ZMQ KV event streams directly from workers. No Dynamo runtime dependencies required.
- **Dynamo runtime mode** (`--dynamo-runtime`): Integrates with the Dynamo runtime for automatic worker discovery via MDC, KV event ingestion via the event plane (NATS or ZMQ), and serves indexer queries over the request plane for remote frontends.
This is distinct from the [Standalone Router](../../../components/src/dynamo/router/README.md), which is a full routing service. The standalone indexer provides only the indexing and query layer without routing logic.
......@@ -23,7 +26,9 @@ The indexer maintains one radix tree per `(model_name, tenant_id)` pair. Workers
## Compatibility
The standalone indexer works with any engine that publishes KV cache events over ZMQ in the expected msgpack format. This includes bare vLLM and SGLang engines, which emit ZMQ KV events natively — no Dynamo-specific wrapper is required.
In standalone mode, the indexer works with any engine that publishes KV cache events over ZMQ in the expected msgpack format. This includes bare vLLM and SGLang engines, which emit ZMQ KV events natively — no Dynamo-specific wrapper is required.
In Dynamo runtime mode, the indexer discovers workers automatically via MDC and receives KV events through the event plane. It also registers a query endpoint on the request plane, allowing frontends to query overlap scores remotely without needing direct HTTP access.
## Use Cases
......@@ -31,6 +36,7 @@ The standalone indexer works with any engine that publishes KV cache events over
- **State verification**: Confirm that the indexer's view of KV cache state matches the router's internal state (used in integration tests).
- **Custom routing**: Build external routing logic that queries the indexer for overlap scores and makes its own worker selection decisions.
- **Monitoring**: Observe KV cache distribution across workers without running a full router.
- **Remote indexing**: In Dynamo runtime mode, frontends can offload KV cache indexing to a dedicated service and query it over the request plane.
## P2P Recovery
......@@ -75,18 +81,56 @@ Peers can be registered at startup via `--peers` or dynamically via the HTTP API
## Building
The binary is a feature-gated target in the `dynamo-kv-router` crate:
The binary is a feature-gated target in the `dynamo-kv-router` crate. The available cargo features control which capabilities are compiled in:
| Feature | Description |
|---------|-------------|
| `standalone-indexer` | Core standalone indexer library (HTTP server, ZMQ listeners, P2P recovery) |
| `metrics` | Prometheus metrics (`/metrics` endpoint, request/worker gauges) |
| `indexer-bin` | CLI binary target |
| `indexer-runtime` | Dynamo runtime integration (discovery, event plane, request plane) |
| `test-endpoints` | Test-only endpoints (`/test/pause_listener`, `/test/resume_listener`) |
### Standalone build (no runtime dependency)
```bash
cargo build -p dynamo-kv-router --features indexer-bin --bin dynamo-kv-indexer
```
This produces a binary with no `dynamo-runtime` dependency. It supports ZMQ event listeners, HTTP API, and P2P recovery.
### Standalone build with metrics
```bash
cargo build -p dynamo-kv-router --features indexer-bin,metrics --bin dynamo-kv-indexer
```
Adds Prometheus metrics support (`/metrics` endpoint). Pulls in `dynamo-runtime` for the metrics implementation.
### Runtime-enabled build
```bash
cargo build -p dynamo-kv-router --features indexer-bin,indexer-runtime --bin dynamo-kv-indexer
```
Enables the `--dynamo-runtime` CLI flag for MDC discovery, event plane subscription, and request plane query endpoint. Includes metrics.
## CLI
### Standalone mode (default)
```bash
dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-model --tenant-id default --workers "1=tcp://host:5557,2:1=tcp://host:5558"] [--peers "http://peer1:8090,http://peer2:8091"]
```
### Dynamo runtime mode (requires `indexer-runtime` feature)
```bash
dynamo-kv-indexer --dynamo-runtime --namespace default --component-name kv-indexer --worker-component backend --port 8090 [--threads 4]
```
In runtime mode, workers are discovered automatically via MDC. The `--workers` flag can still be used to register additional static workers alongside discovered ones.
| Flag | Default | Description |
|------|---------|-------------|
| `--block-size` | (none) | KV cache block size for initial `--workers` (required when `--workers` is set) |
......@@ -96,6 +140,10 @@ dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-mod
| `--model-name` | `default` | Model name for initial `--workers` |
| `--tenant-id` | `default` | Tenant ID for initial `--workers` |
| `--peers` | (none) | Comma-separated peer indexer URLs for P2P recovery on startup |
| `--dynamo-runtime` | `false` | Enable Dynamo runtime integration (requires `indexer-runtime` feature) |
| `--namespace` | `default` | Dynamo namespace to register the indexer component under (runtime mode) |
| `--component-name` | `kv-indexer` | Component name for this indexer in the Dynamo runtime (runtime mode) |
| `--worker-component` | `backend` | Component name that workers register under, for event plane subscription (runtime mode) |
## HTTP API
......@@ -109,7 +157,7 @@ curl http://localhost:8090/health
### `GET /metrics` — Prometheus metrics
Returns metrics in Prometheus text exposition format. Available when the binary is built with the `metrics` feature (enabled by default via `standalone-indexer`).
Returns metrics in Prometheus text exposition format. Available when the binary is built with the `metrics` or `indexer-runtime` feature.
```bash
curl http://localhost:8090/metrics
......@@ -313,13 +361,44 @@ If no `replay_endpoint` is configured, gaps are logged as warnings but not recov
The sequence counter (`last_seq`) persists across unregister/register cycles, so re-registering a worker after a gap will trigger replay on the first batch received by the new listener.
## Dynamo Runtime Mode
When started with `--dynamo-runtime`, the indexer integrates with the Dynamo distributed runtime:
### Worker Discovery
The indexer watches MDC (Model Discovery Catalog) for worker additions and removals. When a worker registers with MDC, the indexer automatically creates an indexer for its model and block size. Workers discovered via MDC are tracked separately from those registered via `--workers` or the `/register` HTTP API — a worker cannot be registered through both paths simultaneously.
### Event Plane Subscription
Instead of connecting directly to ZMQ PUB sockets on each worker, the indexer subscribes to KV events through the Dynamo event plane. The transport (NATS or ZMQ) is determined by the `DYNAMO_EVENT_TRANSPORT` environment variable. Events are routed to the appropriate indexer based on the worker ID.
### Request Plane Query Endpoint
The indexer registers a query endpoint on the Dynamo request plane, allowing frontends to send `IndexerQueryRequest` messages containing a model name, namespace, and block hashes. The indexer looks up the appropriate radix tree and returns overlap scores. This enables frontends to use a remote indexer for KV-aware routing without direct HTTP access.
### Example
```bash
# Start the indexer with runtime integration
dynamo-kv-indexer --dynamo-runtime \
--namespace my-namespace \
--component-name kv-indexer \
--worker-component backend \
--port 8090 --threads 4
```
The HTTP API remains fully available in runtime mode. Static workers can be added via `--workers` alongside discovered workers.
## Limitations
- **ZMQ only**: Workers must publish KV events via ZMQ PUB sockets. The standalone indexer does not subscribe to NATS event streams.
- **Standalone mode is ZMQ only**: In standalone mode, workers must publish KV events via ZMQ PUB sockets. Build with `indexer-runtime` and use `--dynamo-runtime` to receive events via the event plane (NATS or ZMQ).
- **No routing logic**: The indexer only maintains the radix tree and answers queries. It does not track active blocks, manage request lifecycle, or perform worker selection.
## Architecture
### Standalone Mode
```mermaid
graph TD
subgraph Workers
......@@ -353,6 +432,62 @@ graph TD
style CLIENT fill:#fff3e0,stroke:#333,color:#333
```
### Dynamo Runtime Mode
```mermaid
graph TD
subgraph Workers
W1[Worker 1]
W2[Worker 2]
end
subgraph "Dynamo Runtime"
MDC[MDC Discovery]
EP[Event Plane<br/>NATS / ZMQ]
RP[Request Plane]
end
subgraph "Standalone Indexer"
DISC[Discovery Watcher]
SUB[Event Subscriber]
REG[Worker Registry]
IDX["Indexer Map<br/>(model, tenant) → Radix Tree"]
QE[Query Endpoint]
HTTP[HTTP API<br/>/query /dump /register]
end
FRONTEND[Frontend / Router]
CLIENT[External Client]
W1 -->|register| MDC
W2 -->|register| MDC
MDC -->|added/removed| DISC
DISC -->|add/remove workers| REG
W1 -->|KV events| EP
W2 -->|KV events| EP
EP -->|RouterEvent| SUB
SUB -->|apply events| IDX
FRONTEND -->|IndexerQueryRequest| RP
RP --> QE
QE -->|query| IDX
CLIENT -->|POST /query, GET /dump| HTTP
HTTP -->|query| IDX
style W1 fill:#f3e5f5,stroke:#333,color:#333
style W2 fill:#f3e5f5,stroke:#333,color:#333
style MDC fill:#e3f2fd,stroke:#333,color:#333
style EP fill:#e3f2fd,stroke:#333,color:#333
style RP fill:#e3f2fd,stroke:#333,color:#333
style IDX fill:#2e8b57,stroke:#333,color:#fff
style SUB fill:#2e8b57,stroke:#333,color:#fff
style DISC fill:#2e8b57,stroke:#333,color:#fff
style REG fill:#2e8b57,stroke:#333,color:#fff
style QE fill:#2e8b57,stroke:#333,color:#fff
style HTTP fill:#2e8b57,stroke:#333,color:#fff
style FRONTEND fill:#fff3e0,stroke:#333,color:#333
style CLIENT fill:#fff3e0,stroke:#333,color:#333
```
### P2P Recovery Flow
```mermaid
......
......@@ -691,8 +691,9 @@ pub unsafe extern "C" fn create_routers(
.kv_chooser_for(
&endpoint,
block_size,
Some(kv_router_config),
Some(kv_router_config.clone()),
WORKER_TYPE_DECODE,
Some(model_name.clone()),
)
.await
{
......
......@@ -40,21 +40,21 @@ pub enum EngineType {
}
#[pyclass]
#[derive(Default, Clone, Debug, Copy)]
#[derive(Default, Clone, Debug)]
pub struct KvRouterConfig {
inner: RsKvRouterConfig,
}
impl KvRouterConfig {
pub fn inner(&self) -> RsKvRouterConfig {
self.inner
self.inner.clone()
}
}
#[pymethods]
impl KvRouterConfig {
#[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(2.0), router_event_threads=4, router_enable_cache_control=false, router_queue_policy="fcfs"))]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(2.0), router_event_threads=4, router_enable_cache_control=false, router_queue_policy="fcfs", remote_indexer_component=None))]
#[allow(clippy::too_many_arguments)]
fn new(
overlap_score_weight: f64,
......@@ -74,6 +74,7 @@ impl KvRouterConfig {
router_event_threads: u32,
router_enable_cache_control: bool,
router_queue_policy: &str,
remote_indexer_component: Option<String>,
) -> Self {
KvRouterConfig {
inner: RsKvRouterConfig {
......@@ -96,6 +97,7 @@ impl KvRouterConfig {
router_queue_policy: router_queue_policy.parse().unwrap_or_else(|_| {
panic!("invalid router_queue_policy: {router_queue_policy:?}")
}),
remote_indexer_component,
},
}
}
......
......@@ -591,12 +591,51 @@ async fn create_kv_router_from_endpoint(
} else {
llm_rs::discovery::WORKER_TYPE_DECODE
};
// Only query discovery for model_name when a remote indexer is configured,
// since model_name is only needed for the RemoteIndexer path.
let needs_model_name = kv_router_config
.as_ref()
.map(|cfg| cfg.remote_indexer_component.is_some())
.unwrap_or(false);
let model_name = if needs_model_name {
let discovery = endpoint.inner.component().drt().discovery();
let instances = discovery
.list(rs::discovery::DiscoveryQuery::EndpointModels {
namespace: endpoint_id.namespace.clone(),
component: endpoint_id.component.clone(),
endpoint: endpoint_id.name.clone(),
})
.await
.map_err(to_pyerr)?;
Some(
instances
.into_iter()
.find_map(|inst| {
inst.deserialize_model::<llm_rs::model_card::ModelDeploymentCard>()
.ok()
.map(|card| card.display_name)
})
.ok_or_else(|| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"no model card found in discovery for endpoint {}/{}/{}",
endpoint_id.namespace, endpoint_id.component, endpoint_id.name
))
})?,
)
} else {
None
};
let kv_router = model_manager
.kv_chooser_for(
&endpoint.inner,
block_size as u32,
kv_router_config,
worker_type,
model_name,
)
.await
.map_err(to_pyerr)?;
......
......@@ -14,9 +14,10 @@ repository.workspace = true
default = []
metrics = ["dep:dynamo-runtime"]
bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"]
standalone-indexer = ["metrics", "dep:axum", "dep:bytes", "dep:zeromq", "dep:serde_json", "dep:reqwest"]
standalone-indexer = ["dep:axum", "dep:bytes", "dep:zeromq", "dep:serde_json", "dep:reqwest"]
indexer-bin = ["standalone-indexer", "dep:clap", "dep:tracing-subscriber"]
test-endpoints = ["indexer-bin"]
indexer-runtime = ["metrics", "standalone-indexer"]
[dependencies]
# repo
......@@ -57,8 +58,9 @@ rustc-hash = "2.1.1"
axum = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true }
zeromq = { version = "0.4.1", optional = true }
tracing-subscriber = { workspace = true, optional = true }
zeromq = { version = "0.4.1", optional = true }
[package.metadata.cargo-machete]
ignored = ["indicatif", "plotters"]
......@@ -67,7 +69,7 @@ ignored = ["indicatif", "plotters"]
rstest = "0.18.2"
rstest_reuse = "0.7.0"
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
tokio = { workspace = true, features = ["rt", "macros", "time", "test-util"] }
dynamo-tokens = { workspace = true }
[[bin]]
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use clap::Parser;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::standalone_indexer::{
self, recovery,
registry::WorkerRegistry,
server::{AppState, create_router},
};
use dynamo_kv_router::standalone_indexer::{self, IndexerConfig};
#[cfg(feature = "indexer-runtime")]
mod runtime;
#[derive(Parser)]
#[command(name = "dynamo-kv-indexer", about = "Standalone KV cache indexer")]
......@@ -35,27 +46,245 @@ struct Cli {
/// Comma-separated peer URLs for P2P recovery (e.g. "http://host1:8090,http://host2:8091")
#[arg(long)]
peers: Option<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
/// Enable Dynamo runtime integration (discovery, event plane, request plane).
/// When enabled, workers are discovered via MDC and events arrive via the event plane.
/// Also enables router to configure a remote indexer via the request plane.
#[cfg(feature = "indexer-runtime")]
#[arg(long)]
dynamo_runtime: bool,
/// Dynamo namespace to register the indexer component under
#[cfg(feature = "indexer-runtime")]
#[arg(long, default_value = "default")]
namespace: String,
/// Component name for this indexer in the Dynamo runtime
#[cfg(feature = "indexer-runtime")]
#[arg(long, default_value = "kv-indexer")]
component_name: String,
/// Component name that workers register under (for event plane subscription)
#[cfg(feature = "indexer-runtime")]
#[arg(long, default_value = "backend")]
worker_component: String,
}
fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
standalone_indexer::run_server(IndexerConfig {
block_size: cli.block_size,
port: cli.port,
threads: cli.threads,
workers: cli.workers,
model_name: cli.model_name,
tenant_id: cli.tenant_id,
peers: cli.peers,
})
.await
#[cfg(feature = "indexer-runtime")]
if cli.dynamo_runtime {
// Full Dynamo runtime mode: discovery, event plane, request plane
dynamo_runtime::logging::init();
let worker = dynamo_runtime::Worker::from_settings()?;
return worker.execute(move |runtime| app_with_runtime(runtime, cli));
}
// Standalone HTTP-only mode: no runtime dependencies
tracing_subscriber::fmt::init();
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(app_standalone(cli))
}
async fn app_standalone(cli: Cli) -> anyhow::Result<()> {
let cancel_token = CancellationToken::new();
// Install signal handler for graceful shutdown
let shutdown_token = cancel_token.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tracing::info!("Received shutdown signal");
shutdown_token.cancel();
});
tracing::info!(
block_size = ?cli.block_size,
port = cli.port,
threads = cli.threads,
model_name = %cli.model_name,
tenant_id = %cli.tenant_id,
num_peers = cli.peers.as_ref().map(|p| p.split(',').count()).unwrap_or(0),
"Starting standalone KV cache indexer (HTTP-only mode)"
);
let registry = Arc::new(WorkerRegistry::new(cli.threads));
run_common(&cli, &registry, cancel_token).await
}
#[cfg(feature = "indexer-runtime")]
async fn app_with_runtime(runtime: dynamo_runtime::Runtime, cli: Cli) -> anyhow::Result<()> {
use dynamo_kv_router::indexer::{
IndexerQueryRequest, IndexerQueryResponse, KV_INDEXER_QUERY_ENDPOINT,
};
use dynamo_runtime::{
DistributedRuntime,
pipeline::{ManyOut, SingleIn, network::Ingress},
};
let distributed_runtime = DistributedRuntime::from_settings(runtime).await?;
let cancel_token = distributed_runtime.primary_token();
let component = distributed_runtime
.namespace(&cli.namespace)?
.component(&cli.component_name)?;
tracing::info!(
namespace = %cli.namespace,
component = %cli.component_name,
block_size = ?cli.block_size,
port = cli.port,
threads = cli.threads,
model_name = %cli.model_name,
tenant_id = %cli.tenant_id,
worker_component = %cli.worker_component,
num_peers = cli.peers.as_ref().map(|p| p.split(',').count()).unwrap_or(0),
"Starting standalone KV cache indexer (Dynamo runtime mode)"
);
let registry = Arc::new(WorkerRegistry::new(cli.threads));
let engine = Arc::new(runtime::query_engine::IndexerQueryEngine {
registry: registry.clone(),
});
let ingress =
Ingress::<SingleIn<IndexerQueryRequest>, ManyOut<IndexerQueryResponse>>::for_engine(
engine,
)?;
let query_endpoint = component
.endpoint(KV_INDEXER_QUERY_ENDPOINT)
.endpoint_builder()
.handler(ingress)
.graceful_shutdown(true);
distributed_runtime.runtime().secondary().spawn(async move {
if let Err(e) = query_endpoint.start().await {
tracing::error!(error = %e, "Query endpoint failed");
}
});
tracing::info!(
endpoint = KV_INDEXER_QUERY_ENDPOINT,
"Query endpoint registered"
);
runtime::discovery::spawn_discovery_watcher(
&distributed_runtime,
registry.clone(),
cancel_token.clone(),
)
.await?;
runtime::subscriber::spawn_event_subscriber(
&distributed_runtime,
&cli.namespace,
&cli.worker_component,
registry.clone(),
cancel_token.clone(),
)
.await?;
run_common(&cli, &registry, cancel_token).await
}
/// Shared logic for both standalone and runtime modes:
/// register CLI workers, P2P recovery, signal ready, start HTTP server.
async fn run_common(
cli: &Cli,
registry: &Arc<WorkerRegistry>,
cancel_token: CancellationToken,
) -> anyhow::Result<()> {
if let Some(ref workers_str) = cli.workers {
let block_size = cli.block_size.ok_or_else(|| {
anyhow::anyhow!("--block-size is required when --workers is specified")
})?;
for (instance_id, dp_rank, endpoint) in standalone_indexer::parse_workers(workers_str) {
tracing::info!(instance_id, dp_rank, endpoint, "Registering initial worker");
registry
.register(
instance_id,
endpoint,
dp_rank,
cli.model_name.clone(),
cli.tenant_id.clone(),
block_size,
None,
)
.await?;
}
}
let peers: Vec<String> = cli
.peers
.as_deref()
.map(|s| {
s.split(',')
.filter(|p| !p.is_empty())
.map(|p| p.trim().to_string())
.collect()
})
.unwrap_or_default();
// P2P recovery: fetch dump from a peer before starting ZMQ listeners.
if !peers.is_empty() {
match recovery::recover_from_peers(&peers, registry).await {
Ok(true) => tracing::info!("P2P recovery completed"),
Ok(false) => tracing::warn!("no reachable peers, starting with empty state"),
Err(e) => tracing::warn!(error = %e, "P2P recovery failed, starting with empty state"),
}
for peer in &peers {
registry.register_peer(peer.clone());
}
}
// Signal ready — unblocks all ZMQ listeners to start draining buffered events
registry.signal_ready();
#[cfg(feature = "metrics")]
let prom_registry = {
let r = prometheus::Registry::new();
dynamo_kv_router::standalone_indexer::metrics::register(&r)
.expect("failed to register indexer metrics");
r
};
let state = Arc::new(AppState {
registry: registry.clone(),
#[cfg(feature = "metrics")]
prom_registry,
});
let app = create_router(state);
let listener = TcpListener::bind(("0.0.0.0", cli.port)).await?;
tracing::info!("HTTP server listening on 0.0.0.0:{}", cli.port);
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel_token.cancelled().await;
tracing::info!("Received shutdown signal, stopping HTTP server");
})
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use dynamo_kv_router::standalone_indexer::parse_workers;
#[test]
fn test_parse_workers() {
let input = "1=tcp://host:5557,2:1=tcp://host:5558";
let result = parse_workers(input);
assert_eq!(result.len(), 2);
assert_eq!(result[0], (1, 0, "tcp://host:5557".to_string()));
assert_eq!(result[1], (2, 1, "tcp://host:5558".to_string()));
}
#[test]
fn test_parse_workers_empty() {
assert!(parse_workers("").is_empty());
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use dynamo_runtime::stream::StreamExt;
use dynamo_runtime::{
DistributedRuntime,
discovery::{
DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId, DiscoveryQuery, DiscoveryStream,
},
};
use serde::Deserialize;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::standalone_indexer::registry::WorkerRegistry;
/// Minimal subset of ModelDeploymentCard — only the fields the indexer needs.
/// Using `#[serde(default)]` on optional fields lets us safely ignore the rest.
#[derive(Deserialize, Debug)]
struct PartialModelCard {
pub display_name: String,
#[serde(default)]
pub kv_cache_block_size: u32,
}
/// Spawn a background task that watches MDC discovery for worker additions/removals
/// and updates the WorkerRegistry accordingly.
pub async fn spawn_discovery_watcher(
drt: &DistributedRuntime,
registry: Arc<WorkerRegistry>,
cancel_token: CancellationToken,
) -> anyhow::Result<()> {
let discovery = drt.discovery();
let mut stream: DiscoveryStream = discovery
.list_and_watch(DiscoveryQuery::AllModels, Some(cancel_token.clone()))
.await?;
tokio::spawn(async move {
tracing::info!("Discovery watcher started");
while let Some(result) = stream.next().await {
let event = match result {
Ok(event) => event,
Err(err) => {
tracing::error!(%err, "Error in discovery stream");
continue;
}
};
match event {
DiscoveryEvent::Added(instance) => {
let (instance_id, namespace, card) = match &instance {
DiscoveryInstance::Model {
instance_id,
namespace,
..
} => match instance.deserialize_model::<PartialModelCard>() {
Ok(card) => (*instance_id, namespace.clone(), card),
Err(err) => {
tracing::error!(%err, instance_id, "Failed to deserialize model card");
continue;
}
},
_ => {
tracing::debug!("Ignoring non-model discovery instance");
continue;
}
};
let model_name = card.display_name.clone();
let block_size = card.kv_cache_block_size;
// Use the Dynamo namespace as the tenant_id
let tenant_id = namespace;
if block_size == 0 {
tracing::warn!(
instance_id,
model_name,
"Skipping worker with kv_cache_block_size=0"
);
continue;
}
tracing::info!(
instance_id,
model_name,
tenant_id,
block_size,
"Discovery: adding worker"
);
if let Err(e) = registry.add_worker_from_discovery(
instance_id,
model_name.clone(),
tenant_id,
block_size,
) {
tracing::error!(
instance_id,
model_name,
error = %e,
"Failed to add discovered worker"
);
}
}
DiscoveryEvent::Removed(id) => {
let instance_id = match &id {
DiscoveryInstanceId::Model(mcid) => mcid.instance_id,
_ => {
tracing::debug!("Ignoring non-model discovery removal");
continue;
}
};
tracing::info!(instance_id, "Discovery: removing worker");
registry.remove_worker_from_discovery(instance_id).await;
}
}
}
tracing::info!("Discovery watcher exiting");
});
Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub mod discovery;
pub mod query_engine;
pub mod subscriber;
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use anyhow::Result;
use dynamo_runtime::stream;
use dynamo_runtime::pipeline::{
AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait,
};
use dynamo_kv_router::indexer::{IndexerQueryRequest, IndexerQueryResponse};
use dynamo_kv_router::standalone_indexer::registry::{IndexerKey, WorkerRegistry};
/// AsyncEngine that serves indexer queries over the request plane.
///
/// When a frontend sends an `IndexerQueryRequest` (model_name, namespace, block hashes),
/// this engine finds the appropriate indexer in the registry and returns overlap scores.
pub struct IndexerQueryEngine {
pub registry: Arc<WorkerRegistry>,
}
#[async_trait]
impl AsyncEngine<SingleIn<IndexerQueryRequest>, ManyOut<IndexerQueryResponse>, anyhow::Error>
for IndexerQueryEngine
{
async fn generate(
&self,
request: SingleIn<IndexerQueryRequest>,
) -> Result<ManyOut<IndexerQueryResponse>> {
let (req, ctx) = request.into_parts();
let key = IndexerKey {
model_name: req.model_name.clone(),
tenant_id: req.namespace.clone(),
};
let response = match self.registry.get_indexer(&key) {
Some(ie) => match ie.indexer.find_matches(req.block_hashes).await {
Ok(scores) => IndexerQueryResponse::Scores(scores.into()),
Err(e) => IndexerQueryResponse::Error(e.to_string()),
},
None => IndexerQueryResponse::Error(format!(
"no indexer for model={} namespace={}",
req.model_name, req.namespace
)),
};
let resp_stream = stream::iter(vec![response]);
Ok(ResponseStream::new(Box::pin(resp_stream), ctx.context()))
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use dynamo_runtime::{
DistributedRuntime, discovery::EventTransportKind, transports::event_plane::EventSubscriber,
};
use dynamo_kv_router::protocols::{KV_EVENT_SUBJECT, RouterEvent};
use dynamo_kv_router::standalone_indexer::registry::WorkerRegistry;
/// Spawn a background task that subscribes to KV events from the worker component
/// via the event plane and applies them to the appropriate indexer in the registry.
pub async fn spawn_event_subscriber(
drt: &DistributedRuntime,
namespace: &str,
worker_component_name: &str,
registry: Arc<WorkerRegistry>,
cancel_token: CancellationToken,
) -> Result<()> {
let transport_kind = EventTransportKind::from_env_or_default();
// Create a Component reference for the worker component to subscribe to its events.
let worker_component = drt.namespace(namespace)?.component(worker_component_name)?;
let mut subscriber = EventSubscriber::for_component_with_transport(
&worker_component,
KV_EVENT_SUBJECT,
transport_kind,
)
.await?
.typed::<RouterEvent>();
let kv_event_subject = format!(
"namespace.{}.component.{}.{}",
namespace, worker_component_name, KV_EVENT_SUBJECT
);
match transport_kind {
EventTransportKind::Nats => {
tracing::info!(
subject = %kv_event_subject,
"KV Indexer subscribing to NATS Core events"
);
}
EventTransportKind::Zmq => {
tracing::info!(
subject = %kv_event_subject,
"KV Indexer subscribing to ZMQ event plane"
);
}
}
tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
tracing::debug!("Event subscriber received cancellation signal");
break;
}
Some(result) = subscriber.next() => {
let (_envelope, event) = match result {
Ok((envelope, event)) => (envelope, event),
Err(e) => {
tracing::warn!("Failed to receive RouterEvent from event plane: {e:?}");
continue;
}
};
let worker_id = event.worker_id;
// Apply the event to the indexer that tracks this worker.
// If the worker was discovered via MDC, it will be in the registry.
// If it was registered via --workers CLI, the indexer also exists.
if let Some(indexer) = registry.get_indexer_for_worker(worker_id) {
indexer.apply_event(event).await;
} else {
tracing::trace!(
worker_id,
"Received event for unknown worker (not yet discovered?)"
);
}
}
}
}
tracing::info!("Event subscriber exiting");
});
Ok(())
}
......@@ -89,6 +89,92 @@ impl MaybeError for WorkerKvQueryResponse {
}
}
// -------
// Standalone indexer query types (request plane)
// -------
/// Endpoint name for the standalone KV indexer query service.
pub const KV_INDEXER_QUERY_ENDPOINT: &str = "kv_indexer_query";
/// Request to query the standalone KV indexer for overlap scores.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IndexerQueryRequest {
/// Model name to query the indexer for.
pub model_name: String,
/// Dynamo namespace (used as tenant_id for indexer lookup).
pub namespace: String,
/// Block hashes to find matches for in the radix tree.
pub block_hashes: Vec<LocalBlockHash>,
}
/// Wire-friendly overlap scores for JSON serialization.
/// `OverlapScores` uses `FxHashMap<WorkerWithDpRank, _>` which can't be
/// serialized as JSON (struct keys aren't valid JSON map keys), so we flatten
/// to vecs of tuples for the wire protocol.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WireOverlapScores {
pub scores: Vec<(WorkerWithDpRank, u32)>,
pub frequencies: Vec<usize>,
pub tree_sizes: Vec<(WorkerWithDpRank, usize)>,
}
impl From<OverlapScores> for WireOverlapScores {
fn from(s: OverlapScores) -> Self {
Self {
scores: s.scores.into_iter().collect(),
frequencies: s.frequencies,
tree_sizes: s.tree_sizes.into_iter().collect(),
}
}
}
impl From<WireOverlapScores> for OverlapScores {
fn from(w: WireOverlapScores) -> Self {
Self {
scores: w.scores.into_iter().collect(),
frequencies: w.frequencies,
tree_sizes: w.tree_sizes.into_iter().collect(),
}
}
}
/// Response from the standalone KV indexer.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum IndexerQueryResponse {
/// Overlap scores per worker.
Scores(WireOverlapScores),
/// An error occurred processing the query.
Error(String),
}
#[cfg(feature = "metrics")]
impl MaybeError for IndexerQueryResponse {
fn from_err(err: impl std::error::Error + 'static) -> Self {
IndexerQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<DynamoError> {
match self {
IndexerQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
_ => None,
}
}
}
#[cfg(not(feature = "metrics"))]
impl MaybeError for IndexerQueryResponse {
fn from_err(err: impl std::error::Error + 'static) -> Self {
IndexerQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
match self {
IndexerQueryResponse::Error(msg) => Some(Box::new(std::io::Error::other(msg.clone()))),
_ => None,
}
}
}
/// A request to find matches in the Radix Tree.
pub struct MatchRequest {
/// A vector of `LocalBlockHash` representing the sequence to match.
......
......@@ -6,6 +6,9 @@ use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use xxhash_rust::xxh3;
/// The event subject that workers publish KV cache events on.
pub const KV_EVENT_SUBJECT: &str = "kv-events";
/// Seed for XXH3 hashing, consistent with indexer.rs
pub const XXH3_SEED: u64 = 1337;
......
......@@ -57,7 +57,7 @@ pub struct RouterConfigOverride {
}
/// KV Router configuration parameters
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[validate(schema(function = "validate_kv_router_config"))]
pub struct KvRouterConfig {
#[validate(range(min = 0.0))]
......@@ -130,6 +130,13 @@ pub struct KvRouterConfig {
/// "fcfs" (default): first-come first-served with priority bumps — optimizes tail TTFT.
/// "wspt": weighted shortest processing time (Smith's rule) — optimizes average TTFT.
pub router_queue_policy: RouterQueuePolicy,
/// Component name of a standalone KV indexer to use for overlap scoring.
/// When set, the router creates a `Remote` indexer that queries the standalone
/// indexer via the request plane instead of maintaining a local radix tree.
/// The standalone indexer handles its own event subscription and discovery.
#[serde(default)]
pub remote_indexer_component: Option<String>,
}
impl Default for KvRouterConfig {
......@@ -152,6 +159,7 @@ impl Default for KvRouterConfig {
router_event_threads: 4,
router_enable_cache_control: false,
router_queue_policy: RouterQueuePolicy::default(),
remote_indexer_component: None,
}
}
}
......
......@@ -106,7 +106,7 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
};
let state = Arc::new(AppState {
registry,
registry: Arc::new(registry),
#[cfg(feature = "metrics")]
prom_registry,
});
......
......@@ -50,6 +50,9 @@ pub struct WorkerRegistry {
watermarks: DashMap<(WorkerId, u32), Arc<AtomicU64>>,
/// Saved listener state for pause/resume. Populated on register, kept on pause.
listener_states: DashMap<(WorkerId, u32), ListenerState>,
/// Workers added via MDC discovery (no ZMQ listener). Maps worker_id → indexer key.
#[cfg(feature = "indexer-runtime")]
discovered_workers: DashMap<WorkerId, IndexerKey>,
num_threads: usize,
ready_tx: watch::Sender<bool>,
ready_rx: watch::Receiver<bool>,
......@@ -64,6 +67,8 @@ impl WorkerRegistry {
peers: DashMap::new(),
watermarks: DashMap::new(),
listener_states: DashMap::new(),
#[cfg(feature = "indexer-runtime")]
discovered_workers: DashMap::new(),
num_threads,
ready_tx,
ready_rx,
......@@ -101,6 +106,15 @@ impl WorkerRegistry {
block_size: u32,
replay_endpoint: Option<String>,
) -> Result<()> {
// Reject if this worker was already added via discovery
#[cfg(feature = "indexer-runtime")]
if self.discovered_workers.contains_key(&instance_id) {
bail!(
"instance {instance_id} is already registered via discovery; \
use the Dynamo runtime to manage it"
);
}
let key = IndexerKey {
model_name,
tenant_id,
......@@ -207,21 +221,24 @@ impl WorkerRegistry {
model_name: &str,
tenant_id: &str,
) -> Result<()> {
let (_, entry) = self
.workers
.remove(&instance_id)
.ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?;
super::metrics::dec_workers();
for cancel in entry.cancels.values() {
cancel.cancel();
}
let key = IndexerKey {
model_name: model_name.to_string(),
tenant_id: tenant_id.to_string(),
};
// Check ZMQ-registered workers first, then discovery workers (if runtime mode)
if let Some((_, entry)) = self.workers.remove(&instance_id) {
super::metrics::dec_workers();
for cancel in entry.cancels.values() {
cancel.cancel();
}
} else if self.remove_discovered_worker(instance_id) {
super::metrics::dec_workers();
tracing::info!(instance_id, "Deregistering discovered worker via HTTP");
} else {
bail!("instance {instance_id} not found");
}
if let Some(ie) = self.indexers.get(&key) {
ie.indexer.remove_worker(instance_id).await;
} else {
......@@ -286,15 +303,20 @@ impl WorkerRegistry {
instance_id: WorkerId,
model_name: &str,
) -> Result<()> {
let (_, entry) = self
.workers
.remove(&instance_id)
.ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?;
super::metrics::dec_workers();
for cancel in entry.cancels.values() {
cancel.cancel();
// Check ZMQ-registered workers first, then discovery workers (if runtime mode)
if let Some((_, entry)) = self.workers.remove(&instance_id) {
super::metrics::dec_workers();
for cancel in entry.cancels.values() {
cancel.cancel();
}
} else if self.remove_discovered_worker(instance_id) {
super::metrics::dec_workers();
tracing::info!(
instance_id,
"Deregistering discovered worker (all tenants) via HTTP"
);
} else {
bail!("instance {instance_id} not found");
}
let mut found = false;
......@@ -381,10 +403,25 @@ impl WorkerRegistry {
}
pub fn list(&self) -> Vec<(WorkerId, HashMap<u32, String>)> {
self.workers
#[allow(unused_mut)]
let mut result: Vec<(WorkerId, HashMap<u32, String>)> = self
.workers
.iter()
.map(|entry| (*entry.key(), entry.value().endpoints.clone()))
.collect()
.collect();
// Include discovered workers (no ZMQ endpoints)
#[cfg(feature = "indexer-runtime")]
for entry in self.discovered_workers.iter() {
let worker_id = *entry.key();
// Skip if already in the workers map (shouldn't happen, but be safe)
if self.workers.contains_key(&worker_id) {
continue;
}
result.push((worker_id, HashMap::new()));
}
result
}
pub fn get_indexer(&self, key: &IndexerKey) -> Option<Ref<'_, IndexerKey, IndexerEntry>> {
......@@ -428,4 +465,105 @@ impl WorkerRegistry {
})
.collect()
}
/// Helper: try to remove a worker from the discovered_workers map.
/// Returns false when the feature is disabled (no discovered workers exist).
fn remove_discovered_worker(&self, _instance_id: WorkerId) -> bool {
#[cfg(feature = "indexer-runtime")]
{
self.discovered_workers.remove(&_instance_id).is_some()
}
#[cfg(not(feature = "indexer-runtime"))]
{
false
}
}
// ---------------------------------------------------------------
// Discovery-based worker management (no ZMQ listener)
// ---------------------------------------------------------------
/// Register a worker discovered via MDC. Creates the indexer if needed but
/// does NOT start a ZMQ listener — events arrive via the event plane.
#[cfg(feature = "indexer-runtime")]
pub fn add_worker_from_discovery(
&self,
instance_id: WorkerId,
model_name: String,
tenant_id: String,
block_size: u32,
) -> Result<()> {
// Reject if this worker is already registered via ZMQ (--workers or /register)
if self.workers.contains_key(&instance_id) {
bail!(
"instance {instance_id} is already registered via ZMQ; \
cannot add via discovery"
);
}
let key = IndexerKey {
model_name,
tenant_id,
};
let indexer_entry = self.indexers.entry(key.clone()).or_insert_with(|| {
tracing::info!(
model_name = %key.model_name,
tenant_id = %key.tenant_id,
block_size,
"Creating new indexer (discovery)"
);
IndexerEntry {
indexer: create_indexer(block_size, self.num_threads),
block_size,
}
});
if indexer_entry.block_size != block_size {
bail!(
"block_size mismatch for model={} tenant={}: existing={}, requested={}",
key.model_name,
key.tenant_id,
indexer_entry.block_size,
block_size
);
}
drop(indexer_entry);
self.discovered_workers.insert(instance_id, key);
Ok(())
}
/// Remove a worker that was discovered via MDC.
#[cfg(feature = "indexer-runtime")]
pub async fn remove_worker_from_discovery(&self, instance_id: WorkerId) {
if let Some((_, key)) = self.discovered_workers.remove(&instance_id) {
if let Some(ie) = self.indexers.get(&key) {
ie.indexer.remove_worker(instance_id).await;
}
} else {
tracing::debug!(
instance_id,
"remove_worker_from_discovery: worker not in discovered_workers map"
);
}
}
/// Look up the indexer responsible for a given worker_id.
/// Checks both discovery-registered and CLI-registered workers.
#[cfg(feature = "indexer-runtime")]
pub fn get_indexer_for_worker(&self, worker_id: WorkerId) -> Option<Indexer> {
// Check discovery workers first (more common in runtime mode)
if let Some(key) = self.discovered_workers.get(&worker_id)
&& let Some(ie) = self.indexers.get(key.value())
{
return Some(ie.indexer.clone());
}
// Fall back for legacy --workers mode: only if this worker is actually
// in the ZMQ-registered workers map, route to the first indexer.
if self.workers.contains_key(&worker_id) {
return self.indexers.iter().next().map(|ie| ie.indexer.clone());
}
None
}
}
......@@ -18,7 +18,7 @@ use crate::protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq};
use super::registry::{IndexerKey, WorkerRegistry};
pub struct AppState {
pub registry: WorkerRegistry,
pub registry: Arc<WorkerRegistry>,
#[cfg(feature = "metrics")]
pub prom_registry: prometheus::Registry,
}
......
......@@ -563,6 +563,7 @@ impl ModelManager {
kv_cache_block_size: u32,
kv_router_config: Option<KvRouterConfig>,
worker_type: &'static str,
model_name: Option<String>,
) -> anyhow::Result<Arc<KvRouter>> {
let client = endpoint.client().await?;
......@@ -588,7 +589,10 @@ impl ModelManager {
// Get of create runtime config watcher for this endpoint
let workers_with_configs = self.get_or_create_runtime_config_watcher(endpoint).await?;
let selector = Box::new(DefaultWorkerSelector::new(kv_router_config, worker_type));
let selector = Box::new(DefaultWorkerSelector::new(
kv_router_config.clone(),
worker_type,
));
let chooser = KvRouter::new(
endpoint.clone(),
client,
......@@ -597,6 +601,7 @@ impl ModelManager {
Some(selector),
kv_router_config,
worker_type,
model_name,
)
.await?;
Ok(Arc::new(chooser))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment