Unverified Commit 649d9b7d authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: make core kv-router its own crate (#5677)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 63bee8b3
...@@ -2288,6 +2288,29 @@ dependencies = [ ...@@ -2288,6 +2288,29 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "dynamo-kv-router"
version = "0.9.0"
dependencies = [
"anyhow",
"async-trait",
"clap 4.5.53",
"dynamo-runtime",
"dynamo-tokens",
"indicatif 0.18.3",
"prometheus",
"rand 0.9.2",
"rstest 0.18.2",
"rstest_reuse",
"serde",
"serde_json",
"thiserror 2.0.17",
"tokio",
"tokio-util",
"tracing",
"xxhash-rust",
]
[[package]] [[package]]
name = "dynamo-llm" name = "dynamo-llm"
version = "0.9.0" version = "0.9.0"
...@@ -2322,9 +2345,11 @@ dependencies = [ ...@@ -2322,9 +2345,11 @@ dependencies = [
"derive_builder", "derive_builder",
"dialoguer", "dialoguer",
"dynamo-async-openai", "dynamo-async-openai",
"dynamo-kv-router",
"dynamo-memory", "dynamo-memory",
"dynamo-parsers", "dynamo-parsers",
"dynamo-runtime", "dynamo-runtime",
"dynamo-tokens",
"either", "either",
"erased-serde", "erased-serde",
"etcd-client", "etcd-client",
...@@ -2554,6 +2579,7 @@ dependencies = [ ...@@ -2554,6 +2579,7 @@ dependencies = [
"derive-getters", "derive-getters",
"serde", "serde",
"thiserror 2.0.17", "thiserror 2.0.17",
"uuid 1.18.1",
"xxhash-rust", "xxhash-rust",
] ]
......
...@@ -8,6 +8,7 @@ members = [ ...@@ -8,6 +8,7 @@ members = [
"lib/runtime", "lib/runtime",
"lib/config", "lib/config",
"lib/tokens", "lib/tokens",
"lib/kv-router",
"lib/memory", "lib/memory",
"lib/async-openai", "lib/async-openai",
"lib/parsers", "lib/parsers",
...@@ -47,6 +48,7 @@ dynamo-runtime = { path = "lib/runtime", version = "0.9.0" } ...@@ -47,6 +48,7 @@ dynamo-runtime = { path = "lib/runtime", version = "0.9.0" }
dynamo-llm = { path = "lib/llm", version = "0.9.0" } dynamo-llm = { path = "lib/llm", version = "0.9.0" }
dynamo-config = { path = "lib/config", version = "0.9.0" } dynamo-config = { path = "lib/config", version = "0.9.0" }
dynamo-tokens = { path = "lib/tokens", version = "0.9.0" } dynamo-tokens = { path = "lib/tokens", version = "0.9.0" }
dynamo-kv-router = { path = "lib/kv-router", version = "0.9.0", features = ["metrics"] }
dynamo-async-openai = { path = "lib/async-openai", version = "0.9.0", features = [ dynamo-async-openai = { path = "lib/async-openai", version = "0.9.0", features = [
"byot", "byot",
"rustls", "rustls",
......
# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
[package]
name = "dynamo-kv-router"
description = "KV Router - Radix tree for LLM KV cache routing"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
[features]
default = []
metrics = ["dep:dynamo-runtime"]
bench = ["dep:clap", "dep:indicatif"]
[dependencies]
# repo
dynamo-runtime = { workspace = true, optional = true }
dynamo-tokens = { workspace = true }
# workspace
anyhow = { workspace = true }
async-trait = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
xxhash-rust = { workspace = true }
# bench (optional)
clap = { version = "4.5", features = ["derive"], optional = true }
indicatif = { version = "0.18.0", optional = true }
[dev-dependencies]
rstest = "0.18.2"
rstest_reuse = "0.7.0"
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
[[bench]]
name = "radix_tree_microbench"
harness = false
required-features = ["bench"]
...@@ -11,15 +11,15 @@ ...@@ -11,15 +11,15 @@
//! Size is defined as total (worker, block) pairs in the tree. //! Size is defined as total (worker, block) pairs in the tree.
//! Depth is the number of blocks per sequence (depth = (isl + osl) / block_size). //! Depth is the number of blocks per sequence (depth = (isl + osl) / block_size).
//! //!
//! Run with: cargo run --package dynamo-llm --bin radix_tree_microbench --features kv-router-stress -- --help //! Run with: cargo bench --package dynamo-kv-router --bench radix_tree_microbench --features bench -- --help
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use dynamo_llm::kv_router::{ use dynamo_kv_router::{
compute_block_hash_for_seq,
indexer::{RadixTree, RouterEvent}, indexer::{RadixTree, RouterEvent},
protocols::{ protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData,
KvCacheStoreData, KvCacheStoredBlockData, LocalBlockHash, WorkerId, KvCacheStoreData, KvCacheStoredBlockData, LocalBlockHash, WorkerId,
compute_block_hash_for_seq,
}, },
}; };
use rand::rngs::StdRng; use rand::rngs::StdRng;
......
...@@ -12,8 +12,8 @@ use std::collections::{BinaryHeap, HashMap}; ...@@ -12,8 +12,8 @@ use std::collections::{BinaryHeap, HashMap};
use std::hash::Hash; use std::hash::Hash;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use crate::kv_router::indexer::KvRouterError; use crate::indexer::KvRouterError;
use crate::kv_router::protocols::{ExternalSequenceBlockHash, WorkerWithDpRank}; use crate::protocols::{ExternalSequenceBlockHash, WorkerWithDpRank};
/// Block entry to be inserted in the [`PruneManager::expirations`] heap. /// Block entry to be inserted in the [`PruneManager::expirations`] heap.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
...@@ -222,8 +222,8 @@ impl<K: Clone + Hash + Eq + Ord> PruneManager<K> { ...@@ -222,8 +222,8 @@ impl<K: Clone + Hash + Eq + Ord> PruneManager<K> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::kv_router::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics}; use crate::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use crate::kv_router::protocols::{TokensWithHashes, WorkerId, WorkerWithDpRank}; use crate::protocols::{TokensWithHashes, WorkerId, WorkerWithDpRank};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{self, Duration, Instant}; use tokio::time::{self, Duration, Instant};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
......
...@@ -32,28 +32,42 @@ ...@@ -32,28 +32,42 @@
//! This module provides a scalable and efficient way to manage and retrieve data blocks for LLM inference, leveraging a global KV cache to optimize performance. //! This module provides a scalable and efficient way to manage and retrieve data blocks for LLM inference, leveraging a global KV cache to optimize performance.
use async_trait::async_trait; use async_trait::async_trait;
#[cfg(feature = "metrics")]
pub use dynamo_runtime::protocols::maybe_error::MaybeError;
#[cfg(feature = "metrics")]
use dynamo_runtime::{ use dynamo_runtime::{
component::Component, component::Component,
metrics::{MetricsHierarchy, prometheus_names::kvrouter}, metrics::{MetricsHierarchy, prometheus_names::kvrouter},
protocols::maybe_error::MaybeError,
}; };
use prometheus::{IntCounterVec, Opts}; use prometheus::{IntCounterVec, Opts};
/// Trait for types that may represent an error response.
/// Used for RPC-style responses that can indicate success or failure.
#[cfg(not(feature = "metrics"))]
pub trait MaybeError {
/// Construct an instance from an error.
fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self;
/// Convert to an error instance if this represents an error.
fn err(&self) -> Option<anyhow::Error>;
}
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "metrics")]
use std::sync::OnceLock;
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
iter, iter,
rc::Rc, rc::Rc,
sync::{Arc, Mutex, OnceLock}, sync::{Arc, Mutex},
thread::JoinHandle, thread::JoinHandle,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::kv_router::approx::{BlockEntry, PruneConfig, PruneManager}; use crate::approx::{BlockEntry, PruneConfig, PruneManager};
use crate::kv_router::protocols::*; use crate::protocols::*;
use crate::tokens::SequenceHash; use dynamo_tokens::SequenceHash;
/// Errors that can occur in the KV Router. /// Errors that can occur in the KV Router.
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
...@@ -622,9 +636,14 @@ pub const METRIC_EVENT_STORED: &str = "stored"; ...@@ -622,9 +636,14 @@ pub const METRIC_EVENT_STORED: &str = "stored";
pub const METRIC_EVENT_REMOVED: &str = "removed"; pub const METRIC_EVENT_REMOVED: &str = "removed";
pub const METRIC_EVENT_CLEARED: &str = "cleared"; pub const METRIC_EVENT_CLEARED: &str = "cleared";
/// Metric name for KV cache events applied counter.
const KV_CACHE_EVENTS_APPLIED_NAME: &str = "dynamo_kvrouter_kv_cache_events_applied";
#[cfg(feature = "metrics")]
static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new(); static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new();
impl KvIndexerMetrics { impl KvIndexerMetrics {
#[cfg(feature = "metrics")]
fn new(kv_cache_events_applied: IntCounterVec) -> Self { fn new(kv_cache_events_applied: IntCounterVec) -> Self {
Self { Self {
kv_cache_events_applied, kv_cache_events_applied,
...@@ -633,6 +652,7 @@ impl KvIndexerMetrics { ...@@ -633,6 +652,7 @@ impl KvIndexerMetrics {
/// Creates a new KvIndexerMetrics from a Component, memoizing the result in /// Creates a new KvIndexerMetrics from a Component, memoizing the result in
/// KV_INDEXER_METRICS to avoid duplicate registration issues. /// KV_INDEXER_METRICS to avoid duplicate registration issues.
#[cfg(feature = "metrics")]
pub fn from_component(component: &Component) -> Arc<Self> { pub fn from_component(component: &Component) -> Arc<Self> {
KV_INDEXER_METRICS.get_or_init(|| { KV_INDEXER_METRICS.get_or_init(|| {
match component.metrics().create_intcountervec( match component.metrics().create_intcountervec(
...@@ -656,7 +676,7 @@ impl KvIndexerMetrics { ...@@ -656,7 +676,7 @@ impl KvIndexerMetrics {
Self { Self {
kv_cache_events_applied: IntCounterVec::new( kv_cache_events_applied: IntCounterVec::new(
Opts::new( Opts::new(
kvrouter::KV_CACHE_EVENTS_APPLIED, KV_CACHE_EVENTS_APPLIED_NAME,
"Total number of KV cache events applied to index", "Total number of KV cache events applied to index",
), ),
&["event_type", "status"], &["event_type", "status"],
...@@ -2038,14 +2058,14 @@ impl Drop for KvIndexerSharded { ...@@ -2038,14 +2058,14 @@ impl Drop for KvIndexerSharded {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash}; use crate::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
use rstest::rstest; use rstest::rstest;
use rstest_reuse::{self, *}; use rstest_reuse::{self, *};
use tokio::time; use tokio::time;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
fn setup() { fn setup() {
dynamo_runtime::logging::init(); // Logging init removed to avoid dynamo-runtime dependency
} }
fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> { fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> {
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! KV Router - Radix tree data structures for LLM KV cache routing.
//!
//! This crate provides the core radix tree implementation and protocols for
//! efficient KV cache lookup and routing in distributed LLM inference systems.
pub mod approx;
pub mod indexer;
pub mod protocols;
// Re-export key types for convenience
pub use indexer::{MaybeError, RadixTree, RouterEvent};
pub use protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq};
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::tokens::{SequenceHash, Token}; use dynamo_tokens::{SequenceHash, Token};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use xxhash_rust::xxh3; use xxhash_rust::xxh3;
......
...@@ -41,6 +41,8 @@ required-features = ["block-manager", "testing-cuda"] ...@@ -41,6 +41,8 @@ required-features = ["block-manager", "testing-cuda"]
[dependencies] [dependencies]
# repo # repo
dynamo-runtime = { workspace = true } dynamo-runtime = { workspace = true }
dynamo-tokens = { workspace = true }
dynamo-kv-router = { workspace = true, features = ["metrics"] }
dynamo-memory = { path = "../memory", optional = true } dynamo-memory = { path = "../memory", optional = true }
# workspace # workspace
...@@ -208,8 +210,3 @@ name = "bench_local_transfer_v2" ...@@ -208,8 +210,3 @@ name = "bench_local_transfer_v2"
path = "bin/bench_local_transfer_v2.rs" path = "bin/bench_local_transfer_v2.rs"
required-features = ["block-manager-bench"] required-features = ["block-manager-bench"]
[[bench]]
name = "radix_tree_microbench"
path = "benches/radix_tree_microbench.rs"
harness = false
required-features = ["kv-router-stress"]
...@@ -23,10 +23,12 @@ use rand::Rng; ...@@ -23,10 +23,12 @@ use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
pub mod approx; // Re-export from dynamo-kv-router crate
pub mod indexer; pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::indexer;
pub use dynamo_kv_router::protocols;
pub mod prefill_router; pub mod prefill_router;
pub mod protocols;
pub mod publisher; pub mod publisher;
pub mod recorder; pub mod recorder;
pub mod scheduler; pub mod scheduler;
......
...@@ -21,7 +21,7 @@ use super::indexer::OverlapScores; ...@@ -21,7 +21,7 @@ use super::indexer::OverlapScores;
use super::protocols::{DpRank, WorkerId, WorkerSelectionResult, WorkerWithDpRank}; use super::protocols::{DpRank, WorkerId, WorkerSelectionResult, WorkerWithDpRank};
use super::sequence::{ActiveSequencesMultiWorker, SequenceError}; use super::sequence::{ActiveSequencesMultiWorker, SequenceError};
use crate::tokens::SequenceHash; use dynamo_tokens::SequenceHash;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KVHitRateEvent { pub struct KVHitRateEvent {
......
...@@ -23,13 +23,13 @@ ...@@ -23,13 +23,13 @@
//! requests share common prefixes (e.g., system prompts, few-shot examples). //! requests share common prefixes (e.g., system prompts, few-shot examples).
use crate::kv_router::indexer::OverlapScores; use crate::kv_router::indexer::OverlapScores;
use crate::tokens::SequenceHash;
use anyhow::Result; use anyhow::Result;
use dashmap::DashMap; use dashmap::DashMap;
use derive_getters::Getters; use derive_getters::Getters;
use dynamo_runtime::component::Component; use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider; use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::{EventPublisher, EventSubscriber}; use dynamo_runtime::transports::event_plane::{EventPublisher, EventSubscriber};
use dynamo_tokens::SequenceHash;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::rc::{Rc, Weak}; use std::rc::{Rc, Weak};
use std::sync::Arc; use std::sync::Arc;
......
...@@ -41,10 +41,10 @@ use crate::kv_router::publisher::KvEventPublisher; ...@@ -41,10 +41,10 @@ use crate::kv_router::publisher::KvEventPublisher;
use crate::mocker::evictor::LRUEvictor; use crate::mocker::evictor::LRUEvictor;
use crate::mocker::protocols::{MoveBlock, PrefillCost}; use crate::mocker::protocols::{MoveBlock, PrefillCost};
use crate::mocker::sequence::ActiveSequence; use crate::mocker::sequence::ActiveSequence;
use crate::tokens::blocks::UniqueBlock;
use crate::tokens::{BlockHash, SequenceHash};
use derive_getters::Getters; use derive_getters::Getters;
use dynamo_runtime::component::Component; use dynamo_runtime::component::Component;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, SequenceHash};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
......
...@@ -9,8 +9,8 @@ use std::sync::Arc; ...@@ -9,8 +9,8 @@ use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
use crate::mocker::perf_model::PerfModel; use crate::mocker::perf_model::PerfModel;
use crate::tokens::blocks::UniqueBlock; use dynamo_tokens::blocks::UniqueBlock;
use crate::tokens::{BlockHash, SequenceHash, Token}; use dynamo_tokens::{BlockHash, SequenceHash, Token};
pub type NumBlocks = usize; pub type NumBlocks = usize;
......
...@@ -37,7 +37,7 @@ use crate::mocker::protocols::{ ...@@ -37,7 +37,7 @@ use crate::mocker::protocols::{
}; };
use crate::mocker::running_mean::RunningMean; use crate::mocker::running_mean::RunningMean;
use crate::mocker::sequence::ActiveSequence; use crate::mocker::sequence::ActiveSequence;
use crate::tokens::blocks::UniqueBlock; use dynamo_tokens::blocks::UniqueBlock;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
......
...@@ -2,9 +2,9 @@ ...@@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::mocker::protocols::MoveBlock; use crate::mocker::protocols::MoveBlock;
use crate::tokens::blocks::UniqueBlock;
use crate::tokens::{TokenBlockSequence, Tokens};
use derive_getters::Getters; use derive_getters::Getters;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{TokenBlockSequence, Tokens};
use rand::random; use rand::random;
/// Create unique blocks from a TokenBlockSequence /// Create unique blocks from a TokenBlockSequence
......
...@@ -10,8 +10,6 @@ use derive_getters::Dissolve; ...@@ -10,8 +10,6 @@ use derive_getters::Dissolve;
use rayon::prelude::*; use rayon::prelude::*;
use std::ops::Range; use std::ops::Range;
pub mod blocks;
/// A token is represented as a 32-bit unsigned integer. /// A token is represented as a 32-bit unsigned integer.
pub type Token = u32; pub type Token = u32;
......
...@@ -16,6 +16,7 @@ dashmap = { workspace = true } ...@@ -16,6 +16,7 @@ dashmap = { workspace = true }
derive-getters = { workspace = true } derive-getters = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
uuid = { workspace = true }
xxhash-rust = { workspace = true } xxhash-rust = { workspace = true }
bs58 = "0.5" bs58 = "0.5"
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//! Block identification types for token sequences.
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
/// A global hash type for identifying blocks.
pub type GlobalHash = u64; pub type GlobalHash = u64;
/// Represents an active block beign built /// Represents an active block being built.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub enum UniqueBlock { pub enum UniqueBlock {
/// Block identified by UUID /// Block identified by UUID (partial/incomplete block).
PartialBlock(Uuid), PartialBlock(Uuid),
/// Block identified by hash /// Block identified by hash (complete block).
FullBlock(GlobalHash), FullBlock(GlobalHash),
} }
......
...@@ -9,6 +9,7 @@ use bytemuck::cast_slice; ...@@ -9,6 +9,7 @@ use bytemuck::cast_slice;
use derive_getters::Dissolve; use derive_getters::Dissolve;
use std::ops::Range; use std::ops::Range;
pub mod blocks;
mod radix; mod radix;
pub use radix::PositionalRadixTree; pub use radix::PositionalRadixTree;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment