Unverified Commit b6596c52 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore(kv-router): remove native kv-indexer binary, use maturin-built one (#7338)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 9e9ca3e2
......@@ -1888,17 +1888,14 @@ dependencies = [
"async-trait",
"axum 0.8.4",
"bytes",
"clap 4.6.0",
"dashmap 6.1.0",
"derive-getters",
"derive_builder",
"dynamo-runtime",
"dynamo-tokens",
"flume",
"indicatif 0.18.4",
"ordered-float 4.6.0",
"parking_lot",
"plotters",
"prometheus",
"rand 0.9.2",
"reqwest 0.12.28",
......@@ -1912,7 +1909,6 @@ dependencies = [
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
"validator",
"xxhash-rust",
......
......@@ -43,7 +43,7 @@ dynamo-config = { path = "lib/config", version = "1.0.0" }
dynamo-tokens = { path = "lib/tokens", version = "1.0.0" }
dynamo-memory = { path = "lib/memory", version = "1.0.0" }
dynamo-mocker = { path = "lib/mocker", version = "1.0.0" }
dynamo-kv-router = { path = "lib/kv-router", version = "1.0.0", features = ["metrics"] }
dynamo-kv-router = { path = "lib/kv-router", version = "1.0.0", features = ["metrics", "runtime-protocols"] }
dynamo-async-openai = { path = "lib/async-openai", version = "1.0.0", features = ["byot"] }
dynamo-parsers = { path = "lib/parsers", version = "1.0.0" }
......
......@@ -439,9 +439,9 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \
uv build --wheel --out-dir /opt/dynamo/dist && \
cd /opt/dynamo/lib/bindings/python && \
if [ "$ENABLE_MEDIA_FFMPEG" = "true" ]; then \
maturin build --release --features "media-ffmpeg,kv-indexer" --out /opt/dynamo/dist; \
maturin build --release --features "media-ffmpeg" --out /opt/dynamo/dist; \
else \
maturin build --release --features "kv-indexer" --out /opt/dynamo/dist; \
maturin build --release --out /opt/dynamo/dist; \
fi && \
/tmp/use-sccache.sh show-stats "Dynamo Runtime"
......
......@@ -7,12 +7,12 @@ subtitle: Run the KV cache indexer as an independent HTTP service for querying b
## Overview
The standalone KV indexer (`dynamo-kv-indexer`) is a lightweight binary that maintains a radix tree of cached blocks and exposes HTTP endpoints for querying and managing workers. It supports two operational modes:
The standalone KV indexer (`python -m dynamo.indexer`) is a lightweight service that maintains a radix tree of cached blocks and exposes HTTP endpoints for querying and managing workers. It supports two operational modes:
- **Standalone mode** (default): Subscribes to ZMQ KV event streams directly from workers. No Dynamo runtime dependencies required.
- **Dynamo runtime mode** (`--dynamo-runtime`): Integrates with the Dynamo runtime for automatic worker discovery via MDC, KV event ingestion via the event plane (NATS or ZMQ), and serves indexer queries over the request plane for remote frontends.
- **Standalone mode** (default): subscribes to ZMQ KV event streams directly from workers. No Dynamo runtime dependencies required.
- **Dynamo runtime mode** (`--dynamo-runtime`): integrates with the Dynamo runtime for automatic worker discovery via MDC, KV event ingestion via the event plane (NATS or ZMQ), and overlap queries over the request plane for remote frontends.
This is distinct from the [Standalone Router](https://github.com/ai-dynamo/dynamo/blob/main/components/src/dynamo/router/README.md), which is a full routing service. The standalone indexer provides only the indexing and query layer without routing logic.
This is distinct from the [Standalone Router](../../../components/src/dynamo/router/README.md), which is a full routing service. The standalone indexer provides only the indexing and query layer without routing logic.
The HTTP API follows the [Mooncake KV Indexer RFC](https://github.com/kvcache-ai/Mooncake/issues/1403) conventions.
......@@ -56,11 +56,11 @@ If no peers are reachable, the indexer starts with an empty state.
```bash
# Replica A (first instance, no peers)
dynamo-kv-indexer --port 8090 --block-size 16 \
python -m dynamo.indexer --port 8090 --block-size 16 \
--workers "1=tcp://worker1:5557,2=tcp://worker2:5558"
# Replica B (recovers from A on startup)
dynamo-kv-indexer --port 8091 --block-size 16 \
python -m dynamo.indexer --port 8091 --block-size 16 \
--workers "1=tcp://worker1:5557,2=tcp://worker2:5558" \
--peers "http://localhost:8090"
```
......@@ -81,52 +81,50 @@ Peers can be registered at startup via `--peers` or dynamically via the HTTP API
## Building
The binary is a feature-gated target in the `dynamo-kv-router` crate. The available cargo features control which capabilities are compiled in:
The service is exposed through the Python package after building the bindings with maturin. Feature flags control which capabilities are compiled in:
| Feature | Description |
|---------|-------------|
| `standalone-indexer` | Core standalone indexer library (HTTP server, ZMQ listeners, P2P recovery) |
| `metrics` | Prometheus metrics (`/metrics` endpoint, request/worker gauges) |
| `indexer-bin` | CLI binary target |
| `indexer-runtime` | Dynamo runtime integration (discovery, event plane, request plane) |
| `test-endpoints` | Test-only endpoints (`/test/pause_listener`, `/test/resume_listener`) |
| `kv-indexer` | Core standalone indexer binary (HTTP API, ZMQ listeners, P2P recovery) |
| `kv-indexer-metrics` | Optional `/metrics` endpoint |
| `kv-indexer-runtime` | Dynamo runtime integration (`--dynamo-runtime`, discovery, event plane, request plane) |
### Standalone build (no runtime dependency)
### Standalone build
```bash
cargo build -p dynamo-kv-router --features indexer-bin --bin dynamo-kv-indexer
cd lib/bindings/python && VIRTUAL_ENV=../../.venv ../../.venv/bin/maturin develop --uv --features kv-indexer
```
This produces a binary with no `dynamo-runtime` dependency. It supports ZMQ event listeners, HTTP API, and P2P recovery.
After installation, launch the service with `python -m dynamo.indexer`.
### Standalone build with metrics
```bash
cargo build -p dynamo-kv-router --features indexer-bin,metrics --bin dynamo-kv-indexer
cd lib/bindings/python && VIRTUAL_ENV=../../.venv ../../.venv/bin/maturin develop --uv --features kv-indexer,kv-indexer-metrics
```
Adds Prometheus metrics support (`/metrics` endpoint). Pulls in `dynamo-runtime` for the metrics implementation.
This keeps the default `kv-indexer` build lean while still allowing Prometheus metrics when needed.
### Runtime-enabled build
```bash
cargo build -p dynamo-kv-router --features indexer-bin,indexer-runtime --bin dynamo-kv-indexer
cd lib/bindings/python && VIRTUAL_ENV=../../.venv ../../.venv/bin/maturin develop --uv --features kv-indexer,kv-indexer-runtime
```
Enables the `--dynamo-runtime` CLI flag for MDC discovery, event plane subscription, and request plane query endpoint. Includes metrics.
This enables the `--dynamo-runtime` CLI flag for MDC discovery, event-plane subscription, and request-plane queries. It also includes the metrics endpoint.
## CLI
### Standalone mode (default)
```bash
dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-model --tenant-id default --workers "1=tcp://host:5557,2:1=tcp://host:5558"] [--peers "http://peer1:8090,http://peer2:8091"]
python -m dynamo.indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-model --tenant-id default --workers "1=tcp://host:5557,2:1=tcp://host:5558"] [--peers "http://peer1:8090,http://peer2:8091"]
```
### Dynamo runtime mode (requires `indexer-runtime` feature)
### Dynamo runtime mode
```bash
dynamo-kv-indexer --dynamo-runtime --namespace default --component-name kv-indexer --worker-component backend --port 8090 [--threads 4]
python -m dynamo.indexer --dynamo-runtime --namespace default --component-name kv-indexer --worker-component backend --port 8090 [--threads 4]
```
In runtime mode, workers are discovered automatically via MDC. The `--workers` flag can still be used to register additional static workers alongside discovered ones.
......@@ -140,10 +138,10 @@ In runtime mode, workers are discovered automatically via MDC. The `--workers` f
| `--model-name` | `default` | Model name for initial `--workers` |
| `--tenant-id` | `default` | Tenant ID for initial `--workers` |
| `--peers` | (none) | Comma-separated peer indexer URLs for P2P recovery on startup |
| `--dynamo-runtime` | `false` | Enable Dynamo runtime integration (requires `indexer-runtime` feature) |
| `--namespace` | `default` | Dynamo namespace to register the indexer component under (runtime mode) |
| `--component-name` | `kv-indexer` | Component name for this indexer in the Dynamo runtime (runtime mode) |
| `--worker-component` | `backend` | Component name that workers register under, for event plane subscription (runtime mode) |
| `--dynamo-runtime` | `false` | Enable Dynamo runtime integration (requires `kv-indexer-runtime`) |
| `--namespace` | `default` | Dynamo namespace to register the indexer component under |
| `--component-name` | `kv-indexer` | Component name for this indexer in the Dynamo runtime |
| `--worker-component` | `backend` | Component name that workers register under for event-plane subscription |
## HTTP API
......@@ -157,7 +155,7 @@ curl http://localhost:8090/health
### `GET /metrics` — Prometheus metrics
Returns metrics in Prometheus text exposition format. Available when the binary is built with the `metrics` or `indexer-runtime` feature.
Returns metrics in Prometheus text exposition format. Available when the binary is built with the `kv-indexer-metrics` or `kv-indexer-runtime` feature.
```bash
curl http://localhost:8090/metrics
......@@ -170,10 +168,12 @@ curl http://localhost:8090/metrics
| `dynamo_kvindexer_errors_total` | Counter | `endpoint`, `status_class` | HTTP error responses (4xx/5xx) |
| `dynamo_kvindexer_models` | Gauge | — | Number of active model+tenant indexers |
| `dynamo_kvindexer_workers` | Gauge | — | Number of registered worker instances |
| `dynamo_kvindexer_listeners` | Gauge | `status` | Number of ZMQ listeners by status (`pending`, `active`, `paused`, `failed`) |
### `POST /register` — Register an endpoint
Register a ZMQ endpoint for an instance. Each call creates or reuses the indexer for the given `(model_name, tenant_id)` pair.
Registration is non-blocking: if the worker is not up yet, the listener is accepted in `pending` state and transitions to `active` once the initial ZMQ connection succeeds.
```bash
# Single model, default tenant
......@@ -245,9 +245,38 @@ curl http://localhost:8090/workers
Returns:
```json
[{"instance_id": 1, "endpoints": {"0": "tcp://127.0.0.1:5557", "1": "tcp://127.0.0.1:5558"}}]
[
{
"instance_id": 1,
"source": "zmq",
"status": "active",
"endpoints": {
"0": "tcp://127.0.0.1:5557",
"1": "tcp://127.0.0.1:5558"
},
"listeners": {
"0": {
"endpoint": "tcp://127.0.0.1:5557",
"status": "active"
},
"1": {
"endpoint": "tcp://127.0.0.1:5558",
"status": "active"
}
}
},
{
"instance_id": 2,
"source": "discovery",
"status": "active",
"endpoints": {},
"listeners": {}
}
]
```
For ZMQ-managed workers, `status` is aggregated across listeners with priority `failed > pending > active > paused`. Each listener entry may also expose a `last_error` field when the most recent startup or recv-loop attempt failed.
### `POST /query` — Query overlap for token IDs
Given raw token IDs, compute block hashes and return per-instance overlap scores (in matched tokens):
......@@ -367,7 +396,7 @@ When started with `--dynamo-runtime`, the indexer integrates with the Dynamo dis
### Worker Discovery
The indexer watches MDC (Model Discovery Catalog) for worker additions and removals. When a worker registers with MDC, the indexer automatically creates an indexer for its model and block size. Workers discovered via MDC are tracked separately from those registered via `--workers` or the `/register` HTTP API a worker cannot be registered through both paths simultaneously.
The indexer watches MDC (Model Discovery Catalog) for worker additions and removals. When a worker registers with MDC, the indexer automatically creates an indexer for its model and block size. Workers discovered via MDC are tracked separately from those registered via `--workers` or the `/register` HTTP API; a worker cannot be registered through both paths simultaneously.
### Event Plane Subscription
......@@ -381,7 +410,7 @@ The indexer registers a query endpoint on the Dynamo request plane, allowing fro
```bash
# Start the indexer with runtime integration
dynamo-kv-indexer --dynamo-runtime \
python -m dynamo.indexer --dynamo-runtime \
--namespace my-namespace \
--component-name kv-indexer \
--worker-component backend \
......@@ -392,7 +421,7 @@ The HTTP API remains fully available in runtime mode. Static workers can be adde
## Limitations
- **Standalone mode is ZMQ only**: In standalone mode, workers must publish KV events via ZMQ PUB sockets. Build with `indexer-runtime` and use `--dynamo-runtime` to receive events via the event plane (NATS or ZMQ).
- **Standalone mode is ZMQ only**: In standalone mode, workers must publish KV events via ZMQ PUB sockets. Build with `kv-indexer-runtime` and use `--dynamo-runtime` to receive events via the event plane (NATS or ZMQ).
- **No routing logic**: The indexer only maintains the radix tree and answers queries. It does not track active blocks, manage request lifecycle, or perform worker selection.
## Architecture
......@@ -410,7 +439,7 @@ graph TD
REG[Worker Registry]
ZMQ[ZMQ SUB Listeners]
IDX["Indexer Map<br/>(model, tenant) → Radix Tree"]
HTTP[HTTP API<br/>/query /dump /register /metrics /health]
HTTP[HTTP API<br/>/query /dump /register /health]
end
CLIENT[External Client]
......@@ -453,7 +482,7 @@ graph TD
REG[Worker Registry]
IDX["Indexer Map<br/>(model, tenant) → Radix Tree"]
QE[Query Endpoint]
HTTP[HTTP API<br/>/query /dump /register]
HTTP[HTTP API<br/>/query /dump /register /metrics]
end
FRONTEND[Frontend / Router]
......@@ -511,4 +540,4 @@ sequenceDiagram
- **[Mooncake KV Indexer RFC](https://github.com/kvcache-ai/Mooncake/issues/1403)**: Community API standardization for KV cache indexers
- **[Router Guide](router-guide.md)**: Full KV router configuration and tuning
- **[Router Design](../../design-docs/router-design.md)**: Architecture and event transport modes
- **[Standalone Router](https://github.com/ai-dynamo/dynamo/blob/main/components/src/dynamo/router/README.md)**: Full routing service (routes requests to workers)
- **[Standalone Router](../../../components/src/dynamo/router/README.md)**: Full routing service (routes requests to workers)
......@@ -1527,8 +1527,6 @@ version = "1.0.0"
dependencies = [
"anyhow",
"async-trait",
"axum",
"bytes",
"dashmap 6.1.0",
"derive-getters",
"derive_builder",
......@@ -1539,11 +1537,9 @@ dependencies = [
"parking_lot",
"prometheus",
"rand 0.9.2",
"reqwest",
"rmp-serde",
"rustc-hash 2.1.1",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tokio-util",
......@@ -1551,7 +1547,6 @@ dependencies = [
"uuid",
"validator",
"xxhash-rust",
"zeromq",
]
[[package]]
......@@ -1706,8 +1701,6 @@ version = "1.0.0"
dependencies = [
"anyhow",
"async-trait",
"clap",
"dynamo-kv-router",
"dynamo-llm",
"dynamo-mocker",
"dynamo-parsers",
......@@ -1725,7 +1718,6 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber",
]
[[package]]
......
......@@ -24,7 +24,6 @@ crate-type = ["cdylib", "rlib"]
[features]
default = []
media-ffmpeg = ["dynamo-llm/media-ffmpeg"]
kv-indexer = ["dep:dynamo-kv-router", "dep:clap", "dep:tracing-subscriber"]
[dependencies]
dynamo-runtime = { path = "../../runtime" }
......@@ -44,11 +43,6 @@ tokio-stream = { version = "0" }
tokio-util = { version = "0.7", features = ["rt"] }
tracing = { version = "0" }
# kv-indexer (optional)
dynamo-kv-router = { path = "../../kv-router", features = ["standalone-indexer"], optional = true }
clap = { version = "4.5", features = ["derive"], optional = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true }
# "extension-module" tells pyo3 we want to build an extension module (skips linking against libpython.so)
# "abi3-py310" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.10, which is the minimum version in pyproject.toml
pyo3 = { version = "0.23.4", default-features = false, features = [
......@@ -76,9 +70,4 @@ dynamo-llm = { path = "../../llm" }
[target.'cfg(not(target_os = "linux"))'.dependencies]
dynamo-llm = { path = "../../llm", default-features = false }
[[bin]]
name = "dynamo-kv-indexer"
path = "rust/bin/kv_indexer.rs"
required-features = ["kv-indexer"]
[dev-dependencies]
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use clap::Parser;
use dynamo_kv_router::standalone_indexer::{self, IndexerConfig};
#[derive(Parser)]
#[command(name = "dynamo-kv-indexer", about = "Standalone KV cache indexer")]
struct Cli {
/// KV cache block size for initial workers registered via --workers
#[arg(long)]
block_size: Option<u32>,
/// HTTP server port
#[arg(long, default_value_t = 8090)]
port: u16,
/// Number of indexer threads (1 = single-threaded KvIndexer, >1 = ThreadPoolIndexer)
#[arg(long, default_value_t = 4)]
threads: usize,
/// Initial workers as "worker_id[:dp_rank]=zmq_address,..." (e.g. "1=tcp://host:5557,1:1=tcp://host:5558")
#[arg(long)]
workers: Option<String>,
/// Model name for initial workers registered via --workers
#[arg(long, default_value = "default")]
model_name: String,
/// Tenant ID for initial workers registered via --workers
#[arg(long, default_value = "default")]
tenant_id: String,
/// Comma-separated peer URLs for P2P recovery (e.g. "http://host1:8090,http://host2:8091")
#[arg(long)]
peers: Option<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
standalone_indexer::run_server(IndexerConfig {
block_size: cli.block_size,
port: cli.port,
threads: cli.threads,
workers: cli.workers,
model_name: cli.model_name,
tenant_id: cli.tenant_id,
peers: cli.peers,
})
.await
}
......@@ -77,7 +77,6 @@ type JsonServerStreamingIngress =
static INIT: OnceCell<()> = OnceCell::new();
const DEFAULT_ANNOTATED_SETTING: Option<bool> = Some(true);
// Helper to get appropriate span for instrumentation - always emit spans
fn get_span_for_context(context: &context::Context, operation: &str) -> tracing::Span {
logging::make_client_request_span(
......
......@@ -12,12 +12,11 @@ repository.workspace = true
[features]
default = []
metrics = ["dep:dynamo-runtime"]
bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"]
metrics = ["dep:prometheus"]
runtime-protocols = ["dep:dynamo-runtime"]
bench = []
standalone-indexer = ["dep:axum", "dep:bytes", "dep:zeromq", "dep:serde_json", "dep:reqwest"]
indexer-bin = ["standalone-indexer", "dep:clap", "dep:tracing-subscriber"]
test-endpoints = ["indexer-bin"]
indexer-runtime = ["metrics", "standalone-indexer"]
indexer-runtime = ["metrics", "runtime-protocols", "standalone-indexer"]
[dependencies]
# repo
......@@ -31,7 +30,7 @@ dashmap = { workspace = true }
ordered-float = { workspace = true }
derive_builder = { workspace = true }
derive-getters = { workspace = true }
prometheus = { workspace = true }
prometheus = { workspace = true, optional = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true, optional = true }
......@@ -48,31 +47,17 @@ flume = "0.12.0"
parking_lot = { workspace = true }
rmp-serde = { workspace = true }
# bench (optional)
clap = { version = "4.5", features = ["derive"], optional = true }
indicatif = { version = "0.18.0", optional = true }
plotters = { version = "0.3", optional = true, default-features = false, features = ["svg_backend", "line_series", "point_series", "full_palette"] }
rustc-hash = "2.1.1"
# indexer-bin (optional)
# standalone-indexer (optional)
axum = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
zeromq = { version = "0.4.1", optional = true }
[package.metadata.cargo-machete]
ignored = ["indicatif", "plotters"]
[dev-dependencies]
rstest = "0.18.2"
rstest_reuse = "0.7.0"
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros", "time", "test-util"] }
dynamo-tokens = { workspace = true }
[[bin]]
name = "dynamo-kv-indexer"
path = "src/bin/kv_indexer/main.rs"
required-features = ["indexer-bin"]
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use clap::Parser;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::standalone_indexer::{
self, recovery,
registry::WorkerRegistry,
server::{AppState, create_router},
};
#[cfg(feature = "indexer-runtime")]
mod runtime;
#[derive(Parser)]
#[command(name = "dynamo-kv-indexer", about = "Standalone KV cache indexer")]
struct Cli {
/// KV cache block size for initial workers registered via --workers
#[arg(long)]
block_size: Option<u32>,
/// HTTP server port
#[arg(long, default_value_t = 8090)]
port: u16,
/// Number of indexer threads (1 = single-threaded KvIndexer, >1 = ThreadPoolIndexer)
#[arg(long, default_value_t = 4)]
threads: usize,
/// Initial workers as "worker_id[:dp_rank]=zmq_address,..." (e.g. "1=tcp://host:5557,1:1=tcp://host:5558")
#[arg(long)]
workers: Option<String>,
/// Model name for initial workers registered via --workers
#[arg(long, default_value = "default")]
model_name: String,
/// Tenant ID for initial workers registered via --workers
#[arg(long, default_value = "default")]
tenant_id: String,
/// Comma-separated peer URLs for P2P recovery (e.g. "http://host1:8090,http://host2:8091")
#[arg(long)]
peers: Option<String>,
/// Enable Dynamo runtime integration (discovery, event plane, request plane).
/// When enabled, workers are discovered via MDC and events arrive via the event plane.
/// Also enables router to configure a remote indexer via the request plane.
#[cfg(feature = "indexer-runtime")]
#[arg(long)]
dynamo_runtime: bool,
/// Dynamo namespace to register the indexer component under
#[cfg(feature = "indexer-runtime")]
#[arg(long, default_value = "default")]
namespace: String,
/// Component name for this indexer in the Dynamo runtime
#[cfg(feature = "indexer-runtime")]
#[arg(long, default_value = "kv-indexer")]
component_name: String,
/// Component name that workers register under (for event plane subscription)
#[cfg(feature = "indexer-runtime")]
#[arg(long, default_value = "backend")]
worker_component: String,
}
fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
#[cfg(feature = "indexer-runtime")]
if cli.dynamo_runtime {
// Full Dynamo runtime mode: discovery, event plane, request plane
dynamo_runtime::logging::init();
let worker = dynamo_runtime::Worker::from_settings()?;
return worker.execute(move |runtime| app_with_runtime(runtime, cli));
}
// Standalone HTTP-only mode: no runtime dependencies
tracing_subscriber::fmt::init();
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(app_standalone(cli))
}
async fn app_standalone(cli: Cli) -> anyhow::Result<()> {
let cancel_token = CancellationToken::new();
// Install signal handler for graceful shutdown
let shutdown_token = cancel_token.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tracing::info!("Received shutdown signal");
shutdown_token.cancel();
});
tracing::info!(
block_size = ?cli.block_size,
port = cli.port,
threads = cli.threads,
model_name = %cli.model_name,
tenant_id = %cli.tenant_id,
num_peers = cli.peers.as_ref().map(|p| p.split(',').count()).unwrap_or(0),
"Starting standalone KV cache indexer (HTTP-only mode)"
);
let registry = Arc::new(WorkerRegistry::new(cli.threads));
run_common(&cli, &registry, cancel_token).await
}
#[cfg(feature = "indexer-runtime")]
async fn app_with_runtime(runtime: dynamo_runtime::Runtime, cli: Cli) -> anyhow::Result<()> {
use dynamo_kv_router::indexer::{
IndexerQueryRequest, IndexerQueryResponse, KV_INDEXER_QUERY_ENDPOINT,
};
use dynamo_runtime::{
DistributedRuntime,
pipeline::{ManyOut, SingleIn, network::Ingress},
};
let distributed_runtime = DistributedRuntime::from_settings(runtime).await?;
let cancel_token = distributed_runtime.primary_token();
let component = distributed_runtime
.namespace(&cli.namespace)?
.component(&cli.component_name)?;
tracing::info!(
namespace = %cli.namespace,
component = %cli.component_name,
block_size = ?cli.block_size,
port = cli.port,
threads = cli.threads,
model_name = %cli.model_name,
tenant_id = %cli.tenant_id,
worker_component = %cli.worker_component,
num_peers = cli.peers.as_ref().map(|p| p.split(',').count()).unwrap_or(0),
"Starting standalone KV cache indexer (Dynamo runtime mode)"
);
let registry = Arc::new(WorkerRegistry::new(cli.threads));
let engine = Arc::new(runtime::query_engine::IndexerQueryEngine {
registry: registry.clone(),
});
let ingress =
Ingress::<SingleIn<IndexerQueryRequest>, ManyOut<IndexerQueryResponse>>::for_engine(
engine,
)?;
let query_endpoint = component
.endpoint(KV_INDEXER_QUERY_ENDPOINT)
.endpoint_builder()
.handler(ingress)
.graceful_shutdown(true);
distributed_runtime.runtime().secondary().spawn(async move {
if let Err(e) = query_endpoint.start().await {
tracing::error!(error = %e, "Query endpoint failed");
}
});
tracing::info!(
endpoint = KV_INDEXER_QUERY_ENDPOINT,
"Query endpoint registered"
);
runtime::discovery::spawn_discovery_watcher(
&distributed_runtime,
registry.clone(),
cancel_token.clone(),
)
.await?;
runtime::subscriber::spawn_event_subscriber(
&distributed_runtime,
&cli.namespace,
&cli.worker_component,
registry.clone(),
cancel_token.clone(),
)
.await?;
run_common(&cli, &registry, cancel_token).await
}
/// Shared logic for both standalone and runtime modes:
/// register CLI workers, P2P recovery, signal ready, start HTTP server.
async fn run_common(
cli: &Cli,
registry: &Arc<WorkerRegistry>,
cancel_token: CancellationToken,
) -> anyhow::Result<()> {
if let Some(ref workers_str) = cli.workers {
let block_size = cli.block_size.ok_or_else(|| {
anyhow::anyhow!("--block-size is required when --workers is specified")
})?;
for (instance_id, dp_rank, endpoint) in standalone_indexer::parse_workers(workers_str) {
tracing::info!(instance_id, dp_rank, endpoint, "Registering initial worker");
registry
.register(
instance_id,
endpoint,
dp_rank,
cli.model_name.clone(),
cli.tenant_id.clone(),
block_size,
None,
)
.await?;
}
}
let peers: Vec<String> = cli
.peers
.as_deref()
.map(|s| {
s.split(',')
.filter(|p| !p.is_empty())
.map(|p| p.trim().to_string())
.collect()
})
.unwrap_or_default();
// P2P recovery: fetch dump from a peer before starting ZMQ listeners.
if !peers.is_empty() {
match recovery::recover_from_peers(&peers, registry).await {
Ok(true) => tracing::info!("P2P recovery completed"),
Ok(false) => tracing::warn!("no reachable peers, starting with empty state"),
Err(e) => tracing::warn!(error = %e, "P2P recovery failed, starting with empty state"),
}
for peer in &peers {
registry.register_peer(peer.clone());
}
}
// Signal ready — unblocks all ZMQ listeners to start draining buffered events
registry.signal_ready();
#[cfg(feature = "metrics")]
let prom_registry = {
let r = prometheus::Registry::new();
dynamo_kv_router::standalone_indexer::metrics::register(&r)
.expect("failed to register indexer metrics");
r
};
let state = Arc::new(AppState {
registry: registry.clone(),
#[cfg(feature = "metrics")]
prom_registry,
});
let app = create_router(state);
let listener = TcpListener::bind(("0.0.0.0", cli.port)).await?;
tracing::info!("HTTP server listening on 0.0.0.0:{}", cli.port);
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel_token.cancelled().await;
tracing::info!("Received shutdown signal, stopping HTTP server");
})
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use dynamo_kv_router::standalone_indexer::parse_workers;
#[test]
fn test_parse_workers() {
let input = "1=tcp://host:5557,2:1=tcp://host:5558";
let result = parse_workers(input);
assert_eq!(result.len(), 2);
assert_eq!(result[0], (1, 0, "tcp://host:5557".to_string()));
assert_eq!(result[1], (2, 1, "tcp://host:5558".to_string()));
}
#[test]
fn test_parse_workers_empty() {
assert!(parse_workers("").is_empty());
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
#[cfg(feature = "metrics")]
use std::sync::{Arc, OnceLock};
#[cfg(feature = "runtime-protocols")]
use std::sync::Arc;
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
use std::sync::OnceLock;
#[cfg(feature = "runtime-protocols")]
use dynamo_runtime::component::Component;
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
use dynamo_runtime::metrics::MetricsHierarchy;
#[cfg(feature = "metrics")]
use dynamo_runtime::{
component::Component,
metrics::{MetricsHierarchy, prometheus_names::kvrouter},
};
use prometheus::{IntCounterVec, Opts};
use crate::protocols::{KvCacheEventData, KvCacheEventError};
/// Metrics for the KV Indexer.
#[derive(Clone)]
#[cfg_attr(not(feature = "metrics"), derive(Default))]
pub struct KvIndexerMetrics {
/// Counter of events applied.
#[cfg(feature = "metrics")]
pub kv_cache_events_applied: IntCounterVec,
}
......@@ -32,9 +36,12 @@ pub const METRIC_EVENT_REMOVED: &str = "removed";
pub const METRIC_EVENT_CLEARED: &str = "cleared";
/// Metric name for KV cache events applied counter.
#[cfg(feature = "metrics")]
const KV_CACHE_EVENTS_APPLIED_SUFFIX: &str = "kv_cache_events_applied";
#[cfg(feature = "metrics")]
const KV_CACHE_EVENTS_APPLIED_NAME: &str = "dynamo_kvrouter_kv_cache_events_applied";
#[cfg(feature = "metrics")]
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new();
impl KvIndexerMetrics {
......@@ -47,26 +54,40 @@ impl KvIndexerMetrics {
/// Creates a new KvIndexerMetrics from a Component, memoizing the result in
/// KV_INDEXER_METRICS to avoid duplicate registration issues.
#[cfg(feature = "metrics")]
#[cfg(feature = "runtime-protocols")]
pub fn from_component(component: &Component) -> Arc<Self> {
KV_INDEXER_METRICS.get_or_init(|| {
#[cfg(feature = "metrics")]
{
KV_INDEXER_METRICS
.get_or_init(|| {
match component.metrics().create_intcountervec(
kvrouter::KV_CACHE_EVENTS_APPLIED,
KV_CACHE_EVENTS_APPLIED_SUFFIX,
"Total number of KV cache events applied to index",
&["event_type", "status"],
&[],
) {
Ok(kv_cache_events_applied) => Arc::new(Self::new(kv_cache_events_applied)),
Ok(kv_cache_events_applied) => {
Arc::new(Self::new(kv_cache_events_applied))
}
Err(e) => {
tracing::warn!("Failed to create kv indexer metrics from component: {}. Using unregistered metrics as fallback.", e);
Arc::new(Self::new_unregistered())
}
}
}).clone()
})
.clone()
}
#[cfg(not(feature = "metrics"))]
{
let _ = component;
Arc::new(Self::new_unregistered())
}
}
/// Creates a new KvIndexerMetrics which is not registered with a MetricsRegistry.
/// This may be used for tests or as a fallback for when a MetricsRegistry is not available / has errored.
#[cfg(feature = "metrics")]
pub fn new_unregistered() -> Self {
Self {
kv_cache_events_applied: IntCounterVec::new(
......@@ -80,6 +101,12 @@ impl KvIndexerMetrics {
}
}
/// Creates a no-op metrics instance when Prometheus support is disabled.
#[cfg(not(feature = "metrics"))]
pub fn new_unregistered() -> Self {
Self::default()
}
pub fn get_event_type(event_data: &KvCacheEventData) -> &'static str {
match event_data {
KvCacheEventData::Stored(_) => METRIC_EVENT_STORED,
......@@ -93,6 +120,8 @@ impl KvIndexerMetrics {
event_type: &'static str,
result: Result<(), KvCacheEventError>,
) {
#[cfg(feature = "metrics")]
{
match result {
Ok(_) => {
self.kv_cache_events_applied
......@@ -111,4 +140,7 @@ impl KvIndexerMetrics {
}
}
}
#[cfg(not(feature = "metrics"))]
let _ = (self, event_type, result);
}
}
......@@ -4,10 +4,6 @@
#[cfg(feature = "bench")]
use std::time::Instant;
#[cfg(feature = "metrics")]
use dynamo_runtime::error::DynamoError;
#[cfg(feature = "metrics")]
pub use dynamo_runtime::protocols::maybe_error::MaybeError;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
......@@ -16,7 +12,6 @@ use dynamo_tokens::SequenceHash;
/// Trait for types that may represent an error response.
/// Used for RPC-style responses that can indicate success or failure.
#[cfg(not(feature = "metrics"))]
pub trait MaybeError {
/// Construct an instance from an error.
fn from_err(err: impl std::error::Error + 'static) -> Self;
......@@ -75,15 +70,30 @@ pub enum WorkerKvQueryResponse {
Error(String),
}
#[cfg(feature = "metrics")]
impl MaybeError for WorkerKvQueryResponse {
fn from_err(err: impl std::error::Error + 'static) -> Self {
WorkerKvQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<DynamoError> {
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
match self {
WorkerKvQueryResponse::Error(msg) => Some(Box::new(std::io::Error::other(msg.clone()))),
_ => None,
}
}
}
#[cfg(feature = "runtime-protocols")]
impl dynamo_runtime::protocols::maybe_error::MaybeError for WorkerKvQueryResponse {
fn from_err(err: impl std::error::Error + 'static) -> Self {
WorkerKvQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<dynamo_runtime::error::DynamoError> {
match self {
WorkerKvQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
WorkerKvQueryResponse::Error(msg) => {
Some(dynamo_runtime::error::DynamoError::msg(msg.clone()))
}
_ => None,
}
}
......@@ -147,29 +157,30 @@ pub enum IndexerQueryResponse {
Error(String),
}
#[cfg(feature = "metrics")]
impl MaybeError for IndexerQueryResponse {
fn from_err(err: impl std::error::Error + 'static) -> Self {
IndexerQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<DynamoError> {
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
match self {
IndexerQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
IndexerQueryResponse::Error(msg) => Some(Box::new(std::io::Error::other(msg.clone()))),
_ => None,
}
}
}
#[cfg(not(feature = "metrics"))]
impl MaybeError for IndexerQueryResponse {
#[cfg(feature = "runtime-protocols")]
impl dynamo_runtime::protocols::maybe_error::MaybeError for IndexerQueryResponse {
fn from_err(err: impl std::error::Error + 'static) -> Self {
IndexerQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
fn err(&self) -> Option<dynamo_runtime::error::DynamoError> {
match self {
IndexerQueryResponse::Error(msg) => Some(Box::new(std::io::Error::other(msg.clone()))),
IndexerQueryResponse::Error(msg) => {
Some(dynamo_runtime::error::DynamoError::msg(msg.clone()))
}
_ => None,
}
}
......
......@@ -9,12 +9,13 @@ use bytes::Bytes;
use rmp_serde as rmps;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SocketSend, SubSocket};
use zeromq::{DealerSocket, Socket, SocketRecv, SocketSend, SubSocket};
use crate::protocols::{RouterEvent, WorkerId};
use crate::zmq_wire::{KvEventBatch, convert_event};
use super::indexer::Indexer;
use super::registry::ListenerRecord;
const INITIAL_BACKOFF_MS: u64 = 10;
const MAX_BACKOFF_MS: u64 = 5000;
......@@ -28,17 +29,11 @@ fn calculate_backoff_ms(consecutive_errors: u32) -> u64 {
)
}
/// Sentinel value for `watermark`: indicates no batch has been processed yet.
const WATERMARK_UNSET: u64 = u64::MAX;
/// Replay missed batches from the engine's ROUTER socket.
///
/// Uses a DEALER socket (no send/recv lockstep) to send one request and
/// receive multiple response frames. Each response is `[empty, seq, payload]`;
/// an empty payload signals end of replay.
#[expect(clippy::too_many_arguments)]
async fn replay_gap(
replay_socket: &mut zeromq::DealerSocket,
replay_socket: &mut DealerSocket,
start_seq: u64,
end_seq: u64,
worker_id: WorkerId,
......@@ -56,7 +51,6 @@ async fn replay_gap(
"Requesting replay from engine"
);
// DEALER must manually prepend the empty delimiter that REQ adds automatically.
let req_frames = vec![Bytes::new(), Bytes::from(start_seq.to_be_bytes().to_vec())];
let Ok(req_msg) = zeromq::ZmqMessage::try_from(req_frames) else {
tracing::error!(worker_id, dp_rank, "Failed to build replay request");
......@@ -73,8 +67,6 @@ async fn replay_gap(
tracing::error!(worker_id, dp_rank, "Replay recv error");
break;
};
// ROUTER sends [identity, empty, seq, payload]; DEALER strips identity,
// so we receive [empty, seq, payload].
if msg.len() < 3 {
tracing::warn!(
worker_id,
......@@ -85,12 +77,12 @@ async fn replay_gap(
break;
}
let payload = msg.get(2).unwrap();
let payload = msg.get(2).expect("frame count checked above");
if payload.is_empty() {
break;
}
let seq_bytes = msg.get(1).unwrap();
let seq_bytes = msg.get(1).expect("frame count checked above");
if seq_bytes.len() != 8 {
tracing::warn!(
worker_id,
......@@ -100,7 +92,7 @@ async fn replay_gap(
);
break;
}
let seq = u64::from_be_bytes(seq_bytes[..8].try_into().unwrap());
let seq = u64::from_be_bytes(seq_bytes[..8].try_into().expect("length checked above"));
let Ok(batch) = rmps::from_slice::<KvEventBatch>(payload) else {
tracing::warn!(worker_id, dp_rank, seq, "Failed to decode replayed batch");
......@@ -109,7 +101,7 @@ async fn replay_gap(
let effective_dp_rank = batch
.data_parallel_rank
.map_or(dp_rank, |r| r.cast_unsigned());
.map_or(dp_rank, |rank| rank.cast_unsigned());
for raw_event in batch.events {
let kv_event =
convert_event(raw_event, seq, block_size, effective_dp_rank, warning_count);
......@@ -124,106 +116,86 @@ async fn replay_gap(
replayed
}
// TODO: assumes one dp_rank per ZMQ socket. Seq counter is per-socket so gap
// detection works regardless, but replay semantics may differ if a single
// socket multiplexes dp_ranks.
/// Connect the ZMQ SUB socket, then spawn a background task that waits for
/// the ready signal before entering the recv loop.
///
/// Returns once the SUB socket is connected (subscription handshake begins
/// immediately in the background). The ready gate and recv loop run in a
/// spawned task so `register()` is never blocked waiting for `signal_ready()`.
#[expect(clippy::too_many_arguments)]
pub async fn run_zmq_listener(
pub fn spawn_zmq_listener(
worker_id: WorkerId,
dp_rank: u32,
zmq_address: String,
block_size: u32,
indexer: Indexer,
cancel: CancellationToken,
record: Arc<ListenerRecord>,
ready: watch::Receiver<bool>,
replay_endpoint: Option<String>,
watermark: Arc<AtomicU64>,
generation: u64,
cancel: CancellationToken,
) {
tracing::info!(worker_id, dp_rank, zmq_address, "ZMQ listener starting");
let mut socket = SubSocket::new();
if let Err(e) = socket.subscribe("").await {
tracing::error!("Failed to subscribe on ZMQ socket: {e}");
return;
}
if let Err(e) = socket.connect(&zmq_address).await {
tracing::error!("Failed to connect ZMQ SUB socket to {zmq_address}: {e}");
return;
}
// Spawn the ready-wait + recv loop so the caller returns immediately.
// The ZMQ subscription handshake proceeds in the background while P2P
// recovery runs; once signal_ready() fires the recv loop starts draining
// any buffered messages.
tokio::spawn(zmq_wait_ready_then_recv(
tokio::spawn(async move {
if let Err(error) = run_listener(
worker_id,
dp_rank,
block_size,
indexer,
cancel,
record.clone(),
ready,
socket,
replay_endpoint,
watermark,
));
generation,
cancel,
)
.await
{
tracing::error!(worker_id, dp_rank, error = %error, "ZMQ listener failed");
record.try_mark_failed(generation, error);
}
});
}
#[expect(clippy::too_many_arguments)]
async fn zmq_wait_ready_then_recv(
async fn run_listener(
worker_id: WorkerId,
dp_rank: u32,
block_size: u32,
indexer: Indexer,
cancel: CancellationToken,
record: Arc<ListenerRecord>,
mut ready: watch::Receiver<bool>,
socket: SubSocket,
replay_endpoint: Option<String>,
watermark: Arc<AtomicU64>,
) {
// Wait for the ready signal before entering the recv loop.
// During P2P recovery, this delay lets the recovery code fetch the dump
// from a peer while ZMQ subscription handshakes complete in the background.
generation: u64,
cancel: CancellationToken,
) -> Result<(), String> {
let endpoint = record.endpoint().to_string();
let replay_endpoint = record.replay_endpoint().map(str::to_string);
let block_size = record.block_size();
let indexer = record.indexer();
let watermark = record.watermark();
tracing::info!(worker_id, dp_rank, endpoint, "ZMQ listener starting");
if cancel.is_cancelled() {
return Ok(());
}
let mut socket = SubSocket::new();
socket
.subscribe("")
.await
.map_err(|e| format!("failed to subscribe on ZMQ socket: {e}"))?;
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!(worker_id, dp_rank, "ZMQ listener cancelled before ready");
return;
_ = cancel.cancelled() => return Ok(()),
result = socket.connect(&endpoint) => {
result.map_err(|e| format!("failed to connect ZMQ SUB socket to {endpoint}: {e}"))?;
}
result = ready.wait_for(|&v| v) => {
if result.is_err() {
tracing::error!(worker_id, dp_rank, "Ready channel closed before signaling");
return;
}
tokio::select! {
_ = cancel.cancelled() => return Ok(()),
result = ready.wait_for(|&value| value) => {
result.map_err(|_| "ready channel closed before signaling".to_string())?;
}
}
tracing::info!(worker_id, dp_rank, "ZMQ listener ready, starting recv loop");
// Connect DEALER socket once if replay_endpoint is configured.
// DEALER (not REQ) because we send one request and receive multiple responses.
let mut replay_socket = None;
if let Some(ref ep) = replay_endpoint {
let mut sock = zeromq::DealerSocket::new();
if let Err(e) = sock.connect(ep).await {
tracing::error!(worker_id, dp_rank, error = %e, "Failed to connect replay socket to {ep}");
} else {
tracing::info!(
if !record.try_mark_active(generation) {
tracing::debug!(
worker_id,
dp_rank,
replay_endpoint = ep,
"Replay socket connected"
"Listener attempt is stale after readiness gate; exiting"
);
replay_socket = Some(sock);
return Ok(());
}
tracing::info!(worker_id, dp_rank, "ZMQ listener ready, starting recv loop");
let replay_socket =
connect_replay_socket(worker_id, dp_rank, replay_endpoint.as_deref(), &cancel).await;
if cancel.is_cancelled() || !record.is_current_attempt(generation) {
return Ok(());
}
zmq_recv_loop(
......@@ -236,7 +208,43 @@ async fn zmq_wait_ready_then_recv(
replay_socket,
watermark,
)
.await;
.await
}
async fn connect_replay_socket(
worker_id: WorkerId,
dp_rank: u32,
replay_endpoint: Option<&str>,
cancel: &CancellationToken,
) -> Option<DealerSocket> {
let endpoint = replay_endpoint?;
let mut socket = DealerSocket::new();
tokio::select! {
_ = cancel.cancelled() => None,
result = socket.connect(endpoint) => {
match result {
Ok(()) => {
tracing::info!(
worker_id,
dp_rank,
replay_endpoint = endpoint,
"Replay socket connected"
);
Some(socket)
}
Err(e) => {
tracing::error!(
worker_id,
dp_rank,
error = %e,
"Failed to connect replay socket to {endpoint}"
);
None
}
}
}
}
}
#[expect(clippy::too_many_arguments)]
......@@ -247,112 +255,127 @@ async fn zmq_recv_loop(
indexer: Indexer,
cancel: CancellationToken,
mut socket: SubSocket,
mut replay_socket: Option<zeromq::DealerSocket>,
mut replay_socket: Option<DealerSocket>,
watermark: Arc<AtomicU64>,
) {
) -> Result<(), String> {
let warning_count = Arc::new(AtomicU32::new(0));
let mut consecutive_errors = 0u32;
#[expect(unused_assignments)]
let mut exit_reason = "unknown";
let mut messages_processed = 0u64;
'main: loop {
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
exit_reason = "cancelled";
break 'main;
tracing::info!(
worker_id,
dp_rank,
messages_processed,
"ZMQ listener exiting after cancellation"
);
return Ok(());
}
msg_result = socket.recv() => {
let Ok(msg) = msg_result else {
let e = msg_result.unwrap_err();
let msg = match msg_result {
Ok(msg) => msg,
Err(e) => {
consecutive_errors += 1;
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
tracing::error!(
error=%e,
consecutive_errors,
worker_id,
"Too many consecutive ZMQ errors, terminating listener"
);
exit_reason = "too many consecutive errors";
break 'main;
return Err(format!(
"too many consecutive ZMQ recv errors for worker {worker_id} dp_rank {dp_rank}: {e}"
));
}
let backoff_ms = calculate_backoff_ms(consecutive_errors);
tracing::warn!(
error=%e,
error = %e,
consecutive_errors,
backoff_ms,
worker_id,
dp_rank,
"ZMQ recv error, backing off"
);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
continue;
}
};
consecutive_errors = 0;
if msg.len() != 3 {
tracing::warn!(worker_id, "Unexpected ZMQ frame count: {}", msg.len());
tracing::warn!(worker_id, dp_rank, "Unexpected ZMQ frame count: {}", msg.len());
continue;
}
let seq_bytes = msg.get(1).unwrap();
let seq_bytes = msg.get(1).expect("frame count checked above");
if seq_bytes.len() != 8 {
tracing::warn!(worker_id, "Invalid sequence number length: {}", seq_bytes.len());
tracing::warn!(
worker_id,
dp_rank,
"Invalid sequence number length: {}",
seq_bytes.len()
);
continue;
}
let seq = u64::from_be_bytes(seq_bytes[..8].try_into().unwrap());
let seq = u64::from_be_bytes(seq_bytes[..8].try_into().expect("length checked above"));
// Gap detection
let prev = watermark.load(Ordering::Acquire);
if prev != WATERMARK_UNSET && seq > prev + 1 {
let gap_start = prev + 1;
tracing::warn!(
worker_id, dp_rank,
expected = gap_start, got = seq,
worker_id,
dp_rank,
expected = gap_start,
got = seq,
"Gap detected: expected seq {gap_start}, got {seq}"
);
match replay_socket.as_mut() {
Some(sock) => {
Some(socket) => {
replay_gap(
sock, gap_start, seq, worker_id, dp_rank,
block_size, &indexer, &warning_count, &watermark,
).await;
socket,
gap_start,
seq,
worker_id,
dp_rank,
block_size,
&indexer,
&warning_count,
&watermark,
)
.await;
}
None => tracing::warn!(
worker_id, dp_rank,
gap_size = seq - gap_start,
"No replay endpoint configured, {gap_size} batches lost",
worker_id,
dp_rank,
gap_size = seq - gap_start,
"No replay endpoint configured; batches lost"
),
}
}
// After replay, watermark may have advanced past the current
// batch — skip to avoid double-apply. Exclude the sentinel
// (WATERMARK_UNSET) so the very first message is not skipped.
let current_wm = watermark.load(Ordering::Acquire);
if current_wm != WATERMARK_UNSET && current_wm >= seq {
continue;
}
let payload = msg.get(2).unwrap();
let batch_result = rmps::from_slice::<KvEventBatch>(payload);
let Ok(batch) = batch_result else {
tracing::warn!(worker_id, "Failed to decode KvEventBatch: {}", batch_result.unwrap_err());
let payload = msg.get(2).expect("frame count checked above");
let batch = match rmps::from_slice::<KvEventBatch>(payload) {
Ok(batch) => batch,
Err(error) => {
tracing::warn!(worker_id, dp_rank, "Failed to decode KvEventBatch: {error}");
continue;
}
};
let effective_dp_rank = batch.data_parallel_rank.map_or(dp_rank, |r| r.cast_unsigned());
// Use the engine's ZMQ sequence number as event_id so downstream
// consumers can detect gaps and request replay.
let effective_dp_rank = batch
.data_parallel_rank
.map_or(dp_rank, |rank| rank.cast_unsigned());
for raw_event in batch.events {
let kv_event = convert_event(raw_event, seq, block_size, effective_dp_rank, &warning_count);
let kv_event =
convert_event(raw_event, seq, block_size, effective_dp_rank, &warning_count);
let router_event = RouterEvent::new(worker_id, kv_event);
indexer.apply_event(router_event).await;
messages_processed += 1;
......@@ -361,14 +384,6 @@ async fn zmq_recv_loop(
}
}
}
tracing::info!(
worker_id,
dp_rank,
exit_reason,
messages_processed,
"ZMQ listener exiting"
);
}
#[cfg(test)]
......
......@@ -9,12 +9,28 @@ use std::time::Instant;
#[cfg(feature = "metrics")]
use axum::{extract::MatchedPath, http::Request, middleware::Next, response::Response};
#[cfg(feature = "metrics")]
use dynamo_runtime::metrics::prometheus_names::{kvindexer, name_prefix};
#[cfg(feature = "metrics")]
use prometheus::{
HistogramVec, IntCounterVec, IntGauge, Opts, exponential_buckets, histogram_opts,
HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Opts, exponential_buckets, histogram_opts,
};
#[cfg(feature = "metrics")]
use super::registry::ListenerStatus;
#[cfg(feature = "metrics")]
const METRICS_PREFIX: &str = "dynamo_kvindexer";
#[cfg(feature = "metrics")]
const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
#[cfg(feature = "metrics")]
const REQUESTS_TOTAL: &str = "requests_total";
#[cfg(feature = "metrics")]
const ERRORS_TOTAL: &str = "errors_total";
#[cfg(feature = "metrics")]
const MODELS: &str = "models";
#[cfg(feature = "metrics")]
const WORKERS: &str = "workers";
#[cfg(feature = "metrics")]
const LISTENERS: &str = "listeners";
#[cfg(feature = "metrics")]
pub struct StandaloneIndexerMetrics {
pub request_duration: HistogramVec,
......@@ -22,15 +38,14 @@ pub struct StandaloneIndexerMetrics {
pub errors_total: IntCounterVec,
pub models: IntGauge,
pub workers: IntGauge,
pub listeners: IntGaugeVec,
}
#[cfg(feature = "metrics")]
static METRICS: LazyLock<StandaloneIndexerMetrics> = LazyLock::new(|| {
let prefix = name_prefix::KVINDEXER;
StandaloneIndexerMetrics {
static METRICS: LazyLock<StandaloneIndexerMetrics> = LazyLock::new(|| StandaloneIndexerMetrics {
request_duration: HistogramVec::new(
histogram_opts!(
format!("{prefix}_{}", kvindexer::REQUEST_DURATION_SECONDS),
format!("{METRICS_PREFIX}_{REQUEST_DURATION_SECONDS}"),
"HTTP request latency",
exponential_buckets(0.0001, 2.0, 20).expect("valid bucket params")
),
......@@ -39,7 +54,7 @@ static METRICS: LazyLock<StandaloneIndexerMetrics> = LazyLock::new(|| {
.expect("valid histogram"),
requests_total: IntCounterVec::new(
Opts::new(
format!("{prefix}_{}", kvindexer::REQUESTS_TOTAL),
format!("{METRICS_PREFIX}_{REQUESTS_TOTAL}"),
"Total HTTP requests",
),
&["endpoint", "method"],
......@@ -47,23 +62,30 @@ static METRICS: LazyLock<StandaloneIndexerMetrics> = LazyLock::new(|| {
.expect("valid counter"),
errors_total: IntCounterVec::new(
Opts::new(
format!("{prefix}_{}", kvindexer::ERRORS_TOTAL),
format!("{METRICS_PREFIX}_{ERRORS_TOTAL}"),
"HTTP error responses (4xx/5xx)",
),
&["endpoint", "status_class"],
)
.expect("valid counter"),
models: IntGauge::new(
format!("{prefix}_{}", kvindexer::MODELS),
format!("{METRICS_PREFIX}_{MODELS}"),
"Number of active model+tenant indexers",
)
.expect("valid gauge"),
workers: IntGauge::new(
format!("{prefix}_{}", kvindexer::WORKERS),
format!("{METRICS_PREFIX}_{WORKERS}"),
"Number of registered worker instances",
)
.expect("valid gauge"),
}
listeners: IntGaugeVec::new(
Opts::new(
format!("{METRICS_PREFIX}_{LISTENERS}"),
"Number of ZMQ listeners by status",
),
&["status"],
)
.expect("valid gauge"),
});
#[cfg(feature = "metrics")]
......@@ -74,6 +96,7 @@ pub fn register(registry: &prometheus::Registry) -> Result<(), prometheus::Error
registry.register(Box::new(m.errors_total.clone()))?;
registry.register(Box::new(m.models.clone()))?;
registry.register(Box::new(m.workers.clone()))?;
registry.register(Box::new(m.listeners.clone()))?;
Ok(())
}
......@@ -106,28 +129,20 @@ pub async fn metrics_middleware(req: Request<axum::body::Body>, next: Next) -> R
}
#[cfg(feature = "metrics")]
pub fn inc_models() {
METRICS.models.inc();
}
#[cfg(not(feature = "metrics"))]
pub fn inc_models() {}
#[cfg(feature = "metrics")]
pub fn inc_workers() {
METRICS.workers.inc();
}
#[cfg(not(feature = "metrics"))]
pub fn inc_workers() {}
#[cfg(feature = "metrics")]
pub fn dec_workers() {
METRICS.workers.dec();
pub fn set_worker_state(models: usize, workers: usize, listener_counts: [i64; 4]) {
METRICS.models.set(models as i64);
METRICS.workers.set(workers as i64);
for status in ListenerStatus::ALL {
METRICS
.listeners
.with_label_values(&[status.as_str()])
.set(listener_counts[status.metric_index()]);
}
}
#[cfg(not(feature = "metrics"))]
pub fn dec_workers() {}
pub fn set_worker_state(_models: usize, _workers: usize, _listener_counts: [i64; 4]) {}
#[cfg(all(test, feature = "metrics"))]
mod tests {
......@@ -139,10 +154,7 @@ mod tests {
let registry = prometheus::Registry::new();
register(&registry).expect("registration should succeed");
inc_models();
inc_workers();
inc_workers();
dec_workers();
set_worker_state(1, 2, [1, 1, 0, 0]);
let encoder = prometheus::TextEncoder::new();
let mut buf = Vec::new();
......@@ -153,6 +165,8 @@ mod tests {
assert!(output.contains("dynamo_kvindexer_requests_total"));
assert!(output.contains("dynamo_kvindexer_errors_total"));
assert!(output.contains("dynamo_kvindexer_models 1"));
assert!(output.contains("dynamo_kvindexer_workers 1"));
assert!(output.contains("dynamo_kvindexer_workers 2"));
assert!(output.contains("dynamo_kvindexer_listeners{status=\"pending\"} 1"));
assert!(output.contains("dynamo_kvindexer_listeners{status=\"active\"} 1"));
}
}
......@@ -6,11 +6,14 @@ pub mod listener;
pub mod metrics;
pub mod recovery;
pub mod registry;
#[cfg(feature = "indexer-runtime")]
pub mod runtime;
pub mod server;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use registry::WorkerRegistry;
use server::{AppState, create_router};
......@@ -25,23 +28,76 @@ pub struct IndexerConfig {
pub peers: Option<String>,
}
pub fn parse_workers(s: &str) -> Vec<(u64, u32, String)> {
s.split(',')
.filter(|entry| !entry.is_empty())
.filter_map(|entry| {
let (id_part, addr) = entry.split_once('=')?;
#[cfg(feature = "indexer-runtime")]
pub struct RuntimeConfig {
pub namespace: String,
pub component_name: String,
pub worker_component: String,
}
pub(super) fn validate_zmq_endpoint(endpoint: &str) -> anyhow::Result<()> {
endpoint
.parse::<zeromq::Endpoint>()
.map(|_| ())
.map_err(|error| anyhow::anyhow!("invalid ZMQ endpoint `{endpoint}`: {error}"))
}
pub(super) fn validate_listener_endpoints(
endpoint: &str,
replay_endpoint: Option<&str>,
) -> anyhow::Result<()> {
validate_zmq_endpoint(endpoint)?;
if let Some(replay_endpoint) = replay_endpoint {
validate_zmq_endpoint(replay_endpoint).map_err(|error| {
anyhow::anyhow!("invalid replay endpoint `{replay_endpoint}`: {error}")
})?;
}
Ok(())
}
pub fn parse_workers(s: &str) -> anyhow::Result<Vec<(u64, u32, String)>> {
let mut workers = Vec::new();
for entry in s.split(',').filter(|entry| !entry.trim().is_empty()) {
let (id_part, addr) = entry.split_once('=').ok_or_else(|| {
anyhow::anyhow!("invalid worker entry `{entry}`; expected worker_id[:dp_rank]=endpoint")
})?;
let id_part = id_part.trim();
let (id, dp_rank) = if let Some((id_str, rank_str)) = id_part.split_once(':') {
(id_str.parse::<u64>().ok()?, rank_str.parse::<u32>().ok()?)
let (instance_id, dp_rank) = if let Some((id_str, rank_str)) = id_part.split_once(':') {
(
id_str
.parse::<u64>()
.map_err(|error| anyhow::anyhow!("invalid worker id in `{entry}`: {error}"))?,
rank_str
.parse::<u32>()
.map_err(|error| anyhow::anyhow!("invalid dp_rank in `{entry}`: {error}"))?,
)
} else {
(id_part.parse::<u64>().ok()?, 0)
(
id_part
.parse::<u64>()
.map_err(|error| anyhow::anyhow!("invalid worker id in `{entry}`: {error}"))?,
0,
)
};
Some((id, dp_rank, addr.trim().to_string()))
})
.collect()
let endpoint = addr.trim().to_string();
validate_zmq_endpoint(&endpoint)?;
workers.push((instance_id, dp_rank, endpoint));
}
Ok(workers)
}
pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
let cancel_token = CancellationToken::new();
let shutdown_token = cancel_token.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tracing::info!("Received shutdown signal");
shutdown_token.cancel();
});
let peers: Vec<String> = config
.peers
.as_deref()
......@@ -60,16 +116,98 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
model_name = %config.model_name,
tenant_id = %config.tenant_id,
num_peers = peers.len(),
"Starting standalone KV cache indexer"
"Starting standalone KV cache indexer (HTTP-only mode)"
);
let registry = WorkerRegistry::new(config.threads);
let registry = Arc::new(WorkerRegistry::new(config.threads));
run_common(&config, &registry, cancel_token).await
}
#[cfg(feature = "indexer-runtime")]
pub async fn run_with_runtime(
runtime: dynamo_runtime::Runtime,
config: IndexerConfig,
runtime_config: RuntimeConfig,
) -> anyhow::Result<()> {
use dynamo_runtime::{
DistributedRuntime,
pipeline::{ManyOut, SingleIn, network::Ingress},
};
use crate::indexer::{IndexerQueryRequest, IndexerQueryResponse, KV_INDEXER_QUERY_ENDPOINT};
let distributed_runtime = DistributedRuntime::from_settings(runtime).await?;
let cancel_token = distributed_runtime.primary_token();
let component = distributed_runtime
.namespace(&runtime_config.namespace)?
.component(&runtime_config.component_name)?;
tracing::info!(
namespace = %runtime_config.namespace,
component = %runtime_config.component_name,
block_size = ?config.block_size,
port = config.port,
threads = config.threads,
model_name = %config.model_name,
tenant_id = %config.tenant_id,
worker_component = %runtime_config.worker_component,
num_peers = config.peers.as_ref().map(|p| p.split(',').count()).unwrap_or(0),
"Starting standalone KV cache indexer (Dynamo runtime mode)"
);
let registry = Arc::new(WorkerRegistry::new(config.threads));
let engine = Arc::new(runtime::query_engine::IndexerQueryEngine {
registry: registry.clone(),
});
let ingress =
Ingress::<SingleIn<IndexerQueryRequest>, ManyOut<IndexerQueryResponse>>::for_engine(
engine,
)?;
let query_endpoint = component
.endpoint(KV_INDEXER_QUERY_ENDPOINT)
.endpoint_builder()
.handler(ingress)
.graceful_shutdown(true);
distributed_runtime.runtime().secondary().spawn(async move {
if let Err(err) = query_endpoint.start().await {
tracing::error!(error = %err, "Query endpoint failed");
}
});
tracing::info!(
endpoint = KV_INDEXER_QUERY_ENDPOINT,
"Query endpoint registered"
);
runtime::discovery::spawn_discovery_watcher(
&distributed_runtime,
registry.clone(),
cancel_token.clone(),
)
.await?;
runtime::subscriber::spawn_event_subscriber(
&distributed_runtime,
&runtime_config.namespace,
&runtime_config.worker_component,
registry.clone(),
cancel_token.clone(),
)
.await?;
run_common(&config, &registry, cancel_token).await
}
async fn run_common(
config: &IndexerConfig,
registry: &Arc<WorkerRegistry>,
cancel_token: CancellationToken,
) -> anyhow::Result<()> {
if let Some(ref workers_str) = config.workers {
let block_size = config.block_size.ok_or_else(|| {
anyhow::anyhow!("--block-size is required when --workers is specified")
})?;
for (instance_id, dp_rank, endpoint) in parse_workers(workers_str) {
for (instance_id, dp_rank, endpoint) in parse_workers(workers_str)? {
tracing::info!(instance_id, dp_rank, endpoint, "Registering initial worker");
registry
.register(
......@@ -85,8 +223,19 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
}
}
let peers: Vec<String> = config
.peers
.as_deref()
.map(|s| {
s.split(',')
.filter(|p| !p.is_empty())
.map(|p| p.trim().to_string())
.collect()
})
.unwrap_or_default();
if !peers.is_empty() {
match recovery::recover_from_peers(&peers, &registry).await {
match recovery::recover_from_peers(&peers, registry).await {
Ok(true) => tracing::info!("P2P recovery completed"),
Ok(false) => tracing::warn!("no reachable peers, starting with empty state"),
Err(e) => tracing::warn!(error = %e, "P2P recovery failed, starting with empty state"),
......@@ -106,7 +255,7 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
};
let state = Arc::new(AppState {
registry: Arc::new(registry),
registry: registry.clone(),
#[cfg(feature = "metrics")]
prom_registry,
});
......@@ -114,7 +263,12 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
let app = create_router(state);
let listener = TcpListener::bind(("0.0.0.0", config.port)).await?;
tracing::info!("HTTP server listening on 0.0.0.0:{}", config.port);
axum::serve(listener, app).await?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel_token.cancelled().await;
tracing::info!("Received shutdown signal, stopping HTTP server");
})
.await?;
Ok(())
}
......@@ -126,7 +280,7 @@ mod tests {
#[test]
fn test_parse_workers() {
let input = "1=tcp://host:5557,2:1=tcp://host:5558";
let result = parse_workers(input);
let result = parse_workers(input).unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0], (1, 0, "tcp://host:5557".to_string()));
assert_eq!(result[1], (2, 1, "tcp://host:5558".to_string()));
......@@ -134,6 +288,12 @@ mod tests {
#[test]
fn test_parse_workers_empty() {
assert!(parse_workers("").is_empty());
assert!(parse_workers("").unwrap().is_empty());
}
#[test]
fn test_parse_workers_invalid_entry() {
let error = parse_workers("1").unwrap_err().to_string();
assert!(error.contains("invalid worker entry"));
}
}
......@@ -4,7 +4,6 @@
use std::sync::Arc;
use dynamo_runtime::stream::StreamExt;
use dynamo_runtime::{
DistributedRuntime,
discovery::{
......@@ -14,10 +13,8 @@ use dynamo_runtime::{
use serde::Deserialize;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::standalone_indexer::registry::WorkerRegistry;
use crate::standalone_indexer::registry::WorkerRegistry;
/// Minimal subset of ModelDeploymentCard — only the fields the indexer needs.
/// Using `#[serde(default)]` on optional fields lets us safely ignore the rest.
#[derive(Deserialize, Debug)]
struct PartialModelCard {
pub display_name: String,
......@@ -25,8 +22,6 @@ struct PartialModelCard {
pub kv_cache_block_size: u32,
}
/// Spawn a background task that watches MDC discovery for worker additions/removals
/// and updates the WorkerRegistry accordingly.
pub async fn spawn_discovery_watcher(
drt: &DistributedRuntime,
registry: Arc<WorkerRegistry>,
......@@ -71,7 +66,6 @@ pub async fn spawn_discovery_watcher(
let model_name = card.display_name.clone();
let block_size = card.kv_cache_block_size;
// Use the Dynamo namespace as the tenant_id
let tenant_id = namespace;
if block_size == 0 {
......@@ -91,7 +85,7 @@ pub async fn spawn_discovery_watcher(
"Discovery: adding worker"
);
if let Err(e) = registry.add_worker_from_discovery(
if let Err(err) = registry.add_worker_from_discovery(
instance_id,
model_name.clone(),
tenant_id,
......@@ -100,7 +94,7 @@ pub async fn spawn_discovery_watcher(
tracing::error!(
instance_id,
model_name,
error = %e,
error = %err,
"Failed to add discovered worker"
);
}
......
......@@ -4,20 +4,14 @@
use std::sync::Arc;
use anyhow::Result;
use dynamo_runtime::stream;
use dynamo_runtime::pipeline::{
AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait,
};
use dynamo_runtime::stream;
use dynamo_kv_router::indexer::{IndexerQueryRequest, IndexerQueryResponse};
use dynamo_kv_router::standalone_indexer::registry::{IndexerKey, WorkerRegistry};
use crate::indexer::{IndexerQueryRequest, IndexerQueryResponse};
use crate::standalone_indexer::registry::{IndexerKey, WorkerRegistry};
/// AsyncEngine that serves indexer queries over the request plane.
///
/// When a frontend sends an `IndexerQueryRequest` (model_name, namespace, block hashes),
/// this engine finds the appropriate indexer in the registry and returns overlap scores.
pub struct IndexerQueryEngine {
pub registry: Arc<WorkerRegistry>,
}
......@@ -31,16 +25,15 @@ impl AsyncEngine<SingleIn<IndexerQueryRequest>, ManyOut<IndexerQueryResponse>, a
request: SingleIn<IndexerQueryRequest>,
) -> Result<ManyOut<IndexerQueryResponse>> {
let (req, ctx) = request.into_parts();
let key = IndexerKey {
model_name: req.model_name.clone(),
tenant_id: req.namespace.clone(),
};
let response = match self.registry.get_indexer(&key) {
Some(ie) => match ie.indexer.find_matches(req.block_hashes).await {
Some(entry) => match entry.indexer.find_matches(req.block_hashes).await {
Ok(scores) => IndexerQueryResponse::Scores(scores.into()),
Err(e) => IndexerQueryResponse::Error(e.to_string()),
Err(err) => IndexerQueryResponse::Error(err.to_string()),
},
None => IndexerQueryResponse::Error(format!(
"no indexer for model={} namespace={}",
......@@ -48,7 +41,10 @@ impl AsyncEngine<SingleIn<IndexerQueryRequest>, ManyOut<IndexerQueryResponse>, a
)),
};
let resp_stream = stream::iter(vec![response]);
Ok(ResponseStream::new(Box::pin(resp_stream), ctx.context()))
let response_stream = stream::iter(vec![response]);
Ok(ResponseStream::new(
Box::pin(response_stream),
ctx.context(),
))
}
}
......@@ -10,12 +10,9 @@ use dynamo_runtime::{
DistributedRuntime, discovery::EventTransportKind, transports::event_plane::EventSubscriber,
};
use dynamo_kv_router::protocols::{KV_EVENT_SUBJECT, RouterEvent};
use crate::protocols::{KV_EVENT_SUBJECT, RouterEvent};
use crate::standalone_indexer::registry::WorkerRegistry;
use dynamo_kv_router::standalone_indexer::registry::WorkerRegistry;
/// Spawn a background task that subscribes to KV events from the worker component
/// via the event plane and applies them to the appropriate indexer in the registry.
pub async fn spawn_event_subscriber(
drt: &DistributedRuntime,
namespace: &str,
......@@ -24,10 +21,7 @@ pub async fn spawn_event_subscriber(
cancel_token: CancellationToken,
) -> Result<()> {
let transport_kind = EventTransportKind::from_env_or_default();
// Create a Component reference for the worker component to subscribe to its events.
let worker_component = drt.namespace(namespace)?.component(worker_component_name)?;
let mut subscriber = EventSubscriber::for_component_with_transport(
&worker_component,
KV_EVENT_SUBJECT,
......@@ -69,17 +63,13 @@ pub async fn spawn_event_subscriber(
Some(result) = subscriber.next() => {
let (_envelope, event) = match result {
Ok((envelope, event)) => (envelope, event),
Err(e) => {
tracing::warn!("Failed to receive RouterEvent from event plane: {e:?}");
Err(err) => {
tracing::warn!("Failed to receive RouterEvent from event plane: {err:?}");
continue;
}
};
let worker_id = event.worker_id;
// Apply the event to the indexer that tracks this worker.
// If the worker was discovered via MDC, it will be in the registry.
// If it was registered via --workers CLI, the indexer also exists.
if let Some(indexer) = registry.get_indexer_for_worker(worker_id) {
indexer.apply_event(event).await;
} else {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment