Unverified commit f80ad692, authored by Yan Ru Pei and committed by GitHub
Browse files

feat(kv-router): package dynamo-kv-indexer binary via maturin [LLM-126] (#7194)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent a2ffdd74
......@@ -431,9 +431,9 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \
uv build --wheel --out-dir /opt/dynamo/dist && \
cd /opt/dynamo/lib/bindings/python && \
if [ "$ENABLE_MEDIA_FFMPEG" = "true" ]; then \
maturin build --release --features "media-ffmpeg" --out /opt/dynamo/dist; \
maturin build --release --features "media-ffmpeg,kv-indexer" --out /opt/dynamo/dist; \
else \
maturin build --release --out /opt/dynamo/dist; \
maturin build --release --features "kv-indexer" --out /opt/dynamo/dist; \
fi && \
/tmp/use-sccache.sh show-stats "Dynamo Runtime"
......
......@@ -66,9 +66,9 @@ dependencies = [
[[package]]
name = "anstream"
version = "0.6.21"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d"
dependencies = [
"anstyle",
"anstyle-parse",
......@@ -87,9 +87,9 @@ checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
[[package]]
name = "anstyle-parse"
version = "0.2.7"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e"
dependencies = [
"utf8parse",
]
......@@ -813,9 +813,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.60"
version = "4.5.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a"
checksum = "52fa72306bb30daf11bc97773431628e5b4916e97aaa74b7d3f625d4d495da02"
dependencies = [
"clap_builder",
"clap_derive",
......@@ -823,9 +823,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.60"
version = "4.5.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876"
checksum = "2071365c5c56eae7d77414029dde2f4f4ba151cf68d5a3261c9a40de428ace93"
dependencies = [
"anstream",
"anstyle",
......@@ -835,9 +835,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.5.55"
version = "4.5.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
checksum = "dec5be1eea072311774b7b84ded287adbd9f293f9d23456817605c6042f4f5e0"
dependencies = [
"heck",
"proc-macro2",
......@@ -847,9 +847,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "1.0.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831"
checksum = "0e78417baa3b3114dc0e95e7357389a249c4da97c3c2b540700079db6171bfd7"
[[package]]
name = "cmake"
......@@ -2749,9 +2749,9 @@ dependencies = [
[[package]]
name = "image"
version = "0.25.9"
version = "0.25.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6506c6c10786659413faa717ceebcb8f70731c0a60cbae39795fdf114519c1a"
checksum = "85ab80394333c02fe689eaf900ab500fbd0c2213da414687ebf995a65d5a6104"
dependencies = [
"bytemuck",
"byteorder-lite",
......@@ -2768,8 +2768,8 @@ dependencies = [
"rgb",
"serde",
"tiff",
"zune-core 0.5.1",
"zune-jpeg 0.5.12",
"zune-core",
"zune-jpeg",
]
[[package]]
......@@ -3744,9 +3744,9 @@ dependencies = [
[[package]]
name = "moxcms"
version = "0.7.11"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac9557c559cd6fc9867e122e20d2cbefc9ca29d80d027a8e39310920ed2f0a97"
checksum = "bb85c154ba489f01b25c0d36ae69a87e4a1c73a72631fc6c0eb6dde34a73e44b"
dependencies = [
"num-traits",
"pxfm",
......@@ -4268,9 +4268,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.21.3"
version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
[[package]]
name = "once_cell_polyfill"
......@@ -5314,9 +5314,9 @@ dependencies = [
[[package]]
name = "ravif"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef69c1990ceef18a116855938e74793a5f7496ee907562bd0857b6ac734ab285"
checksum = "e52310197d971b0f5be7fe6b57530dcd27beb35c1b013f29d66c1ad73fbbcc45"
dependencies = [
"avif-serialize",
"imgref",
......@@ -6389,9 +6389,9 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1"
[[package]]
name = "tempfile"
version = "3.26.0"
version = "3.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand",
"getrandom 0.4.2",
......@@ -6451,16 +6451,16 @@ dependencies = [
[[package]]
name = "tiff"
version = "0.10.3"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af9605de7fee8d9551863fd692cce7637f548dbd9db9180fcc07ccc6d26c336f"
checksum = "b63feaf3343d35b6ca4d50483f94843803b0f51634937cc2ec519fc32232bc52"
dependencies = [
"fax",
"flate2",
"half",
"quick-error",
"weezl",
"zune-jpeg 0.4.21",
"zune-jpeg",
]
[[package]]
......@@ -8297,12 +8297,6 @@ dependencies = [
"simd-adler32",
]
[[package]]
name = "zune-core"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f423a2c17029964870cfaabb1f13dfab7d092a62a29a89264f4d36990ca414a"
[[package]]
name = "zune-core"
version = "0.5.1"
......@@ -8320,18 +8314,9 @@ dependencies = [
[[package]]
name = "zune-jpeg"
version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ce2c8a9384ad323cf564b67da86e21d3cfdff87908bc1223ed5c99bc792713"
dependencies = [
"zune-core 0.4.12",
]
[[package]]
name = "zune-jpeg"
version = "0.5.12"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "410e9ecef634c709e3831c2cfdb8d9c32164fae1c67496d5b68fff728eec37fe"
checksum = "ec5f41c76397b7da451efd19915684f727d7e1d516384ca6bd0ec43ec94de23c"
dependencies = [
"zune-core 0.5.1",
"zune-core",
]
......@@ -1527,6 +1527,8 @@ version = "1.0.0"
dependencies = [
"anyhow",
"async-trait",
"axum",
"bytes",
"dashmap 6.1.0",
"derive-getters",
"derive_builder",
......@@ -1536,9 +1538,11 @@ dependencies = [
"parking_lot",
"prometheus",
"rand 0.9.2",
"reqwest",
"rmp-serde",
"rustc-hash 2.1.1",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tokio-util",
......@@ -1546,6 +1550,7 @@ dependencies = [
"uuid",
"validator",
"xxhash-rust",
"zeromq",
]
[[package]]
......@@ -1700,6 +1705,8 @@ version = "1.0.0"
dependencies = [
"anyhow",
"async-trait",
"clap",
"dynamo-kv-router",
"dynamo-llm",
"dynamo-mocker",
"dynamo-parsers",
......@@ -1717,6 +1724,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber",
]
[[package]]
......@@ -2810,9 +2818,9 @@ dependencies = [
[[package]]
name = "image"
version = "0.25.9"
version = "0.25.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6506c6c10786659413faa717ceebcb8f70731c0a60cbae39795fdf114519c1a"
checksum = "85ab80394333c02fe689eaf900ab500fbd0c2213da414687ebf995a65d5a6104"
dependencies = [
"bytemuck",
"byteorder-lite",
......@@ -2829,8 +2837,8 @@ dependencies = [
"rgb",
"serde",
"tiff",
"zune-core 0.5.1",
"zune-jpeg 0.5.12",
"zune-core",
"zune-jpeg",
]
[[package]]
......@@ -3793,9 +3801,9 @@ dependencies = [
[[package]]
name = "moxcms"
version = "0.7.11"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac9557c559cd6fc9867e122e20d2cbefc9ca29d80d027a8e39310920ed2f0a97"
checksum = "bb85c154ba489f01b25c0d36ae69a87e4a1c73a72631fc6c0eb6dde34a73e44b"
dependencies = [
"num-traits",
"pxfm",
......@@ -5373,9 +5381,9 @@ dependencies = [
[[package]]
name = "ravif"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef69c1990ceef18a116855938e74793a5f7496ee907562bd0857b6ac734ab285"
checksum = "e52310197d971b0f5be7fe6b57530dcd27beb35c1b013f29d66c1ad73fbbcc45"
dependencies = [
"avif-serialize",
"imgref",
......@@ -6510,16 +6518,16 @@ dependencies = [
[[package]]
name = "tiff"
version = "0.10.3"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af9605de7fee8d9551863fd692cce7637f548dbd9db9180fcc07ccc6d26c336f"
checksum = "b63feaf3343d35b6ca4d50483f94843803b0f51634937cc2ec519fc32232bc52"
dependencies = [
"fax",
"flate2",
"half",
"quick-error",
"weezl",
"zune-jpeg 0.4.21",
"zune-jpeg",
]
[[package]]
......@@ -8373,12 +8381,6 @@ dependencies = [
"simd-adler32",
]
[[package]]
name = "zune-core"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f423a2c17029964870cfaabb1f13dfab7d092a62a29a89264f4d36990ca414a"
[[package]]
name = "zune-core"
version = "0.5.1"
......@@ -8394,20 +8396,11 @@ dependencies = [
"simd-adler32",
]
[[package]]
name = "zune-jpeg"
version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ce2c8a9384ad323cf564b67da86e21d3cfdff87908bc1223ed5c99bc792713"
dependencies = [
"zune-core 0.4.12",
]
[[package]]
name = "zune-jpeg"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "410e9ecef634c709e3831c2cfdb8d9c32164fae1c67496d5b68fff728eec37fe"
dependencies = [
"zune-core 0.5.1",
"zune-core",
]
......@@ -24,6 +24,7 @@ crate-type = ["cdylib", "rlib"]
[features]
default = []
media-ffmpeg = ["dynamo-llm/media-ffmpeg"]
kv-indexer = ["dep:dynamo-kv-router", "dep:clap", "dep:tracing-subscriber"]
[dependencies]
dynamo-runtime = { path = "../../runtime" }
......@@ -41,7 +42,12 @@ thiserror = { version = "2.0" }
tokio = { version = "1.46.0", features = ["full"] }
tokio-stream = { version = "0" }
tokio-util = { version = "0.7", features = ["rt"] }
tracing = { version = "0" }
tracing = { version = "0" }
# kv-indexer (optional)
dynamo-kv-router = { path = "../../kv-router", features = ["standalone-indexer"], optional = true }
clap = { version = "4.5", features = ["derive"], optional = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true }
# "extension-module" tells pyo3 we want to build an extension module (skips linking against libpython.so)
# "abi3-py310" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.10, which is the minimum version in pyproject.toml
......@@ -70,4 +76,9 @@ dynamo-llm = { path = "../../llm" }
[target.'cfg(not(target_os = "linux"))'.dependencies]
dynamo-llm = { path = "../../llm", default-features = false }
[[bin]]
name = "dynamo-kv-indexer"
path = "rust/bin/kv_indexer.rs"
required-features = ["kv-indexer"]
[dev-dependencies]
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use clap::Parser;
use dynamo_kv_router::standalone_indexer::{self, IndexerConfig};
// Command-line arguments for the `dynamo-kv-indexer` binary.
//
// Parsed with clap's derive API. `main` copies every field, unchanged, into
// an `IndexerConfig` and hands it to `standalone_indexer::run_server`.
//
// Plain `//` comments are used here on purpose: the `///` comments on the
// fields below are doc comments that clap's derive turns into `--help` text,
// so they are user-facing strings and must not be edited casually.
#[derive(Parser)]
#[command(name = "dynamo-kv-indexer", about = "Standalone KV cache indexer")]
struct Cli {
/// KV cache block size for initial workers registered via --workers
#[arg(long)]
block_size: Option<u32>,
/// HTTP server port
#[arg(long, default_value_t = 8090)]
port: u16,
/// Number of indexer threads (1 = single-threaded KvIndexer, >1 = ThreadPoolIndexer)
#[arg(long, default_value_t = 4)]
threads: usize,
// Kept as a raw string here; it is forwarded as-is and split later.
/// Initial workers as "worker_id[:dp_rank]=zmq_address,..." (e.g. "1=tcp://host:5557,1:1=tcp://host:5558")
#[arg(long)]
workers: Option<String>,
/// Model name for initial workers registered via --workers
#[arg(long, default_value = "default")]
model_name: String,
/// Tenant ID for initial workers registered via --workers
#[arg(long, default_value = "default")]
tenant_id: String,
// Kept as a raw comma-separated string here; it is forwarded as-is.
/// Comma-separated peer URLs for P2P recovery (e.g. "http://host1:8090,http://host2:8091")
#[arg(long)]
peers: Option<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
standalone_indexer::run_server(IndexerConfig {
block_size: cli.block_size,
port: cli.port,
threads: cli.threads,
workers: cli.workers,
model_name: cli.model_name,
tenant_id: cli.tenant_id,
peers: cli.peers,
})
.await
}
......@@ -14,7 +14,8 @@ repository.workspace = true
default = []
metrics = []
bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"]
indexer-bin = ["metrics", "dep:axum", "dep:bytes", "dep:clap", "dep:zeromq", "dep:tracing-subscriber", "dep:serde_json", "dep:reqwest"]
standalone-indexer = ["metrics", "dep:axum", "dep:bytes", "dep:zeromq", "dep:serde_json", "dep:reqwest"]
indexer-bin = ["standalone-indexer", "dep:clap", "dep:tracing-subscriber"]
test-endpoints = ["indexer-bin"]
[dependencies]
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use clap::Parser;
use tokio::net::TcpListener;
mod indexer;
mod listener;
mod recovery;
mod registry;
mod server;
use registry::WorkerRegistry;
use server::{AppState, create_router};
use dynamo_kv_router::standalone_indexer::{self, IndexerConfig};
#[derive(Parser)]
#[command(name = "dynamo-kv-indexer", about = "Standalone KV cache indexer")]
......@@ -47,22 +37,6 @@ struct Cli {
peers: Option<String>,
}
/// Parse a `--workers` value into `(worker_id, dp_rank, zmq_address)` tuples.
///
/// The input is a comma-separated list of `worker_id[:dp_rank]=zmq_address`
/// entries, e.g. `"1=tcp://host:5557,1:1=tcp://host:5558"`. When the
/// `:dp_rank` suffix is absent the rank is `0`.
///
/// Whitespace is ignored around every component: the worker id, the rank and
/// the address. Previously whitespace next to the `:` separator made the
/// integer parse fail, so such an entry was dropped without notice.
///
/// Entries that are empty, have no `=`, or whose id / rank is not a valid
/// integer are skipped; they do not produce an error.
fn parse_workers(s: &str) -> Vec<(u64, u32, String)> {
    s.split(',')
        .filter_map(|entry| {
            // An empty entry has no '=', so it is rejected here as well.
            let (id_part, addr) = entry.split_once('=')?;
            let (id, dp_rank) = match id_part.split_once(':') {
                // Trim each half separately: integer parsing rejects
                // surrounding whitespace.
                Some((id_str, rank_str)) => (
                    id_str.trim().parse::<u64>().ok()?,
                    rank_str.trim().parse::<u32>().ok()?,
                ),
                None => (id_part.trim().parse::<u64>().ok()?, 0),
            };
            Some((id, dp_rank, addr.trim().to_string()))
        })
        .collect()
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
......@@ -74,94 +48,14 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let peers: Vec<String> = cli
.peers
.as_deref()
.map(|s| {
s.split(',')
.filter(|p| !p.is_empty())
.map(|p| p.trim().to_string())
.collect()
})
.unwrap_or_default();
tracing::info!(
block_size = ?cli.block_size,
port = cli.port,
threads = cli.threads,
model_name = %cli.model_name,
tenant_id = %cli.tenant_id,
num_peers = peers.len(),
"Starting standalone KV cache indexer"
);
let registry = WorkerRegistry::new(cli.threads);
// Register initial workers — connects ZMQ SUB sockets (subscription
// handshakes begin immediately) and spawns listener tasks that wait for
// the ready signal. register() returns as soon as the socket is connected.
if let Some(ref workers_str) = cli.workers {
let block_size = cli.block_size.ok_or_else(|| {
anyhow::anyhow!("--block-size is required when --workers is specified")
})?;
for (instance_id, dp_rank, endpoint) in parse_workers(workers_str) {
tracing::info!(instance_id, dp_rank, endpoint, "Registering initial worker");
registry
.register(
instance_id,
endpoint,
dp_rank,
cli.model_name.clone(),
cli.tenant_id.clone(),
block_size,
None,
)
.await?;
}
}
// P2P recovery: fetch dump from a peer before starting ZMQ listeners.
// The 1s delay inside recover_from_peers ensures the peer's tree has
// advanced past our ZMQ connection floor before we fetch the dump.
if !peers.is_empty() {
match recovery::recover_from_peers(&peers, &registry).await {
Ok(true) => tracing::info!("P2P recovery completed"),
Ok(false) => tracing::warn!("no reachable peers, starting with empty state"),
Err(e) => tracing::warn!(error = %e, "P2P recovery failed, starting with empty state"),
}
for peer in &peers {
registry.register_peer(peer.clone());
}
}
// Signal ready — unblocks all ZMQ listeners to start draining buffered events
registry.signal_ready();
let state = Arc::new(AppState { registry });
let app = create_router(state);
let listener = TcpListener::bind(("0.0.0.0", cli.port)).await?;
tracing::info!("HTTP server listening on 0.0.0.0:{}", cli.port);
axum::serve(listener, app).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_workers() {
let input = "1=tcp://host:5557,2:1=tcp://host:5558";
let result = parse_workers(input);
assert_eq!(result.len(), 2);
assert_eq!(result[0], (1, 0, "tcp://host:5557".to_string()));
assert_eq!(result[1], (2, 1, "tcp://host:5558".to_string()));
}
#[test]
fn test_parse_workers_empty() {
assert!(parse_workers("").is_empty());
}
standalone_indexer::run_server(IndexerConfig {
block_size: cli.block_size,
port: cli.port,
threads: cli.threads,
workers: cli.workers,
model_name: cli.model_name,
tenant_id: cli.tenant_id,
peers: cli.peers,
})
.await
}
......@@ -27,6 +27,9 @@ pub use scheduling::selector;
pub use sequences::multi_worker as multi_worker_sequence;
pub use sequences::single as sequence;
#[cfg(feature = "standalone-indexer")]
pub mod standalone_indexer;
#[cfg(any(test, feature = "bench"))]
pub mod test_utils;
......
......@@ -6,10 +6,10 @@ use std::sync::Arc;
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::ConcurrentRadixTree;
use dynamo_kv_router::ThreadPoolIndexer;
use dynamo_kv_router::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use dynamo_kv_router::protocols::{LocalBlockHash, OverlapScores, RouterEvent, WorkerId};
use crate::ConcurrentRadixTree;
use crate::ThreadPoolIndexer;
use crate::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use crate::protocols::{LocalBlockHash, OverlapScores, RouterEvent, WorkerId};
#[derive(Clone)]
pub enum Indexer {
......
......@@ -11,8 +11,8 @@ use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SocketSend, SubSocket};
use dynamo_kv_router::protocols::{RouterEvent, WorkerId};
use dynamo_kv_router::zmq_wire::{KvEventBatch, convert_event};
use crate::protocols::{RouterEvent, WorkerId};
use crate::zmq_wire::{KvEventBatch, convert_event};
use super::indexer::Indexer;
......@@ -375,11 +375,6 @@ async fn zmq_recv_loop(
mod tests {
use zeromq::{PubSocket, Socket, SocketRecv, SocketSend, SubSocket};
/// Verify that the `zeromq` crate buffers a small number of messages in
/// TCP kernel buffers when `recv()` is not being called. The PUB socket
/// uses `try_send` with a noop waker — once the TCP send buffer is full
/// it silently drops messages (per ZMQ spec RFC 29). This test confirms
/// that a brief delay (simulating P2P recovery) doesn't lose messages.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn zmq_buffers_messages_during_brief_delay() {
let mut pub_socket = PubSocket::new();
......@@ -392,8 +387,6 @@ mod tests {
.await
.unwrap();
// Wait for SUB handshake: spawn recv in a background task so the
// PUB's accept/subscription processing can proceed concurrently.
let (tx, mut rx) = tokio::sync::mpsc::channel::<SubSocket>(1);
tokio::spawn(async move {
let _ = sub_socket.recv().await.unwrap();
......@@ -410,8 +403,6 @@ mod tests {
let num_messages = 10u64;
// Send messages without calling recv() — simulates the brief window
// between ZMQ connect and ready signal during P2P recovery.
for i in 0..num_messages {
pub_socket
.send(i.to_le_bytes().to_vec().into())
......@@ -419,10 +410,8 @@ mod tests {
.unwrap();
}
// Simulate recovery delay
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// All messages should be buffered in TCP kernel buffers
for i in 0u64..num_messages {
let msg = tokio::time::timeout(std::time::Duration::from_secs(5), sub_socket.recv())
.await
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub mod indexer;
pub mod listener;
pub mod recovery;
pub mod registry;
pub mod server;
use std::sync::Arc;
use tokio::net::TcpListener;
use registry::WorkerRegistry;
use server::{AppState, create_router};
/// Settings consumed by [`run_server`] to start the standalone KV indexer.
///
/// `workers` and `peers` are kept as the raw comma-separated strings and are
/// split inside `run_server`.
#[derive(Debug, Clone)]
pub struct IndexerConfig {
    /// KV cache block size used for the initial workers. `run_server` returns
    /// an error if `workers` is set while this is `None`.
    pub block_size: Option<u32>,
    /// TCP port the HTTP server binds on `0.0.0.0`.
    pub port: u16,
    /// Thread count handed to `WorkerRegistry::new`.
    pub threads: usize,
    /// Initial workers as `worker_id[:dp_rank]=zmq_address,...`; see
    /// [`parse_workers`].
    pub workers: Option<String>,
    /// Model name attached to every initial worker registration.
    pub model_name: String,
    /// Tenant ID attached to every initial worker registration.
    pub tenant_id: String,
    /// Comma-separated peer URLs used for P2P recovery and peer registration.
    pub peers: Option<String>,
}
/// Parse a `--workers` value into `(worker_id, dp_rank, zmq_address)` tuples.
///
/// The input is a comma-separated list of `worker_id[:dp_rank]=zmq_address`
/// entries, e.g. `"1=tcp://host:5557,1:1=tcp://host:5558"`. When the
/// `:dp_rank` suffix is absent the rank is `0`.
///
/// Whitespace is ignored around every component: the worker id, the rank and
/// the address. Previously whitespace next to the `:` separator made the
/// integer parse fail, so such an entry was dropped without notice.
///
/// Entries that are empty, have no `=`, or whose id / rank is not a valid
/// integer are skipped; they do not produce an error.
pub fn parse_workers(s: &str) -> Vec<(u64, u32, String)> {
    s.split(',')
        .filter_map(|entry| {
            // An empty entry has no '=', so it is rejected here as well.
            let (id_part, addr) = entry.split_once('=')?;
            let (id, dp_rank) = match id_part.split_once(':') {
                // Trim each half separately: integer parsing rejects
                // surrounding whitespace.
                Some((id_str, rank_str)) => (
                    id_str.trim().parse::<u64>().ok()?,
                    rank_str.trim().parse::<u32>().ok()?,
                ),
                None => (id_part.trim().parse::<u64>().ok()?, 0),
            };
            Some((id, dp_rank, addr.trim().to_string()))
        })
        .collect()
}
/// Split a comma-separated peer list into trimmed, non-empty URLs.
///
/// Each entry is trimmed *before* the emptiness check, so stray whitespace
/// between commas (e.g. `"http://a:8090, "`) no longer yields an empty peer
/// URL.
fn parse_peers(s: &str) -> Vec<String> {
    s.split(',')
        .map(str::trim)
        .filter(|peer| !peer.is_empty())
        .map(String::from)
        .collect()
}

/// Start the standalone KV cache indexer and serve HTTP until the server stops.
///
/// Steps, in order:
/// 1. Register the initial workers from `config.workers`.
/// 2. Attempt P2P recovery from `config.peers`, then register each peer.
/// 3. Signal ready so the ZMQ listeners start processing.
/// 4. Bind `0.0.0.0:<config.port>` and serve the HTTP router.
///
/// # Errors
///
/// Returns an error when `config.workers` is set but `config.block_size` is
/// `None`, when registering a worker fails, when the TCP listener cannot be
/// bound, or when the HTTP server exits with an error. A failed or
/// unreachable P2P recovery is only logged as a warning; the server then
/// starts with empty state.
pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
    let peers: Vec<String> = config
        .peers
        .as_deref()
        .map(parse_peers)
        .unwrap_or_default();

    tracing::info!(
        block_size = ?config.block_size,
        port = config.port,
        threads = config.threads,
        model_name = %config.model_name,
        tenant_id = %config.tenant_id,
        num_peers = peers.len(),
        "Starting standalone KV cache indexer"
    );

    let registry = WorkerRegistry::new(config.threads);

    // Register initial workers — connects ZMQ SUB sockets (subscription
    // handshakes begin immediately) and spawns listener tasks that wait for
    // the ready signal. register() returns as soon as the socket is connected.
    if let Some(ref workers_str) = config.workers {
        let block_size = config.block_size.ok_or_else(|| {
            anyhow::anyhow!("--block-size is required when --workers is specified")
        })?;
        for (instance_id, dp_rank, endpoint) in parse_workers(workers_str) {
            tracing::info!(instance_id, dp_rank, endpoint, "Registering initial worker");
            registry
                .register(
                    instance_id,
                    endpoint,
                    dp_rank,
                    config.model_name.clone(),
                    config.tenant_id.clone(),
                    block_size,
                    None,
                )
                .await?;
        }
    }

    // P2P recovery: fetch a dump from a peer before the ZMQ listeners are
    // released. recover_from_peers waits briefly before fetching so the
    // peer's state has advanced past the point where our sockets connected.
    if !peers.is_empty() {
        match recovery::recover_from_peers(&peers, &registry).await {
            Ok(true) => tracing::info!("P2P recovery completed"),
            Ok(false) => tracing::warn!("no reachable peers, starting with empty state"),
            Err(e) => tracing::warn!(error = %e, "P2P recovery failed, starting with empty state"),
        }
        for peer in &peers {
            registry.register_peer(peer.clone());
        }
    }

    // Signal ready — unblocks all ZMQ listeners to start draining buffered events.
    registry.signal_ready();

    let state = Arc::new(AppState { registry });
    let app = create_router(state);
    let listener = TcpListener::bind(("0.0.0.0", config.port)).await?;
    tracing::info!("HTTP server listening on 0.0.0.0:{}", config.port);
    axum::serve(listener, app).await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two entries, one without and one with an explicit `dp_rank`, come back
    /// in input order; the missing rank is `0`.
    #[test]
    fn test_parse_workers() {
        let parsed = parse_workers("1=tcp://host:5557,2:1=tcp://host:5558");
        let expected: Vec<(u64, u32, String)> = vec![
            (1, 0, String::from("tcp://host:5557")),
            (2, 1, String::from("tcp://host:5558")),
        ];
        assert_eq!(parsed, expected);
    }

    /// An empty argument produces no workers.
    #[test]
    fn test_parse_workers_empty() {
        let parsed = parse_workers("");
        assert!(parsed.is_empty());
    }
}
......@@ -6,7 +6,7 @@ use std::collections::HashMap;
use anyhow::{Context, Result};
use serde::Deserialize;
use dynamo_kv_router::protocols::RouterEvent;
use crate::protocols::RouterEvent;
use super::registry::{IndexerKey, WorkerRegistry};
......@@ -22,11 +22,6 @@ pub async fn recover_from_peers(peers: &[String], registry: &WorkerRegistry) ->
.build()
.context("failed to build HTTP client")?;
// Brief delay to ensure the peer's tree state has advanced past the
// point where our ZMQ SUB sockets connected. The dump must cover any
// events that would otherwise be lost to the slow-joiner window —
// without this delay, the peer's dump could be stale relative to our
// ZMQ connection floor.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
for peer_url in peers {
......
......@@ -11,7 +11,7 @@ use dashmap::mapref::one::Ref;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::protocols::WorkerId;
use crate::protocols::WorkerId;
use super::indexer::{Indexer, create_indexer};
use super::listener::run_zmq_listener;
......@@ -106,8 +106,6 @@ impl WorkerRegistry {
tenant_id,
};
// Get or create the indexer for this (model, tenant) pair.
// Use the entry API for atomic check-and-insert.
let indexer_entry = self.indexers.entry(key.clone()).or_insert_with(|| {
tracing::info!(
model_name = %key.model_name,
......@@ -312,7 +310,7 @@ impl WorkerRegistry {
Ok(())
}
#[expect(dead_code)]
#[cfg_attr(not(feature = "test-endpoints"), allow(dead_code))]
pub fn pause_listener(&self, instance_id: WorkerId, dp_rank: u32) -> Result<()> {
let mut entry = self
.workers
......@@ -328,7 +326,7 @@ impl WorkerRegistry {
Ok(())
}
#[expect(dead_code)]
#[cfg_attr(not(feature = "test-endpoints"), allow(dead_code))]
pub async fn resume_listener(&self, instance_id: WorkerId, dp_rank: u32) -> Result<()> {
{
let entry = self
......
......@@ -11,7 +11,7 @@ use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use dynamo_kv_router::protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq};
use crate::protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq};
use super::registry::{IndexerKey, WorkerRegistry};
......@@ -63,11 +63,6 @@ pub struct QueryRequest {
pub lora_name: Option<String>,
}
/// Query using pre-computed block hashes.
///
/// Callers must include the LoRA salt in their hashes when applicable — use
/// [`compute_block_hash_for_seq`] with the appropriate `lora_name`. The indexer
/// cannot retroactively apply a LoRA salt to pre-computed hashes.
#[derive(Deserialize)]
pub struct QueryByHashRequest {
pub block_hashes: Vec<i64>,
......@@ -160,7 +155,7 @@ async fn list_workers(State(state): State<Arc<AppState>>) -> impl IntoResponse {
}
fn build_score_response(
overlap: dynamo_kv_router::protocols::OverlapScores,
overlap: crate::protocols::OverlapScores,
block_size: u32,
) -> ScoreResponse {
let mut scores: HashMap<String, HashMap<String, u32>> = HashMap::new();
......
......@@ -5,7 +5,6 @@ import importlib.util
import logging
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Generator, Optional
......@@ -244,30 +243,6 @@ def predownload_tokenizers(pytestconfig):
os.environ.pop("HF_HUB_OFFLINE", None)
@pytest.fixture(scope="session")
def build_kv_indexer():
    """Compile the standalone KV indexer binary a single time per test session.

    Invokes `cargo build` up front so that later `cargo run` calls in tests
    start without a compile step. Does nothing beyond the cargo invocation
    when the binary is already cached in target/.
    """
    cargo_cmd = [
        "cargo",
        "build",
        "-p",
        "dynamo-kv-router",
        "--features",
        "indexer-bin",
        "--bin",
        "dynamo-kv-indexer",
    ]
    _logger.info("Building dynamo-kv-indexer binary (cached after first build)")
    # check_call raises if cargo exits non-zero or the 600 s timeout elapses.
    subprocess.check_call(cargo_cmd, timeout=600)
    _logger.info("dynamo-kv-indexer binary ready")
@pytest.fixture(autouse=True)
def logger(request):
log_dir = resolve_test_output_path(request.node.name)
......
......@@ -853,7 +853,6 @@ def test_indexers_sync(
request,
runtime_services_dynamic_ports,
predownload_tokenizers,
build_kv_indexer,
file_storage_backend,
store_backend,
durable_kv_events,
......@@ -974,7 +973,6 @@ def test_router_decisions(
request,
runtime_services_dynamic_ports,
predownload_tokenizers,
build_kv_indexer,
durable_kv_events,
use_kv_events,
request_plane,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment