Unverified Commit 07742cc2 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: move standalone KV indexer into kv-router crate with HTTP integration tests (#6569)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
Co-authored-by: default avatarClaude Opus 4.6 <noreply@anthropic.com>
parent e7f3361e
...@@ -1878,6 +1878,7 @@ version = "1.0.0" ...@@ -1878,6 +1878,7 @@ version = "1.0.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"axum 0.8.4",
"clap 4.5.60", "clap 4.5.60",
"dashmap 6.1.0", "dashmap 6.1.0",
"derive-getters", "derive-getters",
...@@ -1893,6 +1894,7 @@ dependencies = [ ...@@ -1893,6 +1894,7 @@ dependencies = [
"plotters", "plotters",
"prometheus", "prometheus",
"rand 0.9.2", "rand 0.9.2",
"rmp-serde",
"rstest 0.18.2", "rstest 0.18.2",
"rstest_reuse", "rstest_reuse",
"rustc-hash 2.1.1", "rustc-hash 2.1.1",
...@@ -1902,9 +1904,11 @@ dependencies = [ ...@@ -1902,9 +1904,11 @@ dependencies = [
"tokio", "tokio",
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-subscriber",
"uuid", "uuid",
"validator", "validator",
"xxhash-rust", "xxhash-rust",
"zeromq",
] ]
[[package]] [[package]]
......
...@@ -2,15 +2,21 @@ ...@@ -2,15 +2,21 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
title: Standalone KV Indexer title: Standalone KV Indexer
subtitle: Run the KV cache indexer as an independent service for querying block state subtitle: Run the KV cache indexer as an independent HTTP service for querying block state
--- ---
## Overview ## Overview
The standalone KV indexer runs the KV cache radix tree as an independent service, separate from the router. It subscribes to KV events from workers, maintains a radix tree of cached blocks, and exposes a query endpoint (`kv_indexer_query`) that external clients can use to inspect or query KV cache state. The standalone KV indexer (`dynamo-kv-indexer`) is a lightweight HTTP binary that subscribes to ZMQ KV event streams from workers, maintains a radix tree of cached blocks, and exposes HTTP endpoints for querying and managing workers.
This is distinct from the [Standalone Router](../../../components/src/dynamo/router/README.md), which is a full routing service. The standalone indexer provides only the indexing and query layer without routing logic. This is distinct from the [Standalone Router](../../../components/src/dynamo/router/README.md), which is a full routing service. The standalone indexer provides only the indexing and query layer without routing logic.
The HTTP API follows the [Mooncake KV Indexer RFC](https://github.com/kvcache-ai/Mooncake/issues/1403) conventions.
## Compatibility
The standalone indexer works with any engine that publishes KV cache events over ZMQ in the expected msgpack format. This includes bare vLLM and SGLang engines, which emit ZMQ KV events natively — no Dynamo-specific wrapper is required.
## Use Cases ## Use Cases
- **Debugging**: Inspect the radix tree state to verify which blocks are cached on which workers. - **Debugging**: Inspect the radix tree state to verify which blocks are cached on which workers.
...@@ -18,62 +24,121 @@ This is distinct from the [Standalone Router](../../../components/src/dynamo/rou ...@@ -18,62 +24,121 @@ This is distinct from the [Standalone Router](../../../components/src/dynamo/rou
- **Custom routing**: Build external routing logic that queries the indexer for overlap scores and makes its own worker selection decisions. - **Custom routing**: Build external routing logic that queries the indexer for overlap scores and makes its own worker selection decisions.
- **Monitoring**: Observe KV cache distribution across workers without running a full router. - **Monitoring**: Observe KV cache distribution across workers without running a full router.
## API ## Building
The binary is a feature-gated target in the `dynamo-kv-router` crate:
```bash
cargo build -p dynamo-kv-router --features indexer-bin --bin dynamo-kv-indexer
```
## CLI
```bash
dynamo-kv-indexer --block-size 16 --port 8090 [--threads 1] [--workers "1=tcp://host:5557,2=tcp://host:5558"]
```
| Flag | Default | Description |
|------|---------|-------------|
| `--block-size` | (required) | KV cache block size (must match the engine's block size) |
| `--port` | `8090` | HTTP server listen port |
| `--threads` | `1` | Number of indexer threads (1 = single-threaded, >1 = thread pool) |
| `--workers` | (none) | Initial workers as `instance_id=zmq_address,...` pairs |
## HTTP API
### `POST /register` — Register an endpoint
Register a ZMQ endpoint for an instance. Call once per dp_rank for data-parallel workers:
```bash
# Single dp_rank (dp_rank defaults to 0)
curl -X POST http://localhost:8090/register \
-H 'Content-Type: application/json' \
-d '{"instance_id": 1, "endpoint": "tcp://127.0.0.1:5557"}'
# Multiple dp_ranks — register each separately
curl -X POST http://localhost:8090/register \
-H 'Content-Type: application/json' \
-d '{"instance_id": 1, "endpoint": "tcp://127.0.0.1:5557", "dp_rank": 0}'
curl -X POST http://localhost:8090/register \
-H 'Content-Type: application/json' \
-d '{"instance_id": 1, "endpoint": "tcp://127.0.0.1:5558", "dp_rank": 1}'
```
The indexer spawns a ZMQ SUB listener for each endpoint and begins consuming KV events.
### Python ### `POST /unregister` — Deregister an instance
```python Remove all dp_ranks for an instance, or a specific dp_rank:
from dynamo._internal import start_kv_block_indexer
from dynamo.llm import KvRouterConfig
# Start the standalone indexer on a component's endpoint ```bash
await start_kv_block_indexer(endpoint, block_size, kv_router_config) # Remove all dp_ranks
curl -X POST http://localhost:8090/unregister \
-H 'Content-Type: application/json' \
-d '{"instance_id": 1}'
# Remove a specific dp_rank
curl -X POST http://localhost:8090/unregister \
-H 'Content-Type: application/json' \
-d '{"instance_id": 1, "dp_rank": 0}'
``` ```
**Parameters:** Cancels ZMQ listeners and removes the instance's blocks from the radix tree.
### `GET /workers` — List registered instances
| Parameter | Type | Description | ```bash
|-----------|------|-------------| curl http://localhost:8090/workers
| `endpoint` | `Endpoint` | The Dynamo runtime endpoint on whose component the indexer will run | ```
| `block_size` | `int` | KV cache block size (must match workers) |
| `kv_router_config` | `KvRouterConfig` | Router configuration (controls event threading, TTL, etc.) | Returns:
```json
[{"instance_id": 1, "endpoints": {"0": "tcp://127.0.0.1:5557", "1": "tcp://127.0.0.1:5558"}}]
```
### Query Endpoint ### `POST /query` — Query overlap for token IDs
Once started, the indexer exposes a `kv_indexer_query` endpoint on the same component. Clients can send one of three request types: Given raw token IDs, compute block hashes and return per-instance overlap scores:
**`FindMatchesTokens`** — Given raw tokens, compute block hashes and return per-worker overlap scores: ```bash
curl -X POST http://localhost:8090/query \
-H 'Content-Type: application/json' \
-d '{"token_ids": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]}'
```
```python Returns:
query_client = await query_endpoint.client() ```json
stream = await query_client.generate( {
{"FindMatchesTokens": {"tokens": [1, 2, 3, ...], "block_mm_infos": None}}, "scores": {"1": {"0": 2}, "2": {"1": 0}},
annotated=False, "frequencies": [1, 1],
) "tree_sizes": {"1": {"0": 5}, "2": {"1": 3}}
response = await stream.__anext__() }
# response == {"Matches": {"scores": {(worker_id, dp_rank): count, ...}, "frequencies": [...]}}
``` ```
**`FindMatchesHashed`** — Same as above but with pre-computed block hashes: Scores are nested by `instance_id` then `dp_rank`. Higher score means more cached prefix blocks on that instance.
```python ### `POST /query_by_hash` — Query overlap for pre-computed hashes
stream = await query_client.generate(
{"FindMatchesHashed": {"block_hashes": [hash1, hash2, ...]}}, ```bash
annotated=False, curl -X POST http://localhost:8090/query_by_hash \
) -H 'Content-Type: application/json' \
-d '{"block_hashes": [123456, 789012]}'
``` ```
**`DumpTree`** — Dump the full radix tree state as a list of router events: Same response format as `/query`.
### `GET /dump` — Dump all radix tree events
```python Returns the full radix tree state as a JSON array of `RouterEvent` objects:
stream = await query_client.generate("DumpTree", annotated=False)
response = await stream.__anext__() ```bash
events = response["TreeDump"] # List of RouterEvent objects curl http://localhost:8090/dump
``` ```
## Limitations ## Limitations
- **JetStream not supported**: The standalone indexer does not support `durable_kv_events` (JetStream mode). It relies on NATS Core or ZMQ event plane with local indexer mode. Attempting to start with `durable_kv_events=True` will raise an error. - **ZMQ only**: Workers must publish KV events via ZMQ PUB sockets. The standalone indexer does not subscribe to NATS event streams.
- **No routing logic**: The indexer only maintains the radix tree and answers queries. It does not track active blocks, manage request lifecycle, or perform worker selection. - **No routing logic**: The indexer only maintains the radix tree and answers queries. It does not track active blocks, manage request lifecycle, or perform worker selection.
## Architecture ## Architecture
...@@ -81,46 +146,39 @@ events = response["TreeDump"] # List of RouterEvent objects ...@@ -81,46 +146,39 @@ events = response["TreeDump"] # List of RouterEvent objects
```mermaid ```mermaid
graph TD graph TD
subgraph Workers subgraph Workers
W1[Worker 1<br/>KvEventPublisher] W1[Worker 1<br/>ZMQ PUB]
W2[Worker 2<br/>KvEventPublisher] W2[Worker 2<br/>ZMQ PUB]
end end
subgraph "Event Plane (NATS Core / ZMQ)" subgraph "Standalone Indexer (HTTP)"
EP[KV Events] REG[Worker Registry]
end ZMQ[ZMQ SUB Listeners]
subgraph "Standalone Indexer"
SUB[Subscriber]
IDX[Indexer / Radix Tree] IDX[Indexer / Radix Tree]
QE[kv_indexer_query endpoint] HTTP[HTTP API<br/>/query /dump /register]
end end
CLIENT[External Client] CLIENT[External Client]
W1 -->|publish events| EP W1 -->|ZMQ events| ZMQ
W2 -->|publish events| EP W2 -->|ZMQ events| ZMQ
EP -->|subscribe| SUB CLIENT -->|POST /register| REG
SUB -->|apply events| IDX REG -->|spawn listeners| ZMQ
CLIENT -->|FindMatches / DumpTree| QE ZMQ -->|apply events| IDX
QE -->|query| IDX CLIENT -->|POST /query, GET /dump| HTTP
HTTP -->|query| IDX
style EP fill:#e1f5fe,stroke:#333,color:#333
style W1 fill:#f3e5f5,stroke:#333,color:#333 style W1 fill:#f3e5f5,stroke:#333,color:#333
style W2 fill:#f3e5f5,stroke:#333,color:#333 style W2 fill:#f3e5f5,stroke:#333,color:#333
style IDX fill:#2e8b57,stroke:#333,color:#fff style IDX fill:#2e8b57,stroke:#333,color:#fff
style SUB fill:#2e8b57,stroke:#333,color:#fff style ZMQ fill:#2e8b57,stroke:#333,color:#fff
style QE fill:#2e8b57,stroke:#333,color:#fff style REG fill:#2e8b57,stroke:#333,color:#fff
style HTTP fill:#2e8b57,stroke:#333,color:#fff
style CLIENT fill:#fff3e0,stroke:#333,color:#333 style CLIENT fill:#fff3e0,stroke:#333,color:#333
``` ```
The standalone indexer internally:
1. Creates an `Indexer` instance with the given config and block size.
2. Starts a subscriber that listens for KV events from workers via the event plane. On worker discovery, it queries the worker's local indexer to bootstrap state.
3. Registers a `kv_indexer_query` endpoint that accepts `FindMatchesHashed`, `FindMatchesTokens`, and `DumpTree` requests.
## See Also ## See Also
- **[Mooncake KV Indexer RFC](https://github.com/kvcache-ai/Mooncake/issues/1403)**: Community API standardization for KV cache indexers
- **[Router Guide](router-guide.md)**: Full KV router configuration and tuning - **[Router Guide](router-guide.md)**: Full KV router configuration and tuning
- **[Router Design](../../design-docs/router-design.md)**: Architecture and event transport modes - **[Router Design](../../design-docs/router-design.md)**: Architecture and event transport modes
- **[Standalone Router](../../../components/src/dynamo/router/README.md)**: Full routing service (routes requests to workers) - **[Standalone Router](../../../components/src/dynamo/router/README.md)**: Full routing service (routes requests to workers)
...@@ -1528,6 +1528,7 @@ dependencies = [ ...@@ -1528,6 +1528,7 @@ dependencies = [
"parking_lot", "parking_lot",
"prometheus", "prometheus",
"rand 0.9.2", "rand 0.9.2",
"rmp-serde",
"rustc-hash 2.1.1", "rustc-hash 2.1.1",
"serde", "serde",
"thiserror 2.0.18", "thiserror 2.0.18",
...@@ -2964,9 +2965,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" ...@@ -2964,9 +2965,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]] [[package]]
name = "jiff" name = "jiff"
version = "0.2.21" version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3e3d65f018c6ae946ab16e80944b97096ed73c35b221d1c478a6c81d8f57940" checksum = "819b44bc7c87d9117eb522f14d46e918add69ff12713c475946b0a29363ed1c2"
dependencies = [ dependencies = [
"jiff-static", "jiff-static",
"jiff-tzdb-platform", "jiff-tzdb-platform",
...@@ -2979,9 +2980,9 @@ dependencies = [ ...@@ -2979,9 +2980,9 @@ dependencies = [
[[package]] [[package]]
name = "jiff-static" name = "jiff-static"
version = "0.2.21" version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17c2b211d863c7fde02cbea8a3c1a439b98e109286554f2860bdded7ff83818" checksum = "470252db18ecc35fd766c0891b1e3ec6cbbcd62507e85276c01bf75d8e94d4a1"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
...@@ -4995,12 +4996,9 @@ dependencies = [ ...@@ -4995,12 +4996,9 @@ dependencies = [
[[package]] [[package]]
name = "pxfm" name = "pxfm"
version = "0.1.27" version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7186d3822593aa4393561d186d1393b3923e9d6163d3fbfd6e825e3e6cf3e6a8" checksum = "b5a041e753da8b807c9255f28de81879c78c876392ff2469cde94799b2896b9d"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "py_literal" name = "py_literal"
...@@ -8236,9 +8234,9 @@ dependencies = [ ...@@ -8236,9 +8234,9 @@ dependencies = [
[[package]] [[package]]
name = "zlib-rs" name = "zlib-rs"
version = "0.6.2" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c745c48e1007337ed136dc99df34128b9faa6ed542d80a1c673cf55a6d7236c8" checksum = "3be3d40e40a133f9c916ee3f9f4fa2d9d63435b5fbe1bfc6d9dae0aa0ada1513"
[[package]] [[package]]
name = "zmij" name = "zmij"
......
...@@ -1536,6 +1536,7 @@ dependencies = [ ...@@ -1536,6 +1536,7 @@ dependencies = [
"parking_lot", "parking_lot",
"prometheus", "prometheus",
"rand 0.9.2", "rand 0.9.2",
"rmp-serde",
"rustc-hash 2.1.1", "rustc-hash 2.1.1",
"serde", "serde",
"thiserror 2.0.18", "thiserror 2.0.18",
...@@ -3024,9 +3025,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" ...@@ -3024,9 +3025,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]] [[package]]
name = "jiff" name = "jiff"
version = "0.2.21" version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3e3d65f018c6ae946ab16e80944b97096ed73c35b221d1c478a6c81d8f57940" checksum = "819b44bc7c87d9117eb522f14d46e918add69ff12713c475946b0a29363ed1c2"
dependencies = [ dependencies = [
"jiff-static", "jiff-static",
"jiff-tzdb-platform", "jiff-tzdb-platform",
...@@ -3039,9 +3040,9 @@ dependencies = [ ...@@ -3039,9 +3040,9 @@ dependencies = [
[[package]] [[package]]
name = "jiff-static" name = "jiff-static"
version = "0.2.21" version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17c2b211d863c7fde02cbea8a3c1a439b98e109286554f2860bdded7ff83818" checksum = "470252db18ecc35fd766c0891b1e3ec6cbbcd62507e85276c01bf75d8e94d4a1"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
...@@ -5043,12 +5044,9 @@ dependencies = [ ...@@ -5043,12 +5044,9 @@ dependencies = [
[[package]] [[package]]
name = "pxfm" name = "pxfm"
version = "0.1.27" version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7186d3822593aa4393561d186d1393b3923e9d6163d3fbfd6e825e3e6cf3e6a8" checksum = "b5a041e753da8b807c9255f28de81879c78c876392ff2469cde94799b2896b9d"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "py_literal" name = "py_literal"
...@@ -8311,9 +8309,9 @@ dependencies = [ ...@@ -8311,9 +8309,9 @@ dependencies = [
[[package]] [[package]]
name = "zlib-rs" name = "zlib-rs"
version = "0.6.2" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c745c48e1007337ed136dc99df34128b9faa6ed542d80a1c673cf55a6d7236c8" checksum = "3be3d40e40a133f9c916ee3f9f4fa2d9d63435b5fbe1bfc6d9dae0aa0ada1513"
[[package]] [[package]]
name = "zmij" name = "zmij"
......
...@@ -140,7 +140,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { ...@@ -140,7 +140,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
} }
m.add_function(wrap_pyfunction!(llm::kv::compute_block_hash_for_seq_py, m)?)?; m.add_function(wrap_pyfunction!(llm::kv::compute_block_hash_for_seq_py, m)?)?;
m.add_function(wrap_pyfunction!(llm::kv::start_kv_block_indexer_py, m)?)?;
m.add_function(wrap_pyfunction!(lora_name_to_id, m)?)?; m.add_function(wrap_pyfunction!(lora_name_to_id, m)?)?;
m.add_function(wrap_pyfunction!(log_message, m)?)?; m.add_function(wrap_pyfunction!(log_message, m)?)?;
m.add_function(wrap_pyfunction!(register_model, m)?)?; m.add_function(wrap_pyfunction!(register_model, m)?)?;
......
...@@ -26,26 +26,6 @@ fn depythonize_block_mm_infos(obj: &Bound<'_, PyAny>) -> PyResult<Vec<Option<Blo ...@@ -26,26 +26,6 @@ fn depythonize_block_mm_infos(obj: &Bound<'_, PyAny>) -> PyResult<Vec<Option<Blo
depythonize(obj).map_err(to_pyerr) depythonize(obj).map_err(to_pyerr)
} }
#[pyfunction]
#[pyo3(name = "start_kv_block_indexer", signature = (endpoint, block_size, kv_router_config))]
pub fn start_kv_block_indexer_py<'p>(
py: Python<'p>,
endpoint: &Endpoint,
block_size: u32,
kv_router_config: &super::entrypoint::KvRouterConfig,
) -> PyResult<Bound<'p, PyAny>> {
let component = endpoint.inner.component().clone();
let config = kv_router_config.inner();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
llm_rs::kv_router::indexer_standalone::start_kv_block_indexer(
&component, &config, block_size,
)
.await
.map_err(to_pyerr)?;
Ok(())
})
}
#[pyfunction] #[pyfunction]
#[pyo3(name = "compute_block_hash_for_seq", signature = (tokens, kv_block_size, block_mm_infos=None, lora_name=None))] #[pyo3(name = "compute_block_hash_for_seq", signature = (tokens, kv_block_size, block_mm_infos=None, lora_name=None))]
pub fn compute_block_hash_for_seq_py( pub fn compute_block_hash_for_seq_py(
......
...@@ -1015,12 +1015,6 @@ class KvRouterConfig: ...@@ -1015,12 +1015,6 @@ class KvRouterConfig:
""" """
... ...
async def start_kv_block_indexer(
endpoint: Endpoint,
block_size: int,
kv_router_config: KvRouterConfig,
) -> None: ...
async def register_model( async def register_model(
model_input: ModelInput, model_input: ModelInput,
model_type: ModelType, model_type: ModelType,
......
...@@ -12,9 +12,7 @@ For public APIs, use dynamo.runtime and dynamo.llm. ...@@ -12,9 +12,7 @@ For public APIs, use dynamo.runtime and dynamo.llm.
# Re-export from _core # Re-export from _core
from dynamo._core import ModelDeploymentCard as ModelDeploymentCard from dynamo._core import ModelDeploymentCard as ModelDeploymentCard
from dynamo._core import start_kv_block_indexer as start_kv_block_indexer
__all__ = [ __all__ = [
"ModelDeploymentCard", "ModelDeploymentCard",
"start_kv_block_indexer",
] ]
...@@ -3,9 +3,7 @@ ...@@ -3,9 +3,7 @@
# Type stubs - re-export from _core # Type stubs - re-export from _core
from dynamo._core import ModelDeploymentCard as ModelDeploymentCard from dynamo._core import ModelDeploymentCard as ModelDeploymentCard
from dynamo._core import start_kv_block_indexer as start_kv_block_indexer
__all__ = [ __all__ = [
"ModelDeploymentCard", "ModelDeploymentCard",
"start_kv_block_indexer",
] ]
...@@ -14,6 +14,7 @@ repository.workspace = true ...@@ -14,6 +14,7 @@ repository.workspace = true
default = [] default = []
metrics = [] metrics = []
bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dynamo-runtime/integration", "dep:plotters"] bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dynamo-runtime/integration", "dep:plotters"]
indexer-bin = ["metrics", "dep:axum", "dep:clap", "dep:zeromq", "dep:tracing-subscriber", "dep:serde_json"]
[dependencies] [dependencies]
# repo # repo
...@@ -41,6 +42,7 @@ xxhash-rust = { workspace = true } ...@@ -41,6 +42,7 @@ xxhash-rust = { workspace = true }
# dependencies # dependencies
flume = "0.12.0" flume = "0.12.0"
parking_lot = { workspace = true } parking_lot = { workspace = true }
rmp-serde = { workspace = true }
# bench (optional) # bench (optional)
clap = { version = "4.5", features = ["derive"], optional = true } clap = { version = "4.5", features = ["derive"], optional = true }
...@@ -48,6 +50,11 @@ indicatif = { version = "0.18.0", optional = true } ...@@ -48,6 +50,11 @@ indicatif = { version = "0.18.0", optional = true }
plotters = { version = "0.3", optional = true, default-features = false, features = ["svg_backend", "line_series", "point_series", "full_palette"] } plotters = { version = "0.3", optional = true, default-features = false, features = ["svg_backend", "line_series", "point_series", "full_palette"] }
rustc-hash = "2.1.1" rustc-hash = "2.1.1"
# indexer-bin (optional)
axum = { workspace = true, optional = true }
zeromq = { version = "0.4.1", optional = true }
tracing-subscriber = { workspace = true, optional = true }
[dev-dependencies] [dev-dependencies]
dynamo-bench = { path = "../bench" } dynamo-bench = { path = "../bench" }
rstest = "0.18.2" rstest = "0.18.2"
...@@ -72,3 +79,8 @@ required-features = ["bench"] ...@@ -72,3 +79,8 @@ required-features = ["bench"]
name = "active_sequences_bench" name = "active_sequences_bench"
harness = false harness = false
required-features = ["bench"] required-features = ["bench"]
[[bin]]
name = "dynamo-kv-indexer"
path = "src/bin/kv_indexer/main.rs"
required-features = ["indexer-bin"]
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::ConcurrentRadixTree;
use dynamo_kv_router::ThreadPoolIndexer;
use dynamo_kv_router::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use dynamo_kv_router::protocols::{LocalBlockHash, OverlapScores, RouterEvent, WorkerId};
#[derive(Clone)]
pub enum Indexer {
Single(KvIndexer),
Concurrent(Arc<ThreadPoolIndexer<ConcurrentRadixTree>>),
}
impl Indexer {
pub async fn apply_event(&self, event: RouterEvent) {
match self {
Indexer::Single(idx) => idx.apply_event(event).await,
Indexer::Concurrent(idx) => idx.apply_event(event).await,
}
}
pub async fn remove_worker(&self, worker_id: WorkerId) {
match self {
Indexer::Single(idx) => idx.remove_worker(worker_id).await,
Indexer::Concurrent(idx) => idx.remove_worker(worker_id).await,
}
}
pub async fn find_matches(&self, hashes: Vec<LocalBlockHash>) -> Result<OverlapScores> {
match self {
Indexer::Single(idx) => idx.find_matches(hashes).await.map_err(Into::into),
Indexer::Concurrent(idx) => idx.find_matches(hashes).await.map_err(Into::into),
}
}
pub async fn dump_events(&self) -> Result<Vec<RouterEvent>> {
match self {
Indexer::Single(idx) => idx.dump_events().await.map_err(Into::into),
Indexer::Concurrent(idx) => idx.dump_events().await.map_err(Into::into),
}
}
}
pub fn create_indexer(block_size: u32, num_threads: usize) -> Indexer {
if num_threads > 1 {
Indexer::Concurrent(Arc::new(ThreadPoolIndexer::new(
ConcurrentRadixTree::new(),
num_threads,
block_size,
)))
} else {
Indexer::Single(KvIndexer::new_with_frequency(
CancellationToken::new(),
None,
block_size,
Arc::new(KvIndexerMetrics::new_unregistered()),
None,
))
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
use rmp_serde as rmps;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};
use dynamo_kv_router::protocols::{RouterEvent, WorkerId};
use dynamo_kv_router::zmq_wire::{KvEventBatch, convert_event};
use super::indexer::Indexer;
const INITIAL_BACKOFF_MS: u64 = 10;
const MAX_BACKOFF_MS: u64 = 5000;
const MAX_CONSECUTIVE_ERRORS: u32 = 10;
const MAX_BACKOFF_EXPONENT: u32 = 8;
fn calculate_backoff_ms(consecutive_errors: u32) -> u64 {
std::cmp::min(
INITIAL_BACKOFF_MS * 2_u64.pow(consecutive_errors.min(MAX_BACKOFF_EXPONENT)),
MAX_BACKOFF_MS,
)
}
pub async fn run_zmq_listener(
worker_id: WorkerId,
dp_rank: u32,
zmq_address: String,
block_size: u32,
indexer: Indexer,
cancel: CancellationToken,
) {
tracing::info!(worker_id, dp_rank, zmq_address, "ZMQ listener starting");
let mut socket = SubSocket::new();
if let Err(e) = socket.subscribe("").await {
tracing::error!("Failed to subscribe on ZMQ socket: {e}");
return;
}
if let Err(e) = socket.connect(&zmq_address).await {
tracing::error!("Failed to connect ZMQ SUB socket to {zmq_address}: {e}");
return;
}
let next_event_id = AtomicU64::new(0);
let warning_count = Arc::new(AtomicU32::new(0));
let mut consecutive_errors = 0u32;
#[allow(unused_assignments)]
let mut exit_reason = "unknown";
let mut messages_processed = 0u64;
'main: loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
exit_reason = "cancelled";
break 'main;
}
msg_result = socket.recv() => {
let Ok(msg) = msg_result else {
let e = msg_result.unwrap_err();
consecutive_errors += 1;
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
tracing::error!(
error=%e,
consecutive_errors,
worker_id,
"Too many consecutive ZMQ errors, terminating listener"
);
exit_reason = "too many consecutive errors";
break 'main;
}
let backoff_ms = calculate_backoff_ms(consecutive_errors);
tracing::warn!(
error=%e,
consecutive_errors,
backoff_ms,
worker_id,
"ZMQ recv error, backing off"
);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
continue;
};
consecutive_errors = 0;
let mut frames: Vec<Vec<u8>> = msg.into_vec().into_iter().map(|f| f.to_vec()).collect();
if frames.len() != 3 {
tracing::warn!(worker_id, "Unexpected ZMQ frame count: {}", frames.len());
continue;
}
let payload = frames.pop().unwrap();
let seq_bytes = frames.pop().unwrap();
if seq_bytes.len() != 8 {
tracing::warn!(worker_id, "Invalid sequence number length: {}", seq_bytes.len());
continue;
}
let batch_result = rmps::from_slice::<KvEventBatch>(&payload);
let Ok(batch) = batch_result else {
tracing::warn!(worker_id, "Failed to decode KvEventBatch: {}", batch_result.unwrap_err());
continue;
};
let effective_dp_rank = batch.data_parallel_rank.map_or(dp_rank, |r| r as u32);
for raw_event in batch.events.into_iter() {
let event_id = next_event_id.fetch_add(1, Ordering::SeqCst);
let kv_event = convert_event(raw_event, event_id, block_size, effective_dp_rank, &warning_count);
let router_event = RouterEvent::new(worker_id, kv_event);
indexer.apply_event(router_event).await;
messages_processed += 1;
}
}
}
}
tracing::info!(
worker_id,
dp_rank,
exit_reason,
messages_processed,
"ZMQ listener exiting"
);
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use clap::Parser;
use tokio::net::TcpListener;
mod indexer;
mod listener;
mod registry;
mod server;
use indexer::create_indexer;
use registry::WorkerRegistry;
use server::{AppState, create_router};
#[derive(Parser)]
#[command(name = "dynamo-kv-indexer", about = "Standalone KV cache indexer")]
struct Cli {
/// KV cache block size (must match the vLLM engine's block size)
#[arg(long)]
block_size: u32,
/// HTTP server port
#[arg(long, default_value_t = 8090)]
port: u16,
/// Number of indexer threads (1 = single-threaded KvIndexer, >1 = ThreadPoolIndexer)
#[arg(long, default_value_t = 1)]
threads: usize,
/// Initial workers as "worker_id=zmq_address,..." (e.g. "1=tcp://host:5557,2=tcp://host:5558")
#[arg(long)]
workers: Option<String>,
}
fn parse_workers(s: &str) -> Vec<(u64, String)> {
s.split(',')
.filter(|entry| !entry.is_empty())
.filter_map(|entry| {
let (id_str, addr) = entry.split_once('=')?;
let id = id_str.trim().parse::<u64>().ok()?;
Some((id, addr.trim().to_string()))
})
.collect()
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
tracing::info!(
block_size = cli.block_size,
port = cli.port,
threads = cli.threads,
"Starting standalone KV cache indexer"
);
let indexer = create_indexer(cli.block_size, cli.threads);
let registry = WorkerRegistry::new(indexer, cli.block_size);
if let Some(ref workers_str) = cli.workers {
for (instance_id, endpoint) in parse_workers(workers_str) {
tracing::info!(instance_id, endpoint, "Registering initial worker");
registry.register(instance_id, endpoint, 0)?;
}
}
let state = Arc::new(AppState {
registry,
block_size: cli.block_size,
});
let app = create_router(state);
let listener = TcpListener::bind(("0.0.0.0", cli.port)).await?;
tracing::info!("HTTP server listening on 0.0.0.0:{}", cli.port);
axum::serve(listener, app).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_workers() {
let input = "1=tcp://host:5557,2=tcp://host:5558";
let result = parse_workers(input);
assert_eq!(result.len(), 2);
assert_eq!(result[0], (1, "tcp://host:5557".to_string()));
assert_eq!(result[1], (2, "tcp://host:5558".to_string()));
}
#[test]
fn test_parse_workers_empty() {
assert!(parse_workers("").is_empty());
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use anyhow::{Result, bail};
use dashmap::DashMap;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::protocols::WorkerId;
use super::indexer::Indexer;
use super::listener::run_zmq_listener;
pub struct EndpointEntry {
pub endpoint: String,
}
pub struct WorkerEntry {
pub endpoints: HashMap<u32, EndpointEntry>,
cancel: CancellationToken,
}
pub struct WorkerRegistry {
workers: DashMap<WorkerId, WorkerEntry>,
indexer: Indexer,
block_size: u32,
}
impl WorkerRegistry {
pub fn new(indexer: Indexer, block_size: u32) -> Self {
Self {
workers: DashMap::new(),
indexer,
block_size,
}
}
pub fn register(&self, instance_id: WorkerId, endpoint: String, dp_rank: u32) -> Result<()> {
let mut entry = self
.workers
.entry(instance_id)
.or_insert_with(|| WorkerEntry {
endpoints: HashMap::new(),
cancel: CancellationToken::new(),
});
if entry.endpoints.contains_key(&dp_rank) {
bail!("instance {instance_id} dp_rank {dp_rank} already registered");
}
let child_cancel = entry.cancel.child_token();
let indexer = self.indexer.clone();
let block_size = self.block_size;
let addr = endpoint.clone();
tokio::spawn(async move {
run_zmq_listener(
instance_id,
dp_rank,
addr,
block_size,
indexer,
child_cancel,
)
.await;
});
entry.endpoints.insert(dp_rank, EndpointEntry { endpoint });
Ok(())
}
pub async fn deregister(&self, instance_id: WorkerId) -> Result<()> {
let (_, entry) = self
.workers
.remove(&instance_id)
.ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?;
entry.cancel.cancel();
self.indexer.remove_worker(instance_id).await;
Ok(())
}
pub async fn deregister_dp_rank(&self, instance_id: WorkerId, dp_rank: u32) -> Result<()> {
let mut entry = self
.workers
.get_mut(&instance_id)
.ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?;
if entry.endpoints.remove(&dp_rank).is_none() {
bail!("instance {instance_id} dp_rank {dp_rank} not found");
}
if entry.endpoints.is_empty() {
drop(entry);
return self.deregister(instance_id).await;
}
Ok(())
}
pub fn list(&self) -> Vec<(WorkerId, HashMap<u32, String>)> {
self.workers
.iter()
.map(|entry| {
let endpoints: HashMap<u32, String> = entry
.value()
.endpoints
.iter()
.map(|(&dp_rank, e)| (dp_rank, e.endpoint.clone()))
.collect();
(*entry.key(), endpoints)
})
.collect()
}
pub fn indexer(&self) -> &Indexer {
&self.indexer
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use std::sync::Arc;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use dynamo_kv_router::protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq};
use super::registry::WorkerRegistry;
pub struct AppState {
pub registry: WorkerRegistry,
pub block_size: u32,
}
#[derive(Deserialize)]
pub struct RegisterRequest {
pub instance_id: WorkerId,
pub endpoint: String,
#[serde(default)]
pub dp_rank: Option<u32>,
}
#[derive(Deserialize)]
pub struct UnregisterRequest {
pub instance_id: WorkerId,
#[serde(default)]
pub dp_rank: Option<u32>,
}
#[derive(Serialize)]
struct WorkerInfo {
instance_id: WorkerId,
endpoints: HashMap<u32, String>,
}
#[derive(Deserialize)]
pub struct QueryRequest {
pub token_ids: Vec<u32>,
#[serde(default)]
pub lora_name: Option<String>,
}
#[derive(Deserialize)]
pub struct QueryByHashRequest {
pub block_hashes: Vec<i64>,
}
#[derive(Serialize)]
struct ScoreResponse {
scores: HashMap<String, HashMap<String, u32>>,
frequencies: Vec<usize>,
tree_sizes: HashMap<String, HashMap<String, usize>>,
}
async fn register(
State(state): State<Arc<AppState>>,
Json(req): Json<RegisterRequest>,
) -> impl IntoResponse {
match state
.registry
.register(req.instance_id, req.endpoint, req.dp_rank.unwrap_or(0))
{
Ok(()) => (
StatusCode::CREATED,
Json(serde_json::json!({"status": "ok"})),
),
Err(e) => (
StatusCode::CONFLICT,
Json(serde_json::json!({"error": e.to_string()})),
),
}
}
async fn unregister(
State(state): State<Arc<AppState>>,
Json(req): Json<UnregisterRequest>,
) -> impl IntoResponse {
let result = match req.dp_rank {
Some(dp_rank) => {
state
.registry
.deregister_dp_rank(req.instance_id, dp_rank)
.await
}
None => state.registry.deregister(req.instance_id).await,
};
match result {
Ok(()) => (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))),
Err(e) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": e.to_string()})),
),
}
}
async fn list_workers(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let workers: Vec<WorkerInfo> = state
.registry
.list()
.into_iter()
.map(|(instance_id, endpoints)| WorkerInfo {
instance_id,
endpoints,
})
.collect();
Json(workers)
}
fn build_score_response(overlap: dynamo_kv_router::protocols::OverlapScores) -> ScoreResponse {
let mut scores: HashMap<String, HashMap<String, u32>> = HashMap::new();
for (k, v) in &overlap.scores {
scores
.entry(k.worker_id.to_string())
.or_default()
.insert(k.dp_rank.to_string(), *v);
}
let mut tree_sizes: HashMap<String, HashMap<String, usize>> = HashMap::new();
for (k, v) in &overlap.tree_sizes {
tree_sizes
.entry(k.worker_id.to_string())
.or_default()
.insert(k.dp_rank.to_string(), *v);
}
ScoreResponse {
scores,
frequencies: overlap.frequencies,
tree_sizes,
}
}
async fn query(
State(state): State<Arc<AppState>>,
Json(req): Json<QueryRequest>,
) -> impl IntoResponse {
let block_hashes = compute_block_hash_for_seq(
&req.token_ids,
state.block_size,
None,
req.lora_name.as_deref(),
);
match state.registry.indexer().find_matches(block_hashes).await {
Ok(overlap) => (
StatusCode::OK,
Json(serde_json::json!(build_score_response(overlap))),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
),
}
}
async fn query_by_hash(
State(state): State<Arc<AppState>>,
Json(req): Json<QueryByHashRequest>,
) -> impl IntoResponse {
let block_hashes: Vec<LocalBlockHash> = req
.block_hashes
.iter()
.map(|h| LocalBlockHash(*h as u64))
.collect();
match state.registry.indexer().find_matches(block_hashes).await {
Ok(overlap) => (
StatusCode::OK,
Json(serde_json::json!(build_score_response(overlap))),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
),
}
}
async fn dump_events(State(state): State<Arc<AppState>>) -> impl IntoResponse {
match state.registry.indexer().dump_events().await {
Ok(events) => (StatusCode::OK, Json(serde_json::json!(events))),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
),
}
}
pub fn create_router(state: Arc<AppState>) -> Router {
Router::new()
.route("/register", post(register))
.route("/unregister", post(unregister))
.route("/workers", get(list_workers))
.route("/query", post(query))
.route("/query_by_hash", post(query_by_hash))
.route("/dump", get(dump_events))
.with_state(state)
}
...@@ -16,6 +16,7 @@ pub mod protocols; ...@@ -16,6 +16,7 @@ pub mod protocols;
pub mod radix_tree; pub mod radix_tree;
pub mod scheduling; pub mod scheduling;
pub mod sequences; pub mod sequences;
pub mod zmq_wire;
// Backward-compat re-exports: preserve old module paths for external consumers // Backward-compat re-exports: preserve old module paths for external consumers
pub use scheduling::config; pub use scheduling::config;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Wire-format types for vLLM ZMQ KV event streams.
//!
//! These types mirror the Python `msgspec`-defined structures emitted by vLLM
//! engines over ZMQ PUB sockets. They are independent of the dynamo runtime
//! and can be used by any crate that needs to decode the raw ZMQ payloads.
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use serde::Deserialize;
use serde::Serialize;
use serde::de::{self, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor};
use crate::protocols::{
BlockExtraInfo, BlockMmObjectInfo, ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData,
KvCacheRemoveData, KvCacheStoreData, KvCacheStoredBlockData, compute_block_hash_for_seq,
};
// -------------------------------------------------------------------------
// Types mirroring the Python msgspec-defined structures -------------------
// -------------------------------------------------------------------------
#[derive(Debug, Serialize)]
pub struct KvEventBatch {
pub ts: f64,
pub events: Vec<RawKvEvent>,
#[serde(alias = "dp_rank")]
pub data_parallel_rank: Option<i32>,
}
impl<'de> Deserialize<'de> for KvEventBatch {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// Deserialize from array format: [timestamp, [events], data_parallel_rank]
let arr: (f64, Vec<RawKvEvent>, Option<i32>) = Deserialize::deserialize(deserializer)?;
Ok(KvEventBatch {
ts: arr.0,
events: arr.1,
data_parallel_rank: arr.2,
})
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[serde(untagged)]
pub enum BlockHashValue {
Signed(i64),
Unsigned(u64),
}
impl BlockHashValue {
pub fn into_u64(self) -> u64 {
match self {
BlockHashValue::Signed(v) => v as u64,
BlockHashValue::Unsigned(v) => v,
}
}
}
#[derive(Debug, Serialize, Clone)]
#[serde(tag = "type")] // msgspec encodes variant tag as a string when `tag=True`
pub enum RawKvEvent {
BlockStored {
/// Block hashes may be emitted as either signed or unsigned 64-bit values.
/// We normalize them to `u64` while deserializing to support both producers.
block_hashes: Vec<BlockHashValue>,
parent_block_hash: Option<BlockHashValue>,
token_ids: Vec<u32>,
block_size: usize,
#[serde(skip_serializing_if = "Option::is_none")]
medium: Option<String>,
/// LoRA adapter name for adapter-aware block hashing
#[serde(default, skip_serializing_if = "Option::is_none")]
lora_name: Option<String>,
/// Multimodal extra info for each block (length should match block_hashes)
#[serde(default, skip_serializing_if = "Option::is_none")]
block_mm_infos: Option<Vec<Option<BlockExtraInfo>>>,
},
BlockRemoved {
block_hashes: Vec<BlockHashValue>,
#[serde(skip_serializing_if = "Option::is_none")]
medium: Option<String>,
},
AllBlocksCleared,
}
/// Parse MM hash from extra_keys string:
/// - Only accept canonical vLLM MM identifiers (64-char hex digest)
/// - Convert by taking the first 16 hex chars as u64
pub fn parse_mm_hash_from_extra_key(s: &str) -> Option<u64> {
// extra_keys mixes MM identifiers with LoRA/cache_salt/prompt-embed metadata.
// Only MM identifiers should be mapped into BlockExtraInfo.
if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
return u64::from_str_radix(&s[..16], 16).ok();
}
None
}
/// Convert vLLM BlockStored extra_keys to block-level MM infos.
/// extra_keys is a list aligned with blocks:
/// - None => no MM content in that block
/// - ["hash1", "hash2", ...] => one or more MM objects in that block
pub fn extra_keys_to_block_mm_infos(
extra_keys: Option<Vec<Option<Vec<String>>>>,
) -> Option<Vec<Option<BlockExtraInfo>>> {
let extra_keys = extra_keys?;
if extra_keys.is_empty() {
return None;
}
let infos: Vec<Option<BlockExtraInfo>> = extra_keys
.into_iter()
.map(|block_keys| {
let mm_objects: Vec<BlockMmObjectInfo> = block_keys
.unwrap_or_default()
.iter()
.filter_map(|key| parse_mm_hash_from_extra_key(key))
.map(|mm_hash| BlockMmObjectInfo {
mm_hash,
offsets: vec![], // extra_keys does not carry offsets today
})
.collect();
if mm_objects.is_empty() {
None
} else {
Some(BlockExtraInfo { mm_objects })
}
})
.collect();
if infos.iter().all(|i| i.is_none()) {
return None;
}
Some(infos)
}
// -------------------------------------------------------------------------
// Custom deserializer for RawKvEvent --------------------------------------
// -------------------------------------------------------------------------
/// Our producers use msgspec with `tag=True` and `array_like=True`, which
/// encodes each event as either a tagged map or a tagged tuple. To be tolerant of
/// additional fields that may be appended in the future, we implement a custom
/// deserializer that ignores unknown keys and any extra positional elements.
///
/// This keeps us compatible with older payloads while safely
/// accepting newer ones that include extra metadata.
impl<'de> Deserialize<'de> for RawKvEvent {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(RawKvEventVisitor)
}
}
struct RawKvEventVisitor;
impl<'de> Visitor<'de> for RawKvEventVisitor {
type Value = RawKvEvent;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a kv event encoded as a tagged map or sequence")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut event_type: Option<String> = None;
let mut block_hashes: Option<Vec<BlockHashValue>> = None;
let mut parent_block_hash: Option<Option<BlockHashValue>> = None;
let mut token_ids: Option<Vec<u32>> = None;
let mut block_size: Option<usize> = None;
let mut medium: Option<Option<String>> = None;
let mut lora_name: Option<Option<String>> = None;
let mut extra_keys: Option<Option<Vec<Option<Vec<String>>>>> = None;
let mut block_mm_infos: Option<Option<Vec<Option<BlockExtraInfo>>>> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"type" => {
event_type = Some(map.next_value()?);
}
"block_hashes" => {
block_hashes = Some(map.next_value()?);
}
"parent_block_hash" => {
parent_block_hash = Some(map.next_value()?);
}
"token_ids" => {
token_ids = Some(map.next_value()?);
}
"block_size" => {
block_size = Some(map.next_value()?);
}
"medium" => {
medium = Some(map.next_value()?);
}
"lora_name" => {
lora_name = Some(map.next_value()?);
}
"extra_keys" => {
extra_keys = Some(map.next_value()?);
}
"block_mm_infos" => {
block_mm_infos = Some(map.next_value()?);
}
_ => {
map.next_value::<IgnoredAny>()?;
}
}
}
match event_type.as_deref() {
Some("BlockStored") => {
let block_hashes =
block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
let token_ids = token_ids.ok_or_else(|| de::Error::missing_field("token_ids"))?;
let block_size =
block_size.ok_or_else(|| de::Error::missing_field("block_size"))?;
let block_mm_infos = block_mm_infos
.unwrap_or(None)
.or_else(|| extra_keys_to_block_mm_infos(extra_keys.unwrap_or(None)));
Ok(RawKvEvent::BlockStored {
block_hashes,
parent_block_hash: parent_block_hash.unwrap_or(None),
token_ids,
block_size,
medium: medium.unwrap_or(None),
lora_name: lora_name.unwrap_or(None),
block_mm_infos,
})
}
Some("BlockRemoved") => {
let block_hashes =
block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
Ok(RawKvEvent::BlockRemoved {
block_hashes,
medium: medium.unwrap_or(None),
})
}
Some("AllBlocksCleared") => Ok(RawKvEvent::AllBlocksCleared),
Some(other) => Err(de::Error::unknown_variant(
other,
&["BlockStored", "BlockRemoved", "AllBlocksCleared"],
)),
None => Err(de::Error::missing_field("type")),
}
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let tag: Option<String> = seq.next_element()?;
let Some(tag) = tag else {
return Err(de::Error::invalid_length(
0,
&"sequence must start with event tag",
));
};
match tag.as_str() {
"BlockStored" => {
let block_hashes: Vec<BlockHashValue> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
let parent_block_hash: Option<BlockHashValue> = seq.next_element()?.unwrap_or(None);
let token_ids: Vec<u32> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(3, &"missing token_ids"))?;
let block_size: usize = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(4, &"missing block_size"))?;
// Position 5 was lora_id in older formats; consume and discard for compat
let _lora_id: Option<u64> = seq.next_element()?.unwrap_or(None);
let medium: Option<String> = seq.next_element()?.unwrap_or(None);
let lora_name: Option<String> = seq.next_element()?.unwrap_or(None);
let extra_keys: Option<Vec<Option<Vec<String>>>> =
seq.next_element()?.unwrap_or(None);
let block_mm_infos: Option<Vec<Option<BlockExtraInfo>>> =
seq.next_element()?.unwrap_or(None);
while seq.next_element::<IgnoredAny>()?.is_some() {}
let block_mm_infos =
block_mm_infos.or_else(|| extra_keys_to_block_mm_infos(extra_keys));
Ok(RawKvEvent::BlockStored {
block_hashes,
parent_block_hash,
token_ids,
block_size,
medium,
lora_name,
block_mm_infos,
})
}
"BlockRemoved" => {
let block_hashes: Vec<BlockHashValue> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
let medium: Option<String> = seq.next_element()?.unwrap_or(None);
while seq.next_element::<IgnoredAny>()?.is_some() {}
Ok(RawKvEvent::BlockRemoved {
block_hashes,
medium,
})
}
"AllBlocksCleared" => {
while seq.next_element::<IgnoredAny>()?.is_some() {}
Ok(RawKvEvent::AllBlocksCleared)
}
other => Err(de::Error::unknown_variant(
other,
&["BlockStored", "BlockRemoved", "AllBlocksCleared"],
)),
}
}
}
// -------------------------------------------------------------------------
// Event conversion --------------------------------------------------------
// -------------------------------------------------------------------------
/// Convert a raw event coming from the ZMQ channel into the internal
/// [`KvCacheEvent`] representation used by the router.
pub fn convert_event(
raw: RawKvEvent,
event_id: u64,
kv_block_size: u32,
dp_rank: u32,
warning_count: &Arc<AtomicU32>,
) -> KvCacheEvent {
match raw {
RawKvEvent::BlockStored {
block_hashes,
parent_block_hash,
token_ids,
block_size,
lora_name,
block_mm_infos,
medium: _,
} => {
// Reject self-referencing blocks: all block hashes (including parent) must be unique.
{
let mut seen = HashSet::with_capacity(block_hashes.len() + 1);
if let Some(parent) = parent_block_hash {
seen.insert(parent.into_u64());
}
let has_duplicate = block_hashes.iter().any(|h| !seen.insert(h.into_u64()));
if has_duplicate {
tracing::warn!(
event_id,
"Self-referencing block detected: duplicate hash in store event; dropping"
);
return KvCacheEvent {
event_id,
data: KvCacheEventData::Cleared,
dp_rank,
};
}
}
let num_block_tokens = vec![block_size as u64; block_hashes.len()];
let block_hashes_u64: Vec<u64> = block_hashes
.into_iter()
.map(BlockHashValue::into_u64)
.collect();
KvCacheEvent {
event_id,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_block_hash
.map(BlockHashValue::into_u64)
.map(ExternalSequenceBlockHash::from),
blocks: create_stored_blocks(
kv_block_size,
&token_ids,
&num_block_tokens,
&block_hashes_u64,
lora_name.as_deref(),
warning_count,
block_mm_infos.as_deref(),
),
}),
dp_rank,
}
}
RawKvEvent::BlockRemoved { block_hashes, .. } => {
let hashes = block_hashes
.into_iter()
.map(BlockHashValue::into_u64)
.map(ExternalSequenceBlockHash::from)
.collect();
KvCacheEvent {
event_id,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: hashes,
}),
dp_rank,
}
}
RawKvEvent::AllBlocksCleared => KvCacheEvent {
event_id,
data: KvCacheEventData::Cleared,
dp_rank,
},
}
}
pub fn create_stored_block_from_parts(
kv_block_size: u32,
block_hash: u64,
token_ids: &[u32],
lora_name: Option<&str>,
mm_extra_info: Option<BlockExtraInfo>,
) -> KvCacheStoredBlockData {
let block_mm_infos = mm_extra_info.as_ref().map(|info| vec![Some(info.clone())]);
let tokens_hash = compute_block_hash_for_seq(
token_ids,
kv_block_size,
block_mm_infos.as_deref(),
lora_name,
)[0];
tracing::trace!(
"Creating stored block: external_block_hash={}, tokens_hash={}, token_ids={:?}, kv_block_size={}, mm_extra_info={:?}",
block_hash,
tokens_hash.0,
token_ids,
kv_block_size,
mm_extra_info
);
KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash::from(block_hash),
tokens_hash,
mm_extra_info,
}
}
pub fn create_stored_blocks(
kv_block_size: u32,
token_ids: &[u32],
num_block_tokens: &[u64],
block_hashes: &[u64],
lora_name: Option<&str>,
warning_count: &Arc<AtomicU32>,
block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
) -> Vec<KvCacheStoredBlockData> {
let mut blocks: Vec<KvCacheStoredBlockData> = Vec::new();
let mut token_offset: usize = 0;
for (block_idx, (num_tokens_it, block_hash_it)) in
num_block_tokens.iter().zip(block_hashes.iter()).enumerate()
{
if *num_tokens_it != kv_block_size as u64 {
if warning_count.fetch_add(1, Ordering::Relaxed) < 3 {
tracing::warn!(
"Block not published. Block size must be {} tokens to be published. Block size is: {}",
kv_block_size,
*num_tokens_it
);
}
break;
}
let tokens = &token_ids[token_offset..(token_offset + *num_tokens_it as usize)];
let mm_extra_info = block_mm_infos
.and_then(|infos| infos.get(block_idx))
.and_then(|opt| opt.clone());
blocks.push(create_stored_block_from_parts(
kv_block_size,
*block_hash_it,
tokens,
lora_name,
mm_extra_info,
));
token_offset += *num_tokens_it as usize;
}
blocks
}
...@@ -31,7 +31,6 @@ pub use dynamo_kv_router::selector; ...@@ -31,7 +31,6 @@ pub use dynamo_kv_router::selector;
pub mod cache_control; pub mod cache_control;
pub mod config; pub mod config;
pub mod indexer_standalone;
mod jetstream; mod jetstream;
pub mod metrics; pub mod metrics;
pub mod prefill_router; pub mod prefill_router;
...@@ -46,7 +45,6 @@ pub mod worker_query; ...@@ -46,7 +45,6 @@ pub mod worker_query;
pub use cache_control::{CacheControlClient, spawn_pin_prefix}; pub use cache_control::{CacheControlClient, spawn_pin_prefix};
pub use config::{KvRouterConfig, RouterConfigOverride}; pub use config::{KvRouterConfig, RouterConfigOverride};
pub use indexer_standalone::start_kv_block_indexer;
pub use prefill_router::PrefillRouter; pub use prefill_router::PrefillRouter;
pub use push_router::{DirectRoutingRouter, KvPushRouter}; pub use push_router::{DirectRoutingRouter, KvPushRouter};
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use serde::{Deserialize, Serialize};
use dynamo_runtime::{
component::Component,
error::DynamoError,
pipeline::{
AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait,
network::Ingress,
},
protocols::{annotated::Annotated, maybe_error::MaybeError},
stream,
};
use crate::kv_router::{
Indexer, KV_INDEXER_QUERY_ENDPOINT, KvRouterConfig,
protocols::{
BlockExtraInfo, LocalBlockHash, OverlapScores, RouterEvent, compute_block_hash_for_seq,
},
subscriber,
};
#[derive(Serialize, Deserialize, Debug)]
pub enum IndexerQueryRequest {
FindMatchesHashed {
block_hashes: Vec<LocalBlockHash>,
},
FindMatchesTokens {
tokens: Vec<u32>,
block_mm_infos: Option<Vec<Option<BlockExtraInfo>>>,
lora_name: Option<String>,
},
DumpTree,
}
#[derive(Serialize, Deserialize, Debug)]
pub enum IndexerQueryResponse {
Matches(OverlapScores),
TreeDump(Vec<RouterEvent>),
Error(String),
}
impl MaybeError for IndexerQueryResponse {
fn from_err(err: impl std::error::Error + 'static) -> Self {
IndexerQueryResponse::Error(err.to_string())
}
fn err(&self) -> Option<DynamoError> {
match self {
IndexerQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
_ => None,
}
}
}
struct IndexerQueryEngine {
indexer: Indexer,
block_size: u32,
}
#[async_trait]
impl
AsyncEngine<
SingleIn<IndexerQueryRequest>,
ManyOut<Annotated<IndexerQueryResponse>>,
anyhow::Error,
> for IndexerQueryEngine
{
async fn generate(
&self,
request: SingleIn<IndexerQueryRequest>,
) -> Result<ManyOut<Annotated<IndexerQueryResponse>>> {
let (request, ctx) = request.into_parts();
if matches!(request, IndexerQueryRequest::DumpTree) {
let response = match self.indexer.dump_events().await {
Ok(events) => IndexerQueryResponse::TreeDump(events),
Err(e) => IndexerQueryResponse::Error(format!("{e:?}")),
};
return Ok(ResponseStream::new(
Box::pin(stream::iter(vec![Annotated::from_data(response)])),
ctx.context(),
));
}
let block_hashes = match request {
IndexerQueryRequest::FindMatchesHashed { block_hashes } => block_hashes,
IndexerQueryRequest::FindMatchesTokens {
tokens,
block_mm_infos,
lora_name,
} => compute_block_hash_for_seq(
&tokens,
self.block_size,
block_mm_infos.as_deref(),
lora_name.as_deref(),
),
IndexerQueryRequest::DumpTree => unreachable!(),
};
let response = match self.indexer.find_matches(block_hashes).await {
Ok(scores) => IndexerQueryResponse::Matches(scores),
Err(e) => IndexerQueryResponse::Error(format!("{e:?}")),
};
Ok(ResponseStream::new(
Box::pin(stream::iter(vec![Annotated::from_data(response)])),
ctx.context(),
))
}
}
async fn start_indexer_query_endpoint(
component: Component,
indexer: Indexer,
block_size: u32,
) -> Result<()> {
let engine = std::sync::Arc::new(IndexerQueryEngine {
indexer,
block_size,
});
let ingress = Ingress::for_engine(engine)?;
let fut = component
.endpoint(KV_INDEXER_QUERY_ENDPOINT)
.endpoint_builder()
.handler(ingress)
.graceful_shutdown(true)
.start();
tokio::spawn(async move {
if let Err(e) = fut.await {
tracing::error!("Indexer query endpoint failed: {e:?}");
}
});
Ok(())
}
pub async fn start_kv_block_indexer(
component: &Component,
kv_router_config: &KvRouterConfig,
block_size: u32,
) -> Result<Indexer> {
if kv_router_config.durable_kv_events {
anyhow::bail!(
"standalone indexer does not support durable_kv_events (JetStream): \
consumer ID collisions, orphan cleanup conflicts, and snapshot/purge races \
make it incompatible with an independent indexer"
);
}
let indexer = Indexer::new(component, kv_router_config, block_size);
subscriber::start_subscriber(component.clone(), kv_router_config, indexer.clone()).await?;
start_indexer_query_endpoint(component.clone(), indexer.clone(), block_size).await?;
tracing::info!(
"Standalone KV indexer started with query endpoint '{KV_INDEXER_QUERY_ENDPOINT}'"
);
Ok(indexer)
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
...@@ -10,9 +8,6 @@ use std::time::{Duration, Instant}; ...@@ -10,9 +8,6 @@ use std::time::{Duration, Instant};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use rmp_serde as rmps; use rmp_serde as rmps;
use serde::Deserialize;
use serde::Serialize;
use serde::de::{self, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket}; use zeromq::{Socket, SocketRecv, SubSocket};
...@@ -39,6 +34,9 @@ fn create_kv_stream_name(component: &Component, subject: &str) -> String { ...@@ -39,6 +34,9 @@ fn create_kv_stream_name(component: &Component, subject: &str) -> String {
.replace("_", "-") .replace("_", "-")
} }
pub use dynamo_kv_router::zmq_wire::create_stored_blocks;
use dynamo_kv_router::zmq_wire::*;
use crate::kv_router::{ use crate::kv_router::{
KV_EVENT_SUBJECT, KV_METRICS_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE, KV_EVENT_SUBJECT, KV_METRICS_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE,
indexer::{KvIndexerMetrics, LocalKvIndexer}, indexer::{KvIndexerMetrics, LocalKvIndexer},
...@@ -756,471 +754,6 @@ pub async fn start_zmq_listener( ...@@ -756,471 +754,6 @@ pub async fn start_zmq_listener(
); );
} }
/// Convert a raw event coming from the ZMQ channel into the internal
/// [`KvCacheEvent`] representation used by the router.
fn convert_event(
raw: RawKvEvent,
event_id: u64,
kv_block_size: u32,
dp_rank: u32,
warning_count: &Arc<AtomicU32>,
) -> KvCacheEvent {
match raw {
RawKvEvent::BlockStored {
block_hashes,
parent_block_hash,
token_ids,
block_size,
lora_name,
block_mm_infos,
medium: _,
} => {
// Reject self-referencing blocks: all block hashes (including parent) must be unique.
{
let mut seen = HashSet::with_capacity(block_hashes.len() + 1);
if let Some(parent) = parent_block_hash {
seen.insert(parent.into_u64());
}
let has_duplicate = block_hashes.iter().any(|h| !seen.insert(h.into_u64()));
if has_duplicate {
tracing::warn!(
event_id,
"Self-referencing block detected: duplicate hash in store event; dropping"
);
return KvCacheEvent {
event_id,
data: KvCacheEventData::Cleared,
dp_rank,
};
}
}
let num_block_tokens = vec![block_size as u64; block_hashes.len()];
let block_hashes_u64: Vec<u64> = block_hashes
.into_iter()
.map(BlockHashValue::into_u64)
.collect();
KvCacheEvent {
event_id,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_block_hash
.map(BlockHashValue::into_u64)
.map(ExternalSequenceBlockHash::from),
blocks: create_stored_blocks(
kv_block_size,
&token_ids,
&num_block_tokens,
&block_hashes_u64,
lora_name.as_deref(),
warning_count,
block_mm_infos.as_deref(),
),
}),
dp_rank,
}
}
RawKvEvent::BlockRemoved { block_hashes, .. } => {
let hashes = block_hashes
.into_iter()
.map(BlockHashValue::into_u64)
.map(ExternalSequenceBlockHash::from)
.collect();
KvCacheEvent {
event_id,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: hashes,
}),
dp_rank,
}
}
RawKvEvent::AllBlocksCleared => KvCacheEvent {
event_id,
data: KvCacheEventData::Cleared,
dp_rank,
},
}
}
pub fn create_stored_block_from_parts(
kv_block_size: u32,
block_hash: u64,
token_ids: &[u32],
lora_name: Option<&str>,
mm_extra_info: Option<BlockExtraInfo>,
) -> KvCacheStoredBlockData {
let block_mm_infos = mm_extra_info.as_ref().map(|info| vec![Some(info.clone())]);
let tokens_hash = compute_block_hash_for_seq(
token_ids,
kv_block_size,
block_mm_infos.as_deref(),
lora_name,
)[0];
tracing::trace!(
"Creating stored block: external_block_hash={}, tokens_hash={}, token_ids={:?}, kv_block_size={}, mm_extra_info={:?}",
block_hash,
tokens_hash.0,
token_ids,
kv_block_size,
mm_extra_info
);
KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash::from(block_hash),
tokens_hash,
mm_extra_info,
}
}
pub fn create_stored_blocks(
kv_block_size: u32,
token_ids: &[u32],
num_block_tokens: &[u64],
block_hashes: &[u64],
lora_name: Option<&str>,
warning_count: &Arc<AtomicU32>,
block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
) -> Vec<KvCacheStoredBlockData> {
let mut blocks: Vec<KvCacheStoredBlockData> = Vec::new();
let mut token_offset: usize = 0;
for (block_idx, (num_tokens_it, block_hash_it)) in
num_block_tokens.iter().zip(block_hashes.iter()).enumerate()
{
if *num_tokens_it != kv_block_size as u64 {
if warning_count.fetch_add(1, Ordering::Relaxed) < 3 {
tracing::warn!(
"Block not published. Block size must be {} tokens to be published. Block size is: {}",
kv_block_size,
*num_tokens_it
);
}
break;
}
let tokens = &token_ids[token_offset..(token_offset + *num_tokens_it as usize)];
let mm_extra_info = block_mm_infos
.and_then(|infos| infos.get(block_idx))
.and_then(|opt| opt.clone());
blocks.push(create_stored_block_from_parts(
kv_block_size,
*block_hash_it,
tokens,
lora_name,
mm_extra_info,
));
token_offset += *num_tokens_it as usize;
}
blocks
}
// -------------------------------------------------------------------------
// Types mirroring the Python msgspec-defined structures -------------------
// -------------------------------------------------------------------------
#[derive(Debug, Serialize)]
struct KvEventBatch {
ts: f64,
events: Vec<RawKvEvent>,
#[serde(alias = "dp_rank")]
data_parallel_rank: Option<i32>,
}
impl<'de> Deserialize<'de> for KvEventBatch {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// Deserialize from array format: [timestamp, [events], data_parallel_rank]
let arr: (f64, Vec<RawKvEvent>, Option<i32>) = Deserialize::deserialize(deserializer)?;
Ok(KvEventBatch {
ts: arr.0,
events: arr.1,
data_parallel_rank: arr.2,
})
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[serde(untagged)]
enum BlockHashValue {
Signed(i64),
Unsigned(u64),
}
impl BlockHashValue {
fn into_u64(self) -> u64 {
match self {
BlockHashValue::Signed(v) => v as u64,
BlockHashValue::Unsigned(v) => v,
}
}
}
#[derive(Debug, Serialize, Clone)]
#[serde(tag = "type")] // msgspec encodes variant tag as a string when `tag=True`
enum RawKvEvent {
BlockStored {
/// Block hashes may be emitted as either signed or unsigned 64-bit values.
/// We normalize them to `u64` while deserializing to support both producers.
block_hashes: Vec<BlockHashValue>,
parent_block_hash: Option<BlockHashValue>,
token_ids: Vec<u32>,
block_size: usize,
#[serde(skip_serializing_if = "Option::is_none")]
medium: Option<String>,
/// LoRA adapter name for adapter-aware block hashing
#[serde(default, skip_serializing_if = "Option::is_none")]
lora_name: Option<String>,
/// Multimodal extra info for each block (length should match block_hashes)
#[serde(default, skip_serializing_if = "Option::is_none")]
block_mm_infos: Option<Vec<Option<BlockExtraInfo>>>,
},
BlockRemoved {
block_hashes: Vec<BlockHashValue>,
#[serde(skip_serializing_if = "Option::is_none")]
medium: Option<String>,
},
AllBlocksCleared,
}
/// Parse MM hash from extra_keys string:
/// - Only accept canonical vLLM MM identifiers (64-char hex digest)
/// - Convert by taking the first 16 hex chars as u64
fn parse_mm_hash_from_extra_key(s: &str) -> Option<u64> {
// extra_keys mixes MM identifiers with LoRA/cache_salt/prompt-embed metadata.
// Only MM identifiers should be mapped into BlockExtraInfo.
if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
return u64::from_str_radix(&s[..16], 16).ok();
}
None
}
/// Convert vLLM BlockStored extra_keys to block-level MM infos.
/// extra_keys is a list aligned with blocks:
/// - None => no MM content in that block
/// - ["hash1", "hash2", ...] => one or more MM objects in that block
fn extra_keys_to_block_mm_infos(
extra_keys: Option<Vec<Option<Vec<String>>>>,
) -> Option<Vec<Option<BlockExtraInfo>>> {
let extra_keys = extra_keys?;
if extra_keys.is_empty() {
return None;
}
let infos: Vec<Option<BlockExtraInfo>> = extra_keys
.into_iter()
.map(|block_keys| {
let mm_objects: Vec<BlockMmObjectInfo> = block_keys
.unwrap_or_default()
.iter()
.filter_map(|key| parse_mm_hash_from_extra_key(key))
.map(|mm_hash| BlockMmObjectInfo {
mm_hash,
offsets: vec![], // extra_keys does not carry offsets today
})
.collect();
if mm_objects.is_empty() {
None
} else {
Some(BlockExtraInfo { mm_objects })
}
})
.collect();
if infos.iter().all(|i| i.is_none()) {
return None;
}
Some(infos)
}
/// Our producers use msgspec with `tag=True` and `array_like=True`, which
/// encodes each event as either a tagged map or a tagged tuple. To be tolerant of
/// additional fields that may be appended in the future, we implement a custom
/// deserializer that ignores unknown keys and any extra positional elements.
///
/// This keeps us compatible with older payloads while safely
/// accepting newer ones that include extra metadata.
impl<'de> Deserialize<'de> for RawKvEvent {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(RawKvEventVisitor)
}
}
struct RawKvEventVisitor;
impl<'de> Visitor<'de> for RawKvEventVisitor {
type Value = RawKvEvent;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a kv event encoded as a tagged map or sequence")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut event_type: Option<String> = None;
let mut block_hashes: Option<Vec<BlockHashValue>> = None;
let mut parent_block_hash: Option<Option<BlockHashValue>> = None;
let mut token_ids: Option<Vec<u32>> = None;
let mut block_size: Option<usize> = None;
let mut medium: Option<Option<String>> = None;
let mut lora_name: Option<Option<String>> = None;
let mut extra_keys: Option<Option<Vec<Option<Vec<String>>>>> = None;
let mut block_mm_infos: Option<Option<Vec<Option<BlockExtraInfo>>>> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"type" => {
event_type = Some(map.next_value()?);
}
"block_hashes" => {
block_hashes = Some(map.next_value()?);
}
"parent_block_hash" => {
parent_block_hash = Some(map.next_value()?);
}
"token_ids" => {
token_ids = Some(map.next_value()?);
}
"block_size" => {
block_size = Some(map.next_value()?);
}
"medium" => {
medium = Some(map.next_value()?);
}
"lora_name" => {
lora_name = Some(map.next_value()?);
}
"extra_keys" => {
extra_keys = Some(map.next_value()?);
}
"block_mm_infos" => {
block_mm_infos = Some(map.next_value()?);
}
_ => {
map.next_value::<IgnoredAny>()?;
}
}
}
match event_type.as_deref() {
Some("BlockStored") => {
let block_hashes =
block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
let token_ids = token_ids.ok_or_else(|| de::Error::missing_field("token_ids"))?;
let block_size =
block_size.ok_or_else(|| de::Error::missing_field("block_size"))?;
let block_mm_infos = block_mm_infos
.unwrap_or(None)
.or_else(|| extra_keys_to_block_mm_infos(extra_keys.unwrap_or(None)));
Ok(RawKvEvent::BlockStored {
block_hashes,
parent_block_hash: parent_block_hash.unwrap_or(None),
token_ids,
block_size,
medium: medium.unwrap_or(None),
lora_name: lora_name.unwrap_or(None),
block_mm_infos,
})
}
Some("BlockRemoved") => {
let block_hashes =
block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
Ok(RawKvEvent::BlockRemoved {
block_hashes,
medium: medium.unwrap_or(None),
})
}
Some("AllBlocksCleared") => Ok(RawKvEvent::AllBlocksCleared),
Some(other) => Err(de::Error::unknown_variant(
other,
&["BlockStored", "BlockRemoved", "AllBlocksCleared"],
)),
None => Err(de::Error::missing_field("type")),
}
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let tag: Option<String> = seq.next_element()?;
let Some(tag) = tag else {
return Err(de::Error::invalid_length(
0,
&"sequence must start with event tag",
));
};
match tag.as_str() {
"BlockStored" => {
let block_hashes: Vec<BlockHashValue> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
let parent_block_hash: Option<BlockHashValue> = seq.next_element()?.unwrap_or(None);
let token_ids: Vec<u32> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(3, &"missing token_ids"))?;
let block_size: usize = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(4, &"missing block_size"))?;
// Position 5 was lora_id in older formats; consume and discard for compat
let _lora_id: Option<u64> = seq.next_element()?.unwrap_or(None);
let medium: Option<String> = seq.next_element()?.unwrap_or(None);
let lora_name: Option<String> = seq.next_element()?.unwrap_or(None);
let extra_keys: Option<Vec<Option<Vec<String>>>> =
seq.next_element()?.unwrap_or(None);
let block_mm_infos: Option<Vec<Option<BlockExtraInfo>>> =
seq.next_element()?.unwrap_or(None);
while seq.next_element::<IgnoredAny>()?.is_some() {}
let block_mm_infos =
block_mm_infos.or_else(|| extra_keys_to_block_mm_infos(extra_keys));
Ok(RawKvEvent::BlockStored {
block_hashes,
parent_block_hash,
token_ids,
block_size,
medium,
lora_name,
block_mm_infos,
})
}
"BlockRemoved" => {
let block_hashes: Vec<BlockHashValue> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
let medium: Option<String> = seq.next_element()?.unwrap_or(None);
while seq.next_element::<IgnoredAny>()?.is_some() {}
Ok(RawKvEvent::BlockRemoved {
block_hashes,
medium,
})
}
"AllBlocksCleared" => {
while seq.next_element::<IgnoredAny>()?.is_some() {}
Ok(RawKvEvent::AllBlocksCleared)
}
other => Err(de::Error::unknown_variant(
other,
&["BlockStored", "BlockRemoved", "AllBlocksCleared"],
)),
}
}
}
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Metrics Publishers ------------------------------------------------------ // Metrics Publishers ------------------------------------------------------
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment