Unverified commit 48eb52e7, authored by Yan Ru Pei and committed by GitHub

feat(kv-router): P2P recovery for standalone KV indexer [DYN-2367] (#6934)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent fab07d1c
...@@ -1952,6 +1952,7 @@ dependencies = [ ...@@ -1952,6 +1952,7 @@ dependencies = [
"plotters", "plotters",
"prometheus", "prometheus",
"rand 0.9.2", "rand 0.9.2",
"reqwest 0.12.28",
"rmp-serde", "rmp-serde",
"rstest 0.18.2", "rstest 0.18.2",
"rstest_reuse", "rstest_reuse",
......
@@ -32,6 +32,47 @@ The standalone indexer works with any engine that publishes KV cache events over
- **Custom routing**: Build external routing logic that queries the indexer for overlap scores and makes its own worker selection decisions.
- **Monitoring**: Observe KV cache distribution across workers without running a full router.
## P2P Recovery
Multiple indexer replicas can subscribe to the same ZMQ worker endpoints for fault tolerance. When a replica starts (or restarts after a crash), it bootstraps its radix tree state from a healthy peer before processing live events.
### How It Works
1. Workers are registered via `--workers` CLI, which connects ZMQ SUB sockets immediately.
2. A 1-second delay gives the peer's tree state time to advance past the ZMQ connection point, so the dump covers events that would otherwise be lost to the slow-joiner window.
3. The indexer fetches a `/dump` from the first reachable peer in `--peers`.
4. Dump events are applied to populate the radix tree.
5. ZMQ listeners are unblocked and begin draining any events that buffered during recovery.
If no peers are reachable, the indexer starts with an empty state.
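
The startup sequence above can be sketched in a few lines. This is an illustrative Python model, not the Rust implementation: `fetch_dump` and `apply_event` are hypothetical callables standing in for the HTTP client and the radix-tree update, and `delay_s` stands in for the fixed 1-second wait.

```python
import time
from typing import Callable, Iterable, Optional


def recover_from_peers(
    peers: Iterable[str],
    fetch_dump: Callable[[str], dict],
    apply_event: Callable[[str, int, object], None],
    delay_s: float = 1.0,
) -> Optional[str]:
    """Return the URL of the peer that was recovered from, or None."""
    # Give the peer time to move past the point where our SUB sockets connected.
    time.sleep(delay_s)
    for peer in peers:
        try:
            # fetch_dump is assumed to raise on connection or parse failure.
            dump = fetch_dump(peer.rstrip("/") + "/dump")
        except Exception:
            continue  # this peer is unusable, try the next one
        for key, entry in dump.items():
            for event in entry["events"]:
                apply_event(key, entry["block_size"], event)
        return peer  # first reachable peer wins
    return None  # caller starts with an empty state
```

Only after this function returns would the caller unblock the ZMQ listeners, so buffered live events are applied on top of the recovered state.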
### Example: Two-Replica Setup
```bash
# Replica A (first instance, no peers)
dynamo-kv-indexer --port 8090 --block-size 16 \
--workers "1=tcp://worker1:5557,2=tcp://worker2:5558"
# Replica B (recovers from A on startup)
dynamo-kv-indexer --port 8091 --block-size 16 \
--workers "1=tcp://worker1:5557,2=tcp://worker2:5558" \
--peers "http://localhost:8090"
```
Both replicas subscribe to the same workers. Replica B recovers A's tree state on startup, then both independently process live ZMQ events going forward.
### Consistency
The dump is a weakly consistent BFS snapshot of the radix tree — concurrent writes may race with the traversal. This is acceptable because:
- **Stale blocks** (partially removed branches): live `Remove` events will clean them up.
- **Missing blocks** (partially added branches): live `Stored` events will add them.
- The tree converges to the correct state after live events catch up.
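
A toy model may help make the convergence argument concrete. It is a deliberate simplification: the radix tree is flattened to a set of block hashes, and the event names `"stored"` and `"removed"` are stand-ins chosen for this sketch rather than the actual `RouterEvent` encoding.

```python
def apply_events(blocks: set, events: list) -> set:
    """Apply ("stored" | "removed", block_hash) events to a set of block hashes."""
    result = set(blocks)
    for kind, block_hash in events:
        if kind == "stored":
            result.add(block_hash)      # adding an existing block is a no-op
        elif kind == "removed":
            result.discard(block_hash)  # removing a missing block is a no-op
    return result


# Snapshot taken while the peer was mid-update: "b2" is stale, "b4" is missing.
snapshot = {"b1", "b2", "b3"}
# Live events that buffered on the ZMQ sockets while the dump was fetched.
live = [("removed", "b2"), ("stored", "b4")]
converged = apply_events(snapshot, live)
```

Because both operations are idempotent in this model, replaying an event that the snapshot already reflects does no harm, which is why overlap between the dump and the live stream is tolerable.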
### Peer Management
Peers can be registered at startup via `--peers` or dynamically via the HTTP API. The peer list is used for recovery only — peers do not synchronize state in real time.
## Building
The binary is a feature-gated target in the `dynamo-kv-router` crate:
@@ -43,17 +84,18 @@ cargo build -p dynamo-kv-router --features indexer-bin --bin dynamo-kv-indexer
## CLI
```bash
dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-model --tenant-id default --workers "1=tcp://host:5557,2:1=tcp://host:5558"] [--peers "http://peer1:8090,http://peer2:8091"]
```
| Flag | Default | Description |
|------|---------|-------------|
| `--block-size` | (none) | KV cache block size for initial `--workers` (required when `--workers` is set) |
| `--port` | `8090` | HTTP server listen port |
| `--threads` | `4` | Number of indexer threads (1 = single-threaded, >1 = thread pool) |
| `--workers` | (none) | Initial workers as `instance_id[:dp_rank]=zmq_address,...` pairs (dp_rank defaults to 0) |
| `--model-name` | `default` | Model name for initial `--workers` |
| `--tenant-id` | `default` | Tenant ID for initial `--workers` |
| `--peers` | (none) | Comma-separated peer indexer URLs for P2P recovery on startup |
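
To illustrate the `--workers` format, here is an approximate Python port of the parsing rule (`instance_id[:dp_rank]=zmq_address`, with `dp_rank` defaulting to 0). It is a sketch for tooling that needs to build or validate the flag value; edge cases such as signs or unusual whitespace may differ from the Rust parser, and the helper name `_parse_uint` is introduced here only for illustration.

```python
from typing import List, Optional, Tuple


def _parse_uint(text: str) -> Optional[int]:
    """Parse a plain ASCII non-negative integer, or return None."""
    if text.isascii() and text.isdigit():
        return int(text)
    return None


def parse_workers(spec: str) -> List[Tuple[int, int, str]]:
    """Parse "id[:dp_rank]=addr,..." into (id, dp_rank, addr) tuples.

    Malformed entries are skipped rather than reported.
    """
    workers = []
    for entry in spec.split(","):
        id_part, sep, addr = entry.partition("=")
        if not sep:
            continue  # no "=" in this entry
        id_str, colon, rank_str = id_part.strip().partition(":")
        worker_id = _parse_uint(id_str)
        dp_rank = _parse_uint(rank_str) if colon else 0
        if worker_id is None or dp_rank is None:
            continue  # non-numeric id or rank
        workers.append((worker_id, dp_rank, addr.strip()))
    return workers
```

For example, `parse_workers("1=tcp://host:5557,2:1=tcp://host:5558")` yields one entry with `dp_rank` 0 and one with `dp_rank` 1.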
## HTTP API
@@ -188,12 +230,45 @@ curl http://localhost:8090/dump
Returns:
```json
{
  "llama-3-8b:default": {
    "block_size": 16,
    "events": [<RouterEvent>, ...]
  },
  "mistral-7b:customer-a": {
    "block_size": 16,
    "events": [<RouterEvent>, ...]
  }
}
```
Each indexer is dumped concurrently. The `block_size` field lets recovering peers create indexers with the correct block size without requiring `--block-size` on every replica.
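
A client consuming `/dump` could unpack the response roughly as follows. This is a minimal sketch using only the Python standard library; the function name `parse_dump` is hypothetical, and events are treated as opaque JSON values because their exact schema is not shown here. It assumes keys have the `model_name:tenant_id` form and treats an entry carrying an `error` field as a failure.

```python
import json
from typing import Dict, List, Tuple


def parse_dump(body: str) -> Dict[Tuple[str, str], Tuple[int, List[object]]]:
    """Map (model_name, tenant_id) -> (block_size, events)."""
    parsed = {}
    for key, entry in json.loads(body).items():
        # Split at the first ":" into model name and tenant ID.
        model_name, sep, tenant_id = key.partition(":")
        if not sep:
            raise ValueError(f"invalid dump key format: {key}")
        if "error" in entry:
            # The indexer for this key failed to dump its events.
            raise ValueError(f"dump failed for {key}: {entry['error']}")
        parsed[(model_name, tenant_id)] = (entry["block_size"], entry["events"])
    return parsed
```

Returning the block size alongside the events lets the caller create each indexer before replaying its events.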
### `POST /register_peer` — Register a peer indexer
```bash
curl -X POST http://localhost:8090/register_peer \
-H 'Content-Type: application/json' \
-d '{"url": "http://peer:8091"}'
```
### `POST /deregister_peer` — Remove a peer indexer
```bash
curl -X POST http://localhost:8090/deregister_peer \
-H 'Content-Type: application/json' \
-d '{"url": "http://peer:8091"}'
```
### `GET /peers` — List registered peers
```bash
curl http://localhost:8090/peers
```
Returns:
```json
["http://peer:8091"]
```
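
The semantics of the three peer endpoints can be modelled in memory. This sketch is not the server code; `PeerRegistry` is an illustrative class, and the integer return values mirror the HTTP status codes used by the handlers in this commit (201 on register, 200 or 404 on deregister).

```python
class PeerRegistry:
    """Set-like peer list mirroring register / deregister / list semantics."""

    def __init__(self) -> None:
        self._peers = set()

    def register(self, url: str) -> int:
        # Registering the same URL twice is a no-op; the reply is still 201.
        self._peers.add(url)
        return 201

    def deregister(self, url: str) -> int:
        # 200 if the peer was present, 404 otherwise.
        if url in self._peers:
            self._peers.remove(url)
            return 200
        return 404

    def list_peers(self) -> list:
        # Sorted only to make this sketch deterministic; the real listing
        # order is not specified.
        return sorted(self._peers)
```

Clients should therefore treat registration as idempotent and should not rely on the order of the `/peers` response.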
## Limitations
@@ -235,6 +310,24 @@ graph TD
style CLIENT fill:#fff3e0,stroke:#333,color:#333
```
### P2P Recovery Flow
```mermaid
sequenceDiagram
participant B as Replica B (new)
participant A as Replica A (healthy)
participant W as Workers (ZMQ PUB)
B->>W: Connect ZMQ SUB sockets
Note over B,W: 1s delay for peer tree to advance past connection point
B->>A: GET /dump
A-->>B: Radix tree snapshot + block sizes
Note over B: Apply dump events
Note over B: Unblock ZMQ listeners
B->>W: Start draining buffered events
Note over B: Ready to serve queries
```
## See Also
- **[Mooncake KV Indexer RFC](https://github.com/kvcache-ai/Mooncake/issues/1403)**: Community API standardization for KV cache indexers
......
...@@ -14,7 +14,7 @@ repository.workspace = true ...@@ -14,7 +14,7 @@ repository.workspace = true
default = [] default = []
metrics = [] metrics = []
bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"] bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"]
indexer-bin = ["metrics", "dep:axum", "dep:clap", "dep:zeromq", "dep:tracing-subscriber", "dep:serde_json"] indexer-bin = ["metrics", "dep:axum", "dep:clap", "dep:zeromq", "dep:tracing-subscriber", "dep:serde_json", "dep:reqwest"]
[dependencies] [dependencies]
# repo # repo
...@@ -52,6 +52,7 @@ rustc-hash = "2.1.1" ...@@ -52,6 +52,7 @@ rustc-hash = "2.1.1"
# indexer-bin (optional) # indexer-bin (optional)
axum = { workspace = true, optional = true } axum = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true }
zeromq = { version = "0.4.1", optional = true } zeromq = { version = "0.4.1", optional = true }
tracing-subscriber = { workspace = true, optional = true } tracing-subscriber = { workspace = true, optional = true }
......
...@@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU32; ...@@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU32;
use std::time::Duration; use std::time::Duration;
use rmp_serde as rmps; use rmp_serde as rmps;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket}; use zeromq::{Socket, SocketRecv, SubSocket};
...@@ -26,6 +27,27 @@ fn calculate_backoff_ms(consecutive_errors: u32) -> u64 { ...@@ -26,6 +27,27 @@ fn calculate_backoff_ms(consecutive_errors: u32) -> u64 {
) )
} }
// TODO: Gap detection for missed ZMQ messages
//
// ZMQ PUB/SUB is lossy — if the subscriber is slow or disconnects briefly,
// messages can be dropped. The `zeromq` 0.4 crate uses bounded internal
// channels between the PUB and SUB sockets (via `try_send` with a noop
// waker), so messages are silently dropped when the write buffer is full
// (per ZMQ spec RFC 29).
//
// For P2P recovery, the ready signal delays `recv()` only briefly (the
// duration of the HTTP dump fetch), which is well within the crate's
// internal channel capacity. For longer delays or high-throughput scenarios,
// messages could be lost.
//
// Easy win: hook up the vLLM replay endpoint — workers already expose
// `LocalKvIndexer` with event buffering and range queries (see
// `lib/llm/src/kv_router/worker_query.rs`), just need to query it from
// the standalone indexer on gap detection.
//
// Alternative future approach: switch to an explicit `mpsc` channel as the
// buffer (unbounded, no drops) instead of relying on ZMQ's internal buffer.
pub async fn run_zmq_listener( pub async fn run_zmq_listener(
worker_id: WorkerId, worker_id: WorkerId,
dp_rank: u32, dp_rank: u32,
...@@ -33,6 +55,7 @@ pub async fn run_zmq_listener( ...@@ -33,6 +55,7 @@ pub async fn run_zmq_listener(
block_size: u32, block_size: u32,
indexer: Indexer, indexer: Indexer,
cancel: CancellationToken, cancel: CancellationToken,
mut ready: watch::Receiver<bool>,
) { ) {
tracing::info!(worker_id, dp_rank, zmq_address, "ZMQ listener starting"); tracing::info!(worker_id, dp_rank, zmq_address, "ZMQ listener starting");
...@@ -48,6 +71,25 @@ pub async fn run_zmq_listener( ...@@ -48,6 +71,25 @@ pub async fn run_zmq_listener(
return; return;
} }
// Wait for the ready signal before entering the recv loop.
// During P2P recovery, this delay lets the recovery code fetch the dump
// from a peer while ZMQ subscription handshakes complete in the background.
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!(worker_id, dp_rank, "ZMQ listener cancelled before ready");
return;
}
result = ready.wait_for(|&v| v) => {
if result.is_err() {
tracing::error!(worker_id, dp_rank, "Ready channel closed before signaling");
return;
}
}
}
tracing::info!(worker_id, dp_rank, "ZMQ listener ready, starting recv loop");
let mut next_event_id = 0u64; let mut next_event_id = 0u64;
let warning_count = Arc::new(AtomicU32::new(0)); let warning_count = Arc::new(AtomicU32::new(0));
let mut consecutive_errors = 0u32; let mut consecutive_errors = 0u32;
...@@ -133,3 +175,68 @@ pub async fn run_zmq_listener( ...@@ -133,3 +175,68 @@ pub async fn run_zmq_listener(
"ZMQ listener exiting" "ZMQ listener exiting"
); );
} }
#[cfg(test)]
mod tests {
use zeromq::{PubSocket, Socket, SocketRecv, SocketSend, SubSocket};
/// Verify that the `zeromq` crate buffers a small number of messages in
/// TCP kernel buffers when `recv()` is not being called. The PUB socket
/// uses `try_send` with a noop waker — once the TCP send buffer is full
/// it silently drops messages (per ZMQ spec RFC 29). This test confirms
/// that a brief delay (simulating P2P recovery) doesn't lose messages.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn zmq_buffers_messages_during_brief_delay() {
let mut pub_socket = PubSocket::new();
let bound_endpoint = pub_socket.bind("tcp://127.0.0.1:0").await.unwrap();
let mut sub_socket = SubSocket::new();
sub_socket.subscribe("").await.unwrap();
sub_socket
.connect(&bound_endpoint.to_string())
.await
.unwrap();
// Wait for SUB handshake: spawn recv in a background task so the
// PUB's accept/subscription processing can proceed concurrently.
let (tx, mut rx) = tokio::sync::mpsc::channel::<SubSocket>(1);
tokio::spawn(async move {
let _ = sub_socket.recv().await.unwrap();
let _ = tx.send(sub_socket).await;
});
loop {
pub_socket.send("probe".into()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
if let Ok(sub) = rx.try_recv() {
sub_socket = sub;
break;
}
}
let num_messages = 10u64;
// Send messages without calling recv() — simulates the brief window
// between ZMQ connect and ready signal during P2P recovery.
for i in 0..num_messages {
pub_socket
.send(i.to_le_bytes().to_vec().into())
.await
.unwrap();
}
// Simulate recovery delay
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// All messages should be buffered in TCP kernel buffers
for i in 0u64..num_messages {
let msg = tokio::time::timeout(std::time::Duration::from_secs(5), sub_socket.recv())
.await
.expect("timed out waiting for ZMQ message")
.expect("ZMQ recv error");
let payload = msg.get(0).unwrap();
let received = u64::from_le_bytes(payload[..8].try_into().unwrap());
assert_eq!(received, i, "message {i} arrived out of order");
}
}
}
...@@ -8,6 +8,7 @@ use tokio::net::TcpListener; ...@@ -8,6 +8,7 @@ use tokio::net::TcpListener;
mod indexer; mod indexer;
mod listener; mod listener;
mod recovery;
mod registry; mod registry;
mod server; mod server;
...@@ -26,10 +27,10 @@ struct Cli { ...@@ -26,10 +27,10 @@ struct Cli {
port: u16, port: u16,
/// Number of indexer threads (1 = single-threaded KvIndexer, >1 = ThreadPoolIndexer) /// Number of indexer threads (1 = single-threaded KvIndexer, >1 = ThreadPoolIndexer)
#[arg(long, default_value_t = 1)] #[arg(long, default_value_t = 4)]
threads: usize, threads: usize,
/// Initial workers as "worker_id=zmq_address,..." (e.g. "1=tcp://host:5557,2=tcp://host:5558") /// Initial workers as "worker_id[:dp_rank]=zmq_address,..." (e.g. "1=tcp://host:5557,1:1=tcp://host:5558")
#[arg(long)] #[arg(long)]
workers: Option<String>, workers: Option<String>,
...@@ -40,15 +41,24 @@ struct Cli { ...@@ -40,15 +41,24 @@ struct Cli {
/// Tenant ID for initial workers registered via --workers /// Tenant ID for initial workers registered via --workers
#[arg(long, default_value = "default")] #[arg(long, default_value = "default")]
tenant_id: String, tenant_id: String,
/// Comma-separated peer URLs for P2P recovery (e.g. "http://host1:8090,http://host2:8091")
#[arg(long)]
peers: Option<String>,
} }
fn parse_workers(s: &str) -> Vec<(u64, String)> { fn parse_workers(s: &str) -> Vec<(u64, u32, String)> {
s.split(',') s.split(',')
.filter(|entry| !entry.is_empty()) .filter(|entry| !entry.is_empty())
.filter_map(|entry| { .filter_map(|entry| {
let (id_str, addr) = entry.split_once('=')?; let (id_part, addr) = entry.split_once('=')?;
let id = id_str.trim().parse::<u64>().ok()?; let id_part = id_part.trim();
Some((id, addr.trim().to_string())) let (id, dp_rank) = if let Some((id_str, rank_str)) = id_part.split_once(':') {
(id_str.parse::<u64>().ok()?, rank_str.parse::<u32>().ok()?)
} else {
(id_part.parse::<u64>().ok()?, 0)
};
Some((id, dp_rank, addr.trim().to_string()))
}) })
.collect() .collect()
} }
...@@ -64,27 +74,42 @@ async fn main() -> anyhow::Result<()> { ...@@ -64,27 +74,42 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse(); let cli = Cli::parse();
let peers: Vec<String> = cli
.peers
.as_deref()
.map(|s| {
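// Trim before filtering so whitespace-only entries are dropped
// instead of being kept as empty peer URLs.
s.split(',')
.map(|p| p.trim())
.filter(|p| !p.is_empty())
.map(|p| p.to_string())
.collect()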
})
.unwrap_or_default();
tracing::info!( tracing::info!(
block_size = ?cli.block_size, block_size = ?cli.block_size,
port = cli.port, port = cli.port,
threads = cli.threads, threads = cli.threads,
model_name = %cli.model_name, model_name = %cli.model_name,
tenant_id = %cli.tenant_id, tenant_id = %cli.tenant_id,
num_peers = peers.len(),
"Starting standalone KV cache indexer" "Starting standalone KV cache indexer"
); );
let registry = WorkerRegistry::new(cli.threads); let registry = WorkerRegistry::new(cli.threads);
// Register initial workers — connects ZMQ sockets but listeners wait
// for the ready signal. This ensures ZMQ subscription handshakes begin
// before P2P recovery fetches the dump from a peer.
if let Some(ref workers_str) = cli.workers { if let Some(ref workers_str) = cli.workers {
let block_size = cli.block_size.ok_or_else(|| { let block_size = cli.block_size.ok_or_else(|| {
anyhow::anyhow!("--block-size is required when --workers is specified") anyhow::anyhow!("--block-size is required when --workers is specified")
})?; })?;
for (instance_id, endpoint) in parse_workers(workers_str) { for (instance_id, dp_rank, endpoint) in parse_workers(workers_str) {
tracing::info!(instance_id, endpoint, "Registering initial worker"); tracing::info!(instance_id, dp_rank, endpoint, "Registering initial worker");
registry.register( registry.register(
instance_id, instance_id,
endpoint, endpoint,
0, dp_rank,
cli.model_name.clone(), cli.model_name.clone(),
cli.tenant_id.clone(), cli.tenant_id.clone(),
block_size, block_size,
...@@ -92,6 +117,23 @@ async fn main() -> anyhow::Result<()> { ...@@ -92,6 +117,23 @@ async fn main() -> anyhow::Result<()> {
} }
} }
// P2P recovery: fetch the dump from a peer before unblocking the ZMQ
// listeners (they are already connected but wait on the ready signal).
// The 1s delay inside recover_from_peers gives the peer's tree time to
// advance past our ZMQ connection floor before we fetch the dump.
if !peers.is_empty() {
match recovery::recover_from_peers(&peers, &registry).await {
Ok(true) => tracing::info!("P2P recovery completed"),
Ok(false) => tracing::warn!("no reachable peers, starting with empty state"),
Err(e) => tracing::warn!(error = %e, "P2P recovery failed, starting with empty state"),
}
for peer in &peers {
registry.register_peer(peer.clone());
}
}
// Signal ready — unblocks all ZMQ listeners to start draining buffered events
registry.signal_ready();
let state = Arc::new(AppState { registry }); let state = Arc::new(AppState { registry });
let app = create_router(state); let app = create_router(state);
...@@ -108,11 +150,11 @@ mod tests { ...@@ -108,11 +150,11 @@ mod tests {
#[test] #[test]
fn test_parse_workers() { fn test_parse_workers() {
let input = "1=tcp://host:5557,2=tcp://host:5558"; let input = "1=tcp://host:5557,2:1=tcp://host:5558";
let result = parse_workers(input); let result = parse_workers(input);
assert_eq!(result.len(), 2); assert_eq!(result.len(), 2);
assert_eq!(result[0], (1, "tcp://host:5557".to_string())); assert_eq!(result[0], (1, 0, "tcp://host:5557".to_string()));
assert_eq!(result[1], (2, "tcp://host:5558".to_string())); assert_eq!(result[1], (2, 1, "tcp://host:5558".to_string()));
} }
#[test] #[test]
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use anyhow::{Context, Result};
use serde::Deserialize;
use dynamo_kv_router::protocols::RouterEvent;
use super::registry::{IndexerKey, WorkerRegistry};
#[derive(Deserialize)]
struct DumpEntry {
block_size: u32,
events: Vec<RouterEvent>,
}
pub async fn recover_from_peers(peers: &[String], registry: &WorkerRegistry) -> Result<bool> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.context("failed to build HTTP client")?;
// Brief delay to ensure the peer's tree state has advanced past the
// point where our ZMQ SUB sockets connected. The dump must cover any
// events that would otherwise be lost to the slow-joiner window —
// without this delay, the peer's dump could be stale relative to our
// ZMQ connection floor.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
for peer_url in peers {
match try_recover_from_peer(&client, peer_url, registry).await {
Ok(()) => {
tracing::info!(peer = %peer_url, "recovery from peer succeeded");
return Ok(true);
}
Err(e) => {
tracing::warn!(peer = %peer_url, error = %e, "recovery from peer failed, trying next");
}
}
}
Ok(false)
}
async fn try_recover_from_peer(
client: &reqwest::Client,
peer_url: &str,
registry: &WorkerRegistry,
) -> Result<()> {
let dump_url = format!("{peer_url}/dump");
tracing::info!(url = %dump_url, "fetching dump from peer");
let resp = client
.get(&dump_url)
.send()
.await
.context("HTTP request failed")?;
if !resp.status().is_success() {
anyhow::bail!("peer returned status {}", resp.status());
}
let dump: HashMap<String, DumpEntry> =
resp.json().await.context("failed to parse dump response")?;
let mut total_events = 0usize;
for (map_key, entry) in dump {
let (model_name, tenant_id) = map_key
.split_once(':')
.ok_or_else(|| anyhow::anyhow!("invalid dump key format: {map_key}"))?;
let key = IndexerKey {
model_name: model_name.to_string(),
tenant_id: tenant_id.to_string(),
};
let indexer = registry.get_or_create_indexer(key, entry.block_size);
for event in entry.events {
indexer.apply_event(event).await;
total_events += 1;
}
}
tracing::info!(total_events, "applied dump events from peer");
Ok(())
}
...@@ -6,6 +6,7 @@ use std::collections::HashMap; ...@@ -6,6 +6,7 @@ use std::collections::HashMap;
use anyhow::{Result, bail}; use anyhow::{Result, bail};
use dashmap::DashMap; use dashmap::DashMap;
use dashmap::mapref::one::Ref; use dashmap::mapref::one::Ref;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use dynamo_kv_router::protocols::WorkerId; use dynamo_kv_router::protocols::WorkerId;
...@@ -32,18 +33,45 @@ pub struct WorkerEntry { ...@@ -32,18 +33,45 @@ pub struct WorkerEntry {
pub struct WorkerRegistry { pub struct WorkerRegistry {
workers: DashMap<WorkerId, WorkerEntry>, workers: DashMap<WorkerId, WorkerEntry>,
indexers: DashMap<IndexerKey, IndexerEntry>, indexers: DashMap<IndexerKey, IndexerEntry>,
peers: DashMap<String, ()>,
num_threads: usize, num_threads: usize,
ready_tx: watch::Sender<bool>,
ready_rx: watch::Receiver<bool>,
} }
impl WorkerRegistry { impl WorkerRegistry {
pub fn new(num_threads: usize) -> Self { pub fn new(num_threads: usize) -> Self {
let (ready_tx, ready_rx) = watch::channel(false);
Self { Self {
workers: DashMap::new(), workers: DashMap::new(),
indexers: DashMap::new(), indexers: DashMap::new(),
peers: DashMap::new(),
num_threads, num_threads,
ready_tx,
ready_rx,
} }
} }
pub fn signal_ready(&self) {
let _ = self.ready_tx.send(true);
}
pub fn ready_rx(&self) -> watch::Receiver<bool> {
self.ready_rx.clone()
}
pub fn register_peer(&self, url: String) {
self.peers.entry(url).or_insert(());
}
pub fn deregister_peer(&self, url: &str) -> bool {
self.peers.remove(url).is_some()
}
pub fn list_peers(&self) -> Vec<String> {
self.peers.iter().map(|entry| entry.key().clone()).collect()
}
pub fn register( pub fn register(
&self, &self,
instance_id: WorkerId, instance_id: WorkerId,
...@@ -102,9 +130,10 @@ impl WorkerRegistry { ...@@ -102,9 +130,10 @@ impl WorkerRegistry {
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
let child_cancel = cancel.child_token(); let child_cancel = cancel.child_token();
let addr = endpoint.clone(); let addr = endpoint.clone();
let ready = self.ready_rx();
tokio::spawn(async move { tokio::spawn(async move {
run_zmq_listener(instance_id, dp_rank, addr, bs, indexer, child_cancel).await; run_zmq_listener(instance_id, dp_rank, addr, bs, indexer, child_cancel, ready).await;
}); });
entry.endpoints.insert(dp_rank, endpoint); entry.endpoints.insert(dp_rank, endpoint);
...@@ -233,10 +262,41 @@ impl WorkerRegistry { ...@@ -233,10 +262,41 @@ impl WorkerRegistry {
self.indexers.get(key) self.indexers.get(key)
} }
pub fn all_indexers(&self) -> Vec<(IndexerKey, Indexer)> { pub fn get_or_create_indexer(&self, key: IndexerKey, block_size: u32) -> Indexer {
let entry = self.indexers.entry(key.clone()).or_insert_with(|| {
tracing::info!(
model_name = %key.model_name,
tenant_id = %key.tenant_id,
block_size,
"Creating indexer from recovery dump"
);
IndexerEntry {
indexer: create_indexer(block_size, self.num_threads),
block_size,
}
});
if entry.block_size != block_size {
tracing::warn!(
model_name = %key.model_name,
tenant_id = %key.tenant_id,
existing_block_size = entry.block_size,
requested_block_size = block_size,
"Block size mismatch for existing indexer"
);
}
entry.indexer.clone()
}
pub fn all_indexers_with_block_size(&self) -> Vec<(IndexerKey, Indexer, u32)> {
self.indexers self.indexers
.iter() .iter()
.map(|entry| (entry.key().clone(), entry.value().indexer.clone())) .map(|entry| {
(
entry.key().clone(),
entry.value().indexer.clone(),
entry.value().block_size,
)
})
.collect() .collect()
} }
} }
...@@ -248,25 +248,65 @@ async fn query_by_hash( ...@@ -248,25 +248,65 @@ async fn query_by_hash(
} }
} }
#[derive(Deserialize)]
struct PeerRequest {
url: String,
}
async fn register_peer(
State(state): State<Arc<AppState>>,
Json(req): Json<PeerRequest>,
) -> impl IntoResponse {
state.registry.register_peer(req.url);
(
StatusCode::CREATED,
Json(serde_json::json!({"status": "ok"})),
)
}
async fn deregister_peer(
State(state): State<Arc<AppState>>,
Json(req): Json<PeerRequest>,
) -> impl IntoResponse {
if state.registry.deregister_peer(&req.url) {
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
} else {
(
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "peer not found"})),
)
}
}
async fn list_peers(State(state): State<Arc<AppState>>) -> impl IntoResponse {
Json(state.registry.list_peers())
}
async fn dump_events(State(state): State<Arc<AppState>>) -> impl IntoResponse { async fn dump_events(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let indexers = state.registry.all_indexers(); let all = state.registry.all_indexers_with_block_size();
let mut handles = Vec::with_capacity(indexers.len()); let mut handles = Vec::with_capacity(all.len());
for (key, indexer) in indexers { for (key, indexer, block_size) in all {
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
let events = indexer.dump_events().await; let events = indexer.dump_events().await;
(key, events) (key, events, block_size)
})); }));
} }
let mut result: HashMap<String, serde_json::Value> = HashMap::new(); let mut result: HashMap<String, serde_json::Value> = HashMap::new();
for handle in handles { for handle in handles {
match handle.await { match handle.await {
Ok((key, Ok(events))) => { Ok((key, Ok(events), block_size)) => {
let map_key = format!("{}:{}", key.model_name, key.tenant_id); let map_key = format!("{}:{}", key.model_name, key.tenant_id);
result.insert(map_key, serde_json::json!(events)); result.insert(
map_key,
serde_json::json!({
"block_size": block_size,
"events": events,
}),
);
} }
Ok((key, Err(e))) => { Ok((key, Err(e), _)) => {
let map_key = format!("{}:{}", key.model_name, key.tenant_id); let map_key = format!("{}:{}", key.model_name, key.tenant_id);
result.insert(map_key, serde_json::json!({"error": e.to_string()})); result.insert(map_key, serde_json::json!({"error": e.to_string()}));
} }
...@@ -286,5 +326,8 @@ pub fn create_router(state: Arc<AppState>) -> Router { ...@@ -286,5 +326,8 @@ pub fn create_router(state: Arc<AppState>) -> Router {
.route("/query", post(query)) .route("/query", post(query))
.route("/query_by_hash", post(query_by_hash)) .route("/query_by_hash", post(query_by_hash))
.route("/dump", get(dump_events)) .route("/dump", get(dump_events))
.route("/register_peer", post(register_peer))
.route("/deregister_peer", post(deregister_peer))
.route("/peers", get(list_peers))
.with_state(state) .with_state(state)
} }
...@@ -33,15 +33,21 @@ async fn start_kv_router_background_event_plane( ...@@ -33,15 +33,21 @@ async fn start_kv_router_background_event_plane(
transport_kind: EventTransportKind, transport_kind: EventTransportKind,
) -> Result<()> { ) -> Result<()> {
let cancellation_token = component.drt().primary_token(); let cancellation_token = component.drt().primary_token();
// WorkerQueryClient handles its own discovery loop for lifecycle + initial recovery.
// No blocking wait — recovery happens asynchronously as endpoints are discovered.
let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer.clone()).await?;
// Subscribe to KV events using the selected event plane transport // Subscribe to KV events BEFORE spawning the discovery/recovery loop.
// This ensures no events are lost between the initial dump fetch and the
// subscription becoming active — the tree state at fetch time is guaranteed
// to be a subset of what the subscription will deliver.
let mut subscriber = let mut subscriber =
EventSubscriber::for_component_with_transport(&component, KV_EVENT_SUBJECT, transport_kind) EventSubscriber::for_component_with_transport(&component, KV_EVENT_SUBJECT, transport_kind)
.await? .await?
.typed::<RouterEvent>(); .typed::<RouterEvent>();
// Brief delay to let the subscription fully establish with the NATS server
// before recovery fetches the initial dump from workers.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer.clone()).await?;
let kv_event_subject = format!( let kv_event_subject = format!(
"namespace.{}.component.{}.{}", "namespace.{}.component.{}.{}",
component.namespace().name(), component.namespace().name(),
......
...@@ -1354,6 +1354,7 @@ def _test_router_indexers_sync( ...@@ -1354,6 +1354,7 @@ def _test_router_indexers_sync(
durable_kv_events: bool = False, durable_kv_events: bool = False,
router_event_threads: int = 4, router_event_threads: int = 4,
standalone_indexer_url: Optional[str] = None, standalone_indexer_url: Optional[str] = None,
standalone_indexer_b_url: Optional[str] = None,
): ):
"""Test that two KV routers have synchronized indexer states after processing requests. """Test that two KV routers have synchronized indexer states after processing requests.
...@@ -1538,6 +1539,15 @@ def _test_router_indexers_sync( ...@@ -1538,6 +1539,15 @@ def _test_router_indexers_sync(
kv_router_config=kv_router_config, kv_router_config=kv_router_config,
) )
# Launch Indexer B alongside Router 2. Workers are passed via --workers
# so ZMQ sockets connect before recovery, avoiding the slow-joiner problem.
if standalone_indexer_b_url:
engine_workers.launch_indexer()
logger.info(
f"Launched Indexer B at {standalone_indexer_b_url} "
f"(P2P recovery from Indexer A)"
)
# Send 25 requests to second router with initial retry loop # Send 25 requests to second router with initial retry loop
logger.info("Sending 25 requests to second router") logger.info("Sending 25 requests to second router")
successful2 = await send_requests_to_router( successful2 = await send_requests_to_router(
...@@ -1576,14 +1586,13 @@ def _test_router_indexers_sync( ...@@ -1576,14 +1586,13 @@ def _test_router_indexers_sync(
successful_recovery == 5 successful_recovery == 5
), f"Expected 5 successful requests post-recovery, got {successful_recovery}" ), f"Expected 5 successful requests post-recovery, got {successful_recovery}"
# Wait for all requests to complete (they should already be complete from gather) # Wait for internal synchronization and ZMQ event propagation
# Wait another 1 second for internal synchronization
logger.info("Waiting for final synchronization") logger.info("Waiting for final synchronization")
await asyncio.sleep(1) await asyncio.sleep(2)
# Verify NATS object store bucket was created with snapshot # Verify NATS object store bucket was created with snapshot
# Skip for NATS interruption test (restarts fresh), and standalone indexer (no JetStream) # Skip for NATS interruption test (restarts fresh) and non-durable modes
if not test_nats_interruption and not standalone_indexer_url: if not test_nats_interruption and durable_kv_events:
# Mirror the Rust bucket naming logic from subscriber.rs: # Mirror the Rust bucket naming logic from subscriber.rs:
# component.subject() -> "namespace.{ns}.component.{comp}" # component.subject() -> "namespace.{ns}.component.{comp}"
# then slugify (convert dots to dashes, lowercase, etc) and append "-radix-bucket" # then slugify (convert dots to dashes, lowercase, etc) and append "-radix-bucket"
...@@ -1630,16 +1639,14 @@ def _test_router_indexers_sync( ...@@ -1630,16 +1639,14 @@ def _test_router_indexers_sync(
"Skipping NATS object store verification (NATS was restarted fresh for interruption test)" "Skipping NATS object store verification (NATS was restarted fresh for interruption test)"
) )
# Dump states from both routers # Dump states from all sources
logger.info("Dumping states from both routers") logger.info("Dumping states from all sources")
state1_json = await kv_router1.dump_events() state1_json = await kv_router1.dump_events()
state2_json = await kv_router2.dump_events() state2_json = await kv_router2.dump_events()
# Parse JSON strings for comparison
state1 = json.loads(state1_json) state1 = json.loads(state1_json)
state2 = json.loads(state2_json) state2 = json.loads(state2_json)
# Sort both states for comparison (order might differ due to HashMap iteration and sharding)
def sort_key(event): def sort_key(event):
data = event["event"]["data"]["stored"] data = event["event"]["data"]["stored"]
blocks = data["blocks"] blocks = data["blocks"]
...@@ -1657,50 +1664,66 @@ def _test_router_indexers_sync( ...@@ -1657,50 +1664,66 @@ def _test_router_indexers_sync(
logger.info(f"Router 2 has {len(sorted_state2)} events") logger.info(f"Router 2 has {len(sorted_state2)} events")
assert_event_dumps_equal(sorted_state1, sorted_state2, "Router 1", "Router 2") assert_event_dumps_equal(sorted_state1, sorted_state2, "Router 1", "Router 2")
logger.info("Successfully verified that both router states are equal") logger.info("Successfully verified Router 1 and Router 2 states are equal")
# Verify standalone HTTP indexer builds the same tree (non-durable with ZMQ) # Verify standalone HTTP indexers build the same tree (via ZMQ)
if standalone_indexer_url: if standalone_indexer_url:
logger.info("Verifying standalone HTTP indexer tree state via /dump")
# Wait for ZMQ events to propagate to the indexer
await asyncio.sleep(3)
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(f"{standalone_indexer_url}/dump") as resp: async with session.get(f"{standalone_indexer_url}/dump") as resp:
assert resp.status == 200, f"GET /dump failed: {resp.status}" assert resp.status == 200, f"GET /dump failed: {resp.status}"
dump_by_key = await resp.json() dump_a = await resp.json()
# /dump returns {model:tenant -> events}, extract the expected key # /dump returns {model:tenant -> {"block_size": N, "events": [...]}}
expected_key = f"{model_name}:default" expected_key = f"{model_name}:default"
assert expected_key in dump_by_key, ( assert expected_key in dump_a, (
f"Expected dump key '{expected_key}', " f"Expected dump key '{expected_key}', "
f"got keys={list(dump_by_key.keys())}" f"got keys={list(dump_a.keys())}"
) )
for k, v in dump_by_key.items(): for k, v in dump_a.items():
assert isinstance(v, list), f"Dump key '{k}' returned error: {v}" assert (
standalone_state = dump_by_key[expected_key] isinstance(v, dict) and "events" in v
sorted_standalone = sorted(standalone_state, key=sort_key) ), f"Dump key '{k}' returned unexpected format: {v}"
logger.info(f"Standalone HTTP indexer has {len(sorted_standalone)} events") sorted_standalone_a = sorted(dump_a[expected_key]["events"], key=sort_key)
logger.info(f"Standalone Indexer A has {len(sorted_standalone_a)} events")
assert_event_dumps_equal( assert_event_dumps_equal(
sorted_state1, sorted_standalone, "Router 1", "Standalone" sorted_state1, sorted_standalone_a, "Router 1", "Standalone A"
)
logger.info(
"Successfully verified standalone HTTP indexer state matches router states"
)
elif not durable_kv_events:
logger.info(
"Skipping standalone indexer verification (no standalone_indexer_url)"
)
else:
logger.info(
"Skipping standalone indexer verification (not supported with durable_kv_events)"
) )
logger.info("Standalone A matches Router 1")
if standalone_indexer_b_url:
async with aiohttp.ClientSession() as session:
async with session.get(f"{standalone_indexer_b_url}/dump") as resp:
assert (
resp.status == 200
), f"GET /dump from Indexer B failed: {resp.status}"
dump_b = await resp.json()
assert expected_key in dump_b, (
f"Indexer B missing dump key '{expected_key}', "
f"got keys={list(dump_b.keys())}"
)
sorted_standalone_b = sorted(
dump_b[expected_key]["events"], key=sort_key
)
logger.info(
f"Standalone Indexer B has {len(sorted_standalone_b)} events"
)
assert_event_dumps_equal(
sorted_standalone_a,
sorted_standalone_b,
"Standalone A",
"Standalone B",
)
logger.info(
"All 4 dumps match: Router 1, Router 2, "
"Standalone A, Standalone B"
)
# Verify NATS consumers are created (while routers are still alive) # Verify NATS consumers are created (while routers are still alive)
# Skip for NATS interruption test and standalone indexer (neither uses JetStream) # Skip for NATS interruption test (restarts fresh) and non-durable modes
if not test_nats_interruption and not standalone_indexer_url: if not test_nats_interruption and durable_kv_events:
logger.info("Verifying NATS consumers exist for both routers") logger.info("Verifying NATS consumers exist for both routers")
component_subject = f"namespace.{engine_workers.namespace}.component.{engine_workers.component_name}" component_subject = f"namespace.{engine_workers.namespace}.component.{engine_workers.component_name}"
slugified = component_subject.lower().replace(".", "-").replace("_", "-") slugified = component_subject.lower().replace(".", "-").replace("_", "-")
......
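Reviewer note: the `/dump` checks in this hunk rely on the payload having the shape `{model:tenant -> {"block_size": N, "events": [...]}}`. A minimal sketch of that validation and an order-insensitive comparison follows. The helper names `events_for_key`, `canonical`, and `dumps_match` are hypothetical, and the JSON-based canonical form stands in for the test's own `sort_key`, which is partly elided in this diff.

```python
import json
from typing import Any


def events_for_key(dump: dict[str, Any], key: str) -> list[dict]:
    """Return the event list stored under `key`, validating the entry shape."""
    if key not in dump:
        raise KeyError(f"Expected dump key '{key}', got keys={list(dump.keys())}")
    entry = dump[key]
    if not (isinstance(entry, dict) and "events" in entry):
        raise ValueError(f"Dump key '{key}' returned unexpected format: {entry}")
    return entry["events"]


def canonical(events: list[dict]) -> list[str]:
    # Assumption: serializing each event with sorted keys and then sorting the
    # strings gives an order-insensitive form. The real test uses its own sort_key.
    return sorted(json.dumps(event, sort_keys=True) for event in events)


def dumps_match(dump_a: dict[str, Any], dump_b: dict[str, Any], key: str) -> bool:
    """Compare two /dump payloads for one model:tenant key, ignoring event order."""
    return canonical(events_for_key(dump_a, key)) == canonical(
        events_for_key(dump_b, key)
    )
```

With this shape, a peer that recovered via P2P and a peer that built its tree from live ZMQ events should compare equal even if their events are dumped in a different order.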
...@@ -201,7 +201,9 @@ class MockerProcess: ...@@ -201,7 +201,9 @@ class MockerProcess:
self._zmq_kv_events_ports: list[int] = [] self._zmq_kv_events_ports: list[int] = []
self._standalone_indexer = standalone_indexer self._standalone_indexer = standalone_indexer
self._standalone_indexer_port: Optional[int] = None self._standalone_indexer_port: Optional[int] = None
self._standalone_indexer_b_port: Optional[int] = None
self._indexer_process: Optional[ManagedProcess] = None self._indexer_process: Optional[ManagedProcess] = None
self._indexer_b_process: Optional[ManagedProcess] = None
self._mocker_processes: list[ManagedProcess] = [] self._mocker_processes: list[ManagedProcess] = []
self._request = request self._request = request
self._store_backend = store_backend self._store_backend = store_backend
...@@ -233,13 +235,11 @@ class MockerProcess: ...@@ -233,13 +235,11 @@ class MockerProcess:
) )
if standalone_indexer: if standalone_indexer:
# Allocate a port for the standalone indexer HTTP server # Allocate ports for standalone indexer A and B (P2P recovery peer)
self._standalone_indexer_port = allocate_ports(1, BASE_PORT)[0] indexer_ports = allocate_ports(2, BASE_PORT)
request.addfinalizer( self._standalone_indexer_port = indexer_ports[0]
lambda: deallocate_ports([self._standalone_indexer_port]) self._standalone_indexer_b_port = indexer_ports[1]
if self._standalone_indexer_port request.addfinalizer(lambda: deallocate_ports(indexer_ports))
else None
)
# Don't build a single mocker command — we'll launch per-mocker in launch_mockers_with_indexer # Don't build a single mocker command — we'll launch per-mocker in launch_mockers_with_indexer
self._process = None self._process = None
else: else:
...@@ -274,6 +274,12 @@ class MockerProcess: ...@@ -274,6 +274,12 @@ class MockerProcess:
return f"http://localhost:{self._standalone_indexer_port}" return f"http://localhost:{self._standalone_indexer_port}"
return None return None
@property
def standalone_indexer_b_url(self) -> Optional[str]:
if self._standalone_indexer_b_port is not None:
return f"http://localhost:{self._standalone_indexer_b_port}"
return None
def __enter__(self): def __enter__(self):
if self._standalone_indexer: if self._standalone_indexer:
# Launch the standalone indexer binary # Launch the standalone indexer binary
...@@ -412,6 +418,64 @@ class MockerProcess: ...@@ -412,6 +418,64 @@ class MockerProcess:
f"All {self.num_workers} mockers launched and registered with indexer" f"All {self.num_workers} mockers launched and registered with indexer"
) )
def launch_indexer(self):
"""Launch a second standalone indexer (Indexer B) with --peers pointing to Indexer A.
Workers are passed via --workers so ZMQ sockets connect before recovery
runs, ensuring the subscription handshake completes during the recovery
delay and no events are lost to the ZMQ slow-joiner problem.
"""
if not self._standalone_indexer or self._standalone_indexer_b_port is None:
raise RuntimeError("launch_indexer requires standalone_indexer=True")
if not self.worker_id_to_zmq_ports:
raise RuntimeError("launch_indexer requires workers to be registered first")
block_size = self._mocker_args_orig.get("block_size", BLOCK_SIZE)
# Build --workers arg: "worker_id:dp_rank=zmq_addr,..."
worker_entries = []
for worker_id, zmq_addresses in self.worker_id_to_zmq_ports.items():
for dp_rank_str, zmq_endpoint in zmq_addresses.items():
worker_entries.append(f"{worker_id}:{dp_rank_str}={zmq_endpoint}")
workers_arg = ",".join(worker_entries)
indexer_b_cmd = [
"cargo",
"run",
"-p",
"dynamo-kv-router",
"--features",
"indexer-bin",
"--bin",
"dynamo-kv-indexer",
"--",
"--block-size",
str(block_size),
"--port",
str(self._standalone_indexer_b_port),
"--peers",
f"http://localhost:{self._standalone_indexer_port}",
"--workers",
workers_arg,
"--model-name",
self.model_name,
]
self._indexer_b_process = ManagedProcess(
command=indexer_b_cmd,
timeout=120,
display_output=True,
health_check_ports=[self._standalone_indexer_b_port],
health_check_urls=[],
log_dir=self._request.node.name,
terminate_all_matching_process_names=False,
display_name="dynamo-kv-indexer-b",
)
logger.info(
f"Starting standalone indexer B on port {self._standalone_indexer_b_port} "
f"with peer http://localhost:{self._standalone_indexer_port}"
)
self._indexer_b_process.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
logger.info("Stopping mocker process(es)") logger.info("Stopping mocker process(es)")
# Stop individual mocker processes (standalone_indexer mode) # Stop individual mocker processes (standalone_indexer mode)
...@@ -421,7 +485,14 @@ class MockerProcess: ...@@ -421,7 +485,14 @@ class MockerProcess:
except Exception as e: except Exception as e:
logger.warning(f"Error stopping mocker process: {e}") logger.warning(f"Error stopping mocker process: {e}")
self._mocker_processes.clear() self._mocker_processes.clear()
# Stop standalone indexer # Stop standalone indexer B (P2P recovery peer)
if self._indexer_b_process is not None:
try:
self._indexer_b_process.__exit__(exc_type, exc_val, exc_tb)
except Exception as e:
logger.warning(f"Error stopping indexer B process: {e}")
self._indexer_b_process = None
# Stop standalone indexer A
if self._indexer_process is not None: if self._indexer_process is not None:
try: try:
self._indexer_process.__exit__(exc_type, exc_val, exc_tb) self._indexer_process.__exit__(exc_type, exc_val, exc_tb)
...@@ -791,6 +862,9 @@ def test_indexers_sync( ...@@ -791,6 +862,9 @@ def test_indexers_sync(
num_mockers=NUM_MOCKERS, num_mockers=NUM_MOCKERS,
store_backend=store_backend, store_backend=store_backend,
request_plane=request_plane, request_plane=request_plane,
zmq_kv_events=True,
standalone_indexer=True,
model_name=MODEL_NAME,
) as mockers: ) as mockers:
# Start mocker instances (2 workers x 2 DP ranks = 4 independent event streams) # Start mocker instances (2 workers x 2 DP ranks = 4 independent event streams)
logger.info(f"Starting {NUM_MOCKERS} mocker instances with dp_size=2") logger.info(f"Starting {NUM_MOCKERS} mocker instances with dp_size=2")
...@@ -810,6 +884,7 @@ def test_indexers_sync( ...@@ -810,6 +884,7 @@ def test_indexers_sync(
nats_server=nats_process if not durable_kv_events else None, nats_server=nats_process if not durable_kv_events else None,
durable_kv_events=durable_kv_events, durable_kv_events=durable_kv_events,
standalone_indexer_url=mockers.standalone_indexer_url, standalone_indexer_url=mockers.standalone_indexer_url,
standalone_indexer_b_url=mockers.standalone_indexer_b_url,
) )
logger.info("Indexers sync test completed successfully") logger.info("Indexers sync test completed successfully")
......
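Reviewer note: `launch_indexer` encodes the registered workers as `worker_id:dp_rank=zmq_addr,...` for the `--workers` flag. The sketch below isolates that encoding and adds a round-trip parser to show the format is unambiguous for `tcp://host:port` endpoints. `build_workers_arg` mirrors the loop in the diff; `parse_workers_arg` is a hypothetical helper for illustration only and is not the indexer's actual parser.

```python
def build_workers_arg(worker_id_to_zmq_ports: dict[int, dict[str, str]]) -> str:
    """Encode workers as 'worker_id:dp_rank=zmq_addr,...' (same loop as the diff)."""
    entries = []
    for worker_id, zmq_addresses in worker_id_to_zmq_ports.items():
        for dp_rank, endpoint in zmq_addresses.items():
            entries.append(f"{worker_id}:{dp_rank}={endpoint}")
    return ",".join(entries)


def parse_workers_arg(arg: str) -> dict[int, dict[str, str]]:
    """Hypothetical inverse of build_workers_arg, for illustration only."""
    result: dict[int, dict[str, str]] = {}
    for entry in arg.split(","):
        # Split on the first '=' so colons inside the endpoint are left alone.
        ident, endpoint = entry.split("=", 1)
        worker_id, dp_rank = ident.split(":", 1)
        result.setdefault(int(worker_id), {})[dp_rank] = endpoint
    return result


workers = {
    1: {"0": "tcp://127.0.0.1:5557", "1": "tcp://127.0.0.1:5558"},
    2: {"0": "tcp://127.0.0.1:5559"},
}
workers_arg = build_workers_arg(workers)
```

Splitting on `=` before `:` matters because the ZMQ endpoint itself contains colons. The sketch assumes endpoints never contain commas.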