"csrc/vscode:/vscode.git/clone" did not exist on "fdea8ec16775e1645620b5ff46b799d60df4624c"
Unverified Commit 45be2fdc authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore(kv-router): drop easy llm facade reexports (#7474)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 0ac9ef9c
...@@ -4067,6 +4067,7 @@ dependencies = [ ...@@ -4067,6 +4067,7 @@ dependencies = [
"anyhow", "anyhow",
"async-once-cell", "async-once-cell",
"cbindgen", "cbindgen",
"dynamo-kv-router",
"dynamo-llm", "dynamo-llm",
"dynamo-runtime", "dynamo-runtime",
"libc", "libc",
......
...@@ -31,6 +31,7 @@ cbindgen = "0.27" ...@@ -31,6 +31,7 @@ cbindgen = "0.27"
[dependencies] [dependencies]
dynamo-llm = { path = "../../llm" } dynamo-llm = { path = "../../llm" }
dynamo-kv-router = { path = "../../kv-router" }
dynamo-runtime = { path = "../../runtime" } dynamo-runtime = { path = "../../runtime" }
anyhow = { workspace = true } anyhow = { workspace = true }
......
...@@ -11,7 +11,11 @@ use std::sync::Arc; ...@@ -11,7 +11,11 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration; use std::time::Duration;
use dynamo_llm::kv_router::{protocols::*, publisher::KvEventPublisher}; use dynamo_kv_router::{
config::{KvRouterConfig, RouterConfigOverride},
protocols::*,
};
use dynamo_llm::kv_router::publisher::KvEventPublisher;
use dynamo_llm::preprocessor::OpenAIPreprocessor; use dynamo_llm::preprocessor::OpenAIPreprocessor;
use dynamo_runtime::discovery::{DiscoveryQuery, hash_pod_name}; use dynamo_runtime::discovery::{DiscoveryQuery, hash_pod_name};
use dynamo_runtime::{DistributedRuntime, Worker}; use dynamo_runtime::{DistributedRuntime, Worker};
...@@ -19,9 +23,7 @@ use dynamo_runtime::{DistributedRuntime, Worker}; ...@@ -19,9 +23,7 @@ use dynamo_runtime::{DistributedRuntime, Worker};
use dynamo_runtime::Runtime; use dynamo_runtime::Runtime;
use dynamo_llm::discovery::{ModelManager, WORKER_TYPE_DECODE}; use dynamo_llm::discovery::{ModelManager, WORKER_TYPE_DECODE};
use dynamo_llm::kv_router::KvRouterConfig; use dynamo_llm::kv_router::{KvRouter, PrefillRouter};
use dynamo_llm::kv_router::protocols::WorkerWithDpRank;
use dynamo_llm::kv_router::{KvRouter, PrefillRouter, RouterConfigOverride};
use dynamo_runtime::pipeline::RouterMode; use dynamo_runtime::pipeline::RouterMode;
use std::collections::HashSet; use std::collections::HashSet;
...@@ -433,7 +435,7 @@ impl RouterHandles { ...@@ -433,7 +435,7 @@ impl RouterHandles {
async fn query_prefill_worker( async fn query_prefill_worker(
&self, &self,
tokens: &[u32], tokens: &[u32],
block_mm_infos: Option<&[Option<dynamo_llm::kv_router::protocols::BlockExtraInfo>]>, block_mm_infos: Option<&[Option<dynamo_kv_router::protocols::BlockExtraInfo>]>,
update_states: bool, update_states: bool,
lora_name: Option<String>, lora_name: Option<String>,
priority_jump: f64, priority_jump: f64,
......
...@@ -24,7 +24,7 @@ crate-type = ["cdylib", "rlib"] ...@@ -24,7 +24,7 @@ crate-type = ["cdylib", "rlib"]
[features] [features]
default = [] default = []
media-ffmpeg = ["dynamo-llm/media-ffmpeg"] media-ffmpeg = ["dynamo-llm/media-ffmpeg"]
kv-indexer = ["dep:dynamo-kv-router", "dep:clap", "dep:tracing-subscriber"] kv-indexer = ["dep:clap", "dep:tracing-subscriber"]
kv-indexer-runtime = ["kv-indexer", "dynamo-kv-router/indexer-runtime"] kv-indexer-runtime = ["kv-indexer", "dynamo-kv-router/indexer-runtime"]
kv-indexer-metrics = ["kv-indexer", "dynamo-kv-router/metrics"] kv-indexer-metrics = ["kv-indexer", "dynamo-kv-router/metrics"]
...@@ -46,8 +46,8 @@ tokio-stream = { version = "0" } ...@@ -46,8 +46,8 @@ tokio-stream = { version = "0" }
tokio-util = { version = "0.7", features = ["rt"] } tokio-util = { version = "0.7", features = ["rt"] }
tracing = { version = "0" } tracing = { version = "0" }
# kv-indexer (optional) # kv-indexer / shared kv-router types
dynamo-kv-router = { path = "../../kv-router", features = ["standalone-indexer"], optional = true } dynamo-kv-router = { path = "../../kv-router", features = ["standalone-indexer"] }
clap = { version = "4.5", features = ["derive"], optional = true } clap = { version = "4.5", features = ["derive"], optional = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true } tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true }
......
...@@ -34,8 +34,9 @@ use dynamo_runtime::{ ...@@ -34,8 +34,9 @@ use dynamo_runtime::{
traits::DistributedRuntimeProvider, traits::DistributedRuntimeProvider,
}; };
use dynamo_kv_router::config::KvRouterConfig;
use dynamo_llm::entrypoint::RouterConfig;
use dynamo_llm::{self as llm_rs}; use dynamo_llm::{self as llm_rs};
use dynamo_llm::{entrypoint::RouterConfig, kv_router::KvRouterConfig};
use crate::llm::local_model::ModelRuntimeConfig; use crate::llm::local_model::ModelRuntimeConfig;
use crate::llm::preprocessor::{MediaDecoder, MediaFetcher}; use crate::llm::preprocessor::{MediaDecoder, MediaFetcher};
......
...@@ -10,12 +10,12 @@ use std::sync::Arc; ...@@ -10,12 +10,12 @@ use std::sync::Arc;
use pyo3::{exceptions::PyException, prelude::*}; use pyo3::{exceptions::PyException, prelude::*};
use pyo3_async_runtimes::TaskLocals; use pyo3_async_runtimes::TaskLocals;
use dynamo_kv_router::config::KvRouterConfig as RsKvRouterConfig;
use dynamo_llm::discovery::LoadThresholdConfig as RsLoadThresholdConfig; use dynamo_llm::discovery::LoadThresholdConfig as RsLoadThresholdConfig;
use dynamo_llm::entrypoint::ChatEngineFactoryCallback; use dynamo_llm::entrypoint::ChatEngineFactoryCallback;
use dynamo_llm::entrypoint::EngineConfig as RsEngineConfig; use dynamo_llm::entrypoint::EngineConfig as RsEngineConfig;
use dynamo_llm::entrypoint::RouterConfig as RsRouterConfig; use dynamo_llm::entrypoint::RouterConfig as RsRouterConfig;
use dynamo_llm::entrypoint::input::Input; use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig;
use dynamo_llm::local_model::DEFAULT_HTTP_PORT; use dynamo_llm::local_model::DEFAULT_HTTP_PORT;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder}; use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_llm::mocker::make_mocker_engine; use dynamo_llm::mocker::make_mocker_engine;
......
...@@ -13,17 +13,18 @@ use super::*; ...@@ -13,17 +13,18 @@ use super::*;
use crate::Endpoint; use crate::Endpoint;
#[cfg(feature = "kv-indexer")] #[cfg(feature = "kv-indexer")]
use clap::Parser; use clap::Parser;
use dynamo_kv_router::config::{KvRouterConfig, RouterConfigOverride};
use dynamo_kv_router::protocols::compute_block_hash_for_seq;
use dynamo_kv_router::protocols::*;
#[cfg(feature = "kv-indexer-runtime")] #[cfg(feature = "kv-indexer-runtime")]
use dynamo_kv_router::standalone_indexer::RuntimeConfig; use dynamo_kv_router::standalone_indexer::RuntimeConfig;
#[cfg(feature = "kv-indexer")] #[cfg(feature = "kv-indexer")]
use dynamo_kv_router::standalone_indexer::{self, IndexerConfig}; use dynamo_kv_router::standalone_indexer::{self, IndexerConfig};
use llm_rs::kv_router::protocols::compute_block_hash_for_seq;
use rs::pipeline::{AsyncEngine, SingleIn}; use rs::pipeline::{AsyncEngine, SingleIn};
use rs::protocols::annotated::Annotated as RsAnnotated; use rs::protocols::annotated::Annotated as RsAnnotated;
use tracing; use tracing;
use llm_rs::kv_router::KvPushRouter as RsKvPushRouter; use llm_rs::kv_router::KvPushRouter as RsKvPushRouter;
use llm_rs::kv_router::protocols::*;
use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks}; use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks};
use llm_rs::protocols::common::timing::RequestTracker; use llm_rs::protocols::common::timing::RequestTracker;
use llm_rs::protocols::common::{OutputOptions, SamplingOptions, StopConditions}; use llm_rs::protocols::common::{OutputOptions, SamplingOptions, StopConditions};
...@@ -389,7 +390,7 @@ impl KvEventPublisher { ...@@ -389,7 +390,7 @@ impl KvEventPublisher {
#[pyclass] #[pyclass]
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct OverlapScores { pub(crate) struct OverlapScores {
inner: llm_rs::kv_router::protocols::OverlapScores, inner: dynamo_kv_router::protocols::OverlapScores,
} }
#[pymethods] #[pymethods]
...@@ -413,9 +414,9 @@ impl OverlapScores { ...@@ -413,9 +414,9 @@ impl OverlapScores {
#[derive(Debug)] #[derive(Debug)]
enum RadixTreeRequest { enum RadixTreeRequest {
FindMatches { FindMatches {
local_block_hashes: Vec<llm_rs::kv_router::protocols::LocalBlockHash>, local_block_hashes: Vec<LocalBlockHash>,
early_exit: bool, early_exit: bool,
response_tx: mpsc::SyncSender<llm_rs::kv_router::protocols::OverlapScores>, response_tx: mpsc::SyncSender<dynamo_kv_router::protocols::OverlapScores>,
}, },
ApplyEvent { ApplyEvent {
worker_id: WorkerId, worker_id: WorkerId,
...@@ -431,7 +432,7 @@ enum RadixTreeRequest { ...@@ -431,7 +432,7 @@ enum RadixTreeRequest {
response_tx: mpsc::SyncSender<()>, response_tx: mpsc::SyncSender<()>,
}, },
DumpTreeAsEvents { DumpTreeAsEvents {
response_tx: mpsc::SyncSender<Vec<llm_rs::kv_router::protocols::RouterEvent>>, response_tx: mpsc::SyncSender<Vec<RouterEvent>>,
}, },
Shutdown, Shutdown,
} }
...@@ -454,7 +455,7 @@ impl RadixTree { ...@@ -454,7 +455,7 @@ impl RadixTree {
// Spawn dedicated thread with simplified sync processing // Spawn dedicated thread with simplified sync processing
std::thread::spawn(move || { std::thread::spawn(move || {
let mut radix_tree = let mut radix_tree =
llm_rs::kv_router::indexer::RadixTree::new_with_frequency(expiration_duration); dynamo_kv_router::indexer::RadixTree::new_with_frequency(expiration_duration);
loop { loop {
match request_rx.recv() { match request_rx.recv() {
...@@ -485,12 +486,8 @@ impl RadixTree { ...@@ -485,12 +486,8 @@ impl RadixTree {
) -> PyResult<OverlapScores> { ) -> PyResult<OverlapScores> {
let (response_tx, response_rx) = mpsc::sync_channel(1); let (response_tx, response_rx) = mpsc::sync_channel(1);
let local_block_hashes = py.allow_threads(|| { let local_block_hashes =
sequence py.allow_threads(|| sequence.into_iter().map(LocalBlockHash).collect());
.into_iter()
.map(llm_rs::kv_router::protocols::LocalBlockHash)
.collect()
});
let request = RadixTreeRequest::FindMatches { let request = RadixTreeRequest::FindMatches {
local_block_hashes, local_block_hashes,
...@@ -623,7 +620,7 @@ impl RadixTree { ...@@ -623,7 +620,7 @@ impl RadixTree {
impl RadixTree { impl RadixTree {
fn handle_request( fn handle_request(
radix_tree: &mut llm_rs::kv_router::indexer::RadixTree, radix_tree: &mut dynamo_kv_router::indexer::RadixTree,
request: RadixTreeRequest, request: RadixTreeRequest,
) { ) {
match request { match request {
...@@ -640,15 +637,9 @@ impl RadixTree { ...@@ -640,15 +637,9 @@ impl RadixTree {
kv_cache_event_bytes, kv_cache_event_bytes,
response_tx, response_tx,
} => { } => {
let result = match serde_json::from_slice::< let result = match serde_json::from_slice::<KvCacheEvent>(&kv_cache_event_bytes) {
llm_rs::kv_router::protocols::KvCacheEvent,
>(&kv_cache_event_bytes)
{
Ok(kv_cache_event) => { Ok(kv_cache_event) => {
let router_event = llm_rs::kv_router::protocols::RouterEvent::new( let router_event = RouterEvent::new(worker_id, kv_cache_event);
worker_id,
kv_cache_event,
);
match radix_tree.apply_event(router_event) { match radix_tree.apply_event(router_event) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>( Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
...@@ -705,7 +696,7 @@ impl Drop for RadixTree { ...@@ -705,7 +696,7 @@ impl Drop for RadixTree {
async fn create_kv_router_from_endpoint( async fn create_kv_router_from_endpoint(
endpoint: &Endpoint, endpoint: &Endpoint,
block_size: usize, block_size: usize,
kv_router_config: Option<llm_rs::kv_router::KvRouterConfig>, kv_router_config: Option<KvRouterConfig>,
) -> Result<Arc<llm_rs::kv_router::KvRouter>, PyErr> { ) -> Result<Arc<llm_rs::kv_router::KvRouter>, PyErr> {
// Create ModelManager and use it to create KvRouter (ensures registration) // Create ModelManager and use it to create KvRouter (ensures registration)
let model_manager = Arc::new(llm_rs::discovery::ModelManager::new()); let model_manager = Arc::new(llm_rs::discovery::ModelManager::new());
...@@ -964,7 +955,7 @@ impl KvRouter { ...@@ -964,7 +955,7 @@ impl KvRouter {
OutputOptions::default() OutputOptions::default()
}; };
let router_config_override: Option<llm_rs::kv_router::RouterConfigOverride> = let router_config_override: Option<RouterConfigOverride> =
if let Some(obj) = router_config_override { if let Some(obj) = router_config_override {
Some(depythonize(obj.bind(py)).map_err(to_pyerr)?) Some(depythonize(obj.bind(py)).map_err(to_pyerr)?)
} else { } else {
...@@ -1068,7 +1059,7 @@ impl KvRouter { ...@@ -1068,7 +1059,7 @@ impl KvRouter {
lora_name: Option<String>, lora_name: Option<String>,
) -> PyResult<Bound<'p, PyAny>> { ) -> PyResult<Bound<'p, PyAny>> {
let router_config_override = if let Some(obj) = router_config_override { let router_config_override = if let Some(obj) = router_config_override {
let override_config: llm_rs::kv_router::RouterConfigOverride = let override_config: RouterConfigOverride =
depythonize(obj.bind(py)).map_err(to_pyerr)?; depythonize(obj.bind(py)).map_err(to_pyerr)?;
Some(override_config) Some(override_config)
} else { } else {
......
...@@ -31,7 +31,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; ...@@ -31,7 +31,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokenizers::Tokenizer; use tokenizers::Tokenizer;
use tokio::sync::{Mutex, Semaphore}; use tokio::sync::{Mutex, Semaphore};
use dynamo_llm::kv_router::protocols::{ use dynamo_kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheStoreData, ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash, RouterEvent, WorkerId, compute_hash, KvCacheStoredBlockData, LocalBlockHash, RouterEvent, WorkerId, compute_hash,
compute_seq_hash_for_block, compute_seq_hash_for_block,
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
use std::{collections::HashSet, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use dashmap::{DashMap, mapref::entry::Entry}; use dashmap::{DashMap, mapref::entry::Entry};
use dynamo_kv_router::{config::KvRouterConfig, protocols::WorkerId};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use super::worker_monitor::LoadThresholdConfig; use super::worker_monitor::LoadThresholdConfig;
...@@ -17,10 +18,7 @@ use dynamo_runtime::{ ...@@ -17,10 +18,7 @@ use dynamo_runtime::{
}; };
use crate::{ use crate::{
kv_router::{ kv_router::{KvRouter, router_endpoint_id, scheduler::DefaultWorkerSelector},
KvRouter, KvRouterConfig, protocols::WorkerId, router_endpoint_id,
scheduler::DefaultWorkerSelector,
},
local_model::runtime_config::DisaggregatedEndpoint, local_model::runtime_config::DisaggregatedEndpoint,
model_card::ModelDeploymentCard, model_card::ModelDeploymentCard,
types::{ types::{
......
...@@ -9,9 +9,9 @@ use dynamo_runtime::component::Endpoint; ...@@ -9,9 +9,9 @@ use dynamo_runtime::component::Endpoint;
use dynamo_runtime::discovery::{DiscoveryQuery, watch_and_extract_field}; use dynamo_runtime::discovery::{DiscoveryQuery, watch_and_extract_field};
use dynamo_runtime::prelude::DistributedRuntimeProvider; use dynamo_runtime::prelude::DistributedRuntimeProvider;
use crate::kv_router::protocols::WorkerId;
use crate::local_model::runtime_config::ModelRuntimeConfig; use crate::local_model::runtime_config::ModelRuntimeConfig;
use crate::model_card::ModelDeploymentCard; use crate::model_card::ModelDeploymentCard;
use dynamo_kv_router::protocols::WorkerId;
/// Type alias for the runtime config watch receiver. /// Type alias for the runtime config watch receiver.
pub type RuntimeConfigWatch = watch::Receiver<HashMap<WorkerId, ModelRuntimeConfig>>; pub type RuntimeConfigWatch = watch::Receiver<HashMap<WorkerId, ModelRuntimeConfig>>;
......
...@@ -9,6 +9,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; ...@@ -9,6 +9,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use tokio::sync::Notify; use tokio::sync::Notify;
use dashmap::DashMap; use dashmap::DashMap;
use dynamo_kv_router::protocols::ActiveLoad;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::http::service::metrics::{ use crate::http::service::metrics::{
...@@ -17,7 +18,6 @@ use crate::http::service::metrics::{ ...@@ -17,7 +18,6 @@ use crate::http::service::metrics::{
}; };
use crate::kv_router::KV_METRICS_SUBJECT; use crate::kv_router::KV_METRICS_SUBJECT;
use crate::kv_router::metrics::WORKER_LOAD_METRICS; use crate::kv_router::metrics::WORKER_LOAD_METRICS;
use crate::kv_router::protocols::ActiveLoad;
use crate::model_card::ModelDeploymentCard; use crate::model_card::ModelDeploymentCard;
use dynamo_runtime::component::Client; use dynamo_runtime::component::Client;
use dynamo_runtime::discovery::{DiscoveryQuery, watch_and_extract_field}; use dynamo_runtime::discovery::{DiscoveryQuery, watch_and_extract_field};
......
...@@ -12,11 +12,12 @@ use std::future::Future; ...@@ -12,11 +12,12 @@ use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use dynamo_kv_router::config::KvRouterConfig;
use dynamo_runtime::{discovery::ModelCardInstanceId, pipeline::RouterMode}; use dynamo_runtime::{discovery::ModelCardInstanceId, pipeline::RouterMode};
use crate::{ use crate::{
backend::ExecutionContext, discovery::LoadThresholdConfig, engines::StreamingEngine, backend::ExecutionContext, discovery::LoadThresholdConfig, engines::StreamingEngine,
kv_router::KvRouterConfig, local_model::LocalModel, model_card::ModelDeploymentCard, local_model::LocalModel, model_card::ModelDeploymentCard,
types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine, types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine,
}; };
......
...@@ -5,7 +5,17 @@ use std::sync::Arc; ...@@ -5,7 +5,17 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::Result; use anyhow::Result;
use dynamo_kv_router::{ConcurrentRadixTree, ThreadPoolIndexer}; use dynamo_kv_router::{
ConcurrentRadixTree, ThreadPoolIndexer,
approx::PruneConfig,
config::{KvRouterConfig, RouterConfigOverride},
indexer::{GetWorkersRequest, KvIndexer, KvIndexerInterface, KvIndexerMetrics, KvRouterError},
protocols::KV_EVENT_SUBJECT,
protocols::{
BlockExtraInfo, DpRank, LocalBlockHash, OverlapScores, RouterEvent, RouterRequest,
RouterResponse, TokensWithHashes, WorkerId, WorkerWithDpRank, compute_block_hash_for_seq,
},
};
use dynamo_runtime::{ use dynamo_runtime::{
component::{Client, Endpoint}, component::{Client, Endpoint},
discovery::DiscoveryQuery, discovery::DiscoveryQuery,
...@@ -22,15 +32,7 @@ use tokio::sync::oneshot; ...@@ -22,15 +32,7 @@ use tokio::sync::oneshot;
use tracing::Instrument; use tracing::Instrument;
use validator::Validate; use validator::Validate;
// Re-export from dynamo-kv-router crate
pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::indexer;
pub use dynamo_kv_router::protocols;
pub use dynamo_kv_router::scheduling;
pub use dynamo_kv_router::selector;
pub mod cache_control; pub mod cache_control;
pub mod config;
mod jetstream; mod jetstream;
pub mod metrics; pub mod metrics;
pub mod prefill_router; pub mod prefill_router;
...@@ -45,20 +47,12 @@ pub mod subscriber; ...@@ -45,20 +47,12 @@ pub mod subscriber;
pub mod worker_query; pub mod worker_query;
pub use cache_control::{CacheControlClient, spawn_pin_prefix}; pub use cache_control::{CacheControlClient, spawn_pin_prefix};
pub use config::{KvRouterConfig, RouterConfigOverride};
pub use prefill_router::PrefillRouter; pub use prefill_router::PrefillRouter;
pub use push_router::{DirectRoutingRouter, KvPushRouter}; pub use push_router::{DirectRoutingRouter, KvPushRouter};
use crate::{ use crate::{
discovery::RuntimeConfigWatch, discovery::RuntimeConfigWatch,
kv_router::{ kv_router::{
approx::PruneConfig,
indexer::{GetWorkersRequest, KvIndexer, KvIndexerInterface, KvRouterError},
protocols::{
BlockExtraInfo, DpRank, LocalBlockHash, OverlapScores, RouterEvent, RouterRequest,
RouterResponse, TokensWithHashes, WorkerId, WorkerWithDpRank,
compute_block_hash_for_seq,
},
remote_indexer::RemoteIndexer, remote_indexer::RemoteIndexer,
scheduler::{KvScheduler, PotentialLoad}, scheduler::{KvScheduler, PotentialLoad},
sequence::{SequenceError, SequenceRequest}, sequence::{SequenceError, SequenceRequest},
...@@ -75,7 +69,6 @@ use std::collections::HashSet; ...@@ -75,7 +69,6 @@ use std::collections::HashSet;
pub const KV_METRICS_ENDPOINT: &str = "load_metrics"; pub const KV_METRICS_ENDPOINT: &str = "load_metrics";
// for metric publishing (push-based) // for metric publishing (push-based)
pub use dynamo_kv_router::protocols::KV_EVENT_SUBJECT;
pub const KV_METRICS_SUBJECT: &str = "kv_metrics"; pub const KV_METRICS_SUBJECT: &str = "kv_metrics";
// for inter-router comms // for inter-router comms
...@@ -86,9 +79,6 @@ pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events"; ...@@ -86,9 +79,6 @@ pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events";
pub const RADIX_STATE_BUCKET: &str = "radix-bucket"; pub const RADIX_STATE_BUCKET: &str = "radix-bucket";
pub const RADIX_STATE_FILE: &str = "radix-state"; pub const RADIX_STATE_FILE: &str = "radix-state";
// for standalone indexer query — re-export from shared crate
pub use dynamo_kv_router::indexer::KV_INDEXER_QUERY_ENDPOINT;
// for worker-local kvindexer query // for worker-local kvindexer query
pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024; // store 1024 most recent events in worker buffer pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024; // store 1024 most recent events in worker buffer
...@@ -175,7 +165,7 @@ impl Indexer { ...@@ -175,7 +165,7 @@ impl Indexer {
// with TTL/pruning regardless of event_threads, since updates come from // with TTL/pruning regardless of event_threads, since updates come from
// routing decisions only, not live KV events from workers. // routing decisions only, not live KV events from workers.
if !kv_router_config.use_kv_events { if !kv_router_config.use_kv_events {
let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(component); let kv_indexer_metrics = KvIndexerMetrics::from_component(component);
let cancellation_token = component.drt().primary_token(); let cancellation_token = component.drt().primary_token();
let prune_config = Some(PruneConfig { let prune_config = Some(PruneConfig {
ttl: Duration::from_secs_f64(kv_router_config.router_ttl_secs), ttl: Duration::from_secs_f64(kv_router_config.router_ttl_secs),
...@@ -199,7 +189,7 @@ impl Indexer { ...@@ -199,7 +189,7 @@ impl Indexer {
)))); ))));
} }
let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(component); let kv_indexer_metrics = KvIndexerMetrics::from_component(component);
let cancellation_token = component.drt().primary_token(); let cancellation_token = component.drt().primary_token();
Ok(Indexer::KvIndexer(KvIndexer::new_with_frequency( Ok(Indexer::KvIndexer(KvIndexer::new_with_frequency(
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub use dynamo_kv_router::config::{KvRouterConfig, RouterConfigOverride};
...@@ -5,6 +5,7 @@ use std::collections::HashSet; ...@@ -5,6 +5,7 @@ use std::collections::HashSet;
use std::time::Duration; use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use dynamo_kv_router::{config::KvRouterConfig, protocols::RouterEvent};
use dynamo_runtime::{ use dynamo_runtime::{
component::Component, component::Component,
config::environment_names::nats as env_nats, config::environment_names::nats as env_nats,
...@@ -17,8 +18,7 @@ use rand::Rng; ...@@ -17,8 +18,7 @@ use rand::Rng;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::kv_router::{ use crate::kv_router::{
Indexer, KV_EVENT_SUBJECT, KvRouterConfig, RADIX_STATE_BUCKET, RADIX_STATE_FILE, Indexer, KV_EVENT_SUBJECT, RADIX_STATE_BUCKET, RADIX_STATE_FILE, router_discovery_query,
protocols::RouterEvent, router_discovery_query,
}; };
/// Helper function to create a KV stream name from a component and subject. /// Helper function to create a KV stream name from a component and subject.
......
...@@ -10,6 +10,10 @@ use tokio::sync::{OwnedSemaphorePermit, oneshot}; ...@@ -10,6 +10,10 @@ use tokio::sync::{OwnedSemaphorePermit, oneshot};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::Instrument; use tracing::Instrument;
use dynamo_kv_router::{
config::{KvRouterConfig, RouterConfigOverride},
protocols::{BlockExtraInfo, WorkerId},
};
use dynamo_runtime::{ use dynamo_runtime::{
component::Endpoint, component::Endpoint,
pipeline::{ pipeline::{
...@@ -21,8 +25,7 @@ use dynamo_runtime::{ ...@@ -21,8 +25,7 @@ use dynamo_runtime::{
use crate::{ use crate::{
discovery::ModelManager, discovery::ModelManager,
kv_router::protocols::WorkerId, kv_router::KvPushRouter,
kv_router::{KvPushRouter, KvRouterConfig, RouterConfigOverride, protocols::BlockExtraInfo},
protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest}, protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest},
protocols::common::preprocessor::{BootstrapInfo, PrefillResult}, protocols::common::preprocessor::{BootstrapInfo, PrefillResult},
protocols::common::timing::{RequestPhase, RequestTracker, WORKER_TYPE_PREFILL}, protocols::common::timing::{RequestPhase, RequestTracker, WORKER_TYPE_PREFILL},
......
...@@ -35,13 +35,13 @@ fn create_kv_stream_name(component: &Component, subject: &str) -> String { ...@@ -35,13 +35,13 @@ fn create_kv_stream_name(component: &Component, subject: &str) -> String {
.replace("_", "-") .replace("_", "-")
} }
use dynamo_kv_router::indexer::{KvIndexerMetrics, LocalKvIndexer};
use dynamo_kv_router::protocols::*;
pub use dynamo_kv_router::zmq_wire::create_stored_blocks; pub use dynamo_kv_router::zmq_wire::create_stored_blocks;
use dynamo_kv_router::zmq_wire::*; use dynamo_kv_router::zmq_wire::*;
use crate::kv_router::{ use crate::kv_router::{
KV_EVENT_SUBJECT, KV_METRICS_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE, KV_EVENT_SUBJECT, KV_METRICS_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE,
indexer::{KvIndexerMetrics, LocalKvIndexer},
protocols::*,
worker_query::start_worker_kv_query_endpoint, worker_query::start_worker_kv_query_endpoint,
}; };
use dynamo_runtime::config::environment_names::nats as env_nats; use dynamo_runtime::config::environment_names::nats as env_nats;
...@@ -1048,7 +1048,7 @@ impl WorkerMetricsPublisher { ...@@ -1048,7 +1048,7 @@ impl WorkerMetricsPublisher {
#[cfg(test)] #[cfg(test)]
mod test_event_processing { mod test_event_processing {
use super::*; use super::*;
use crate::kv_router::protocols::compute_block_hash_for_seq; use dynamo_kv_router::protocols::compute_block_hash_for_seq;
// --------------------------------------------------------------------- // ---------------------------------------------------------------------
// create_stored_block_from_parts -------------------------------------- // create_stored_block_from_parts --------------------------------------
...@@ -1452,9 +1452,9 @@ mod test_event_processing { ...@@ -1452,9 +1452,9 @@ mod test_event_processing {
mod tests_startup_helpers { mod tests_startup_helpers {
use super::*; use super::*;
use crate::kv_router::KvIndexer; use crate::kv_router::KvIndexer;
use crate::kv_router::indexer::KvIndexerInterface;
use crate::kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
use bytes::Bytes; use bytes::Bytes;
use dynamo_kv_router::indexer::{GetWorkersRequest, KvIndexerInterface};
use dynamo_kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use zeromq::{PubSocket, Socket, SocketSend, ZmqMessage}; use zeromq::{PubSocket, Socket, SocketSend, ZmqMessage};
...@@ -1608,7 +1608,7 @@ mod tests_startup_helpers { ...@@ -1608,7 +1608,7 @@ mod tests_startup_helpers {
// Try up to 20 times (200ms total) // Try up to 20 times (200ms total)
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel(); let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
get_workers_tx get_workers_tx
.send(crate::kv_router::indexer::GetWorkersRequest { resp: resp_tx }) .send(GetWorkersRequest { resp: resp_tx })
.await .await
.unwrap(); .unwrap();
let workers: Vec<u64> = resp_rx.await.unwrap(); let workers: Vec<u64> = resp_rx.await.unwrap();
...@@ -2014,7 +2014,7 @@ mod tests_startup_helpers { ...@@ -2014,7 +2014,7 @@ mod tests_startup_helpers {
for _ in 0..20 { for _ in 0..20 {
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel(); let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
get_workers_tx get_workers_tx
.send(crate::kv_router::indexer::GetWorkersRequest { resp: resp_tx }) .send(GetWorkersRequest { resp: resp_tx })
.await .await
.unwrap(); .unwrap();
let workers: Vec<u64> = resp_rx.await.unwrap(); let workers: Vec<u64> = resp_rx.await.unwrap();
...@@ -2085,7 +2085,7 @@ mod tests_startup_helpers { ...@@ -2085,7 +2085,7 @@ mod tests_startup_helpers {
.unwrap(); .unwrap();
let router_overlap = overlap let router_overlap = overlap
.scores .scores
.get(&crate::kv_router::protocols::WorkerWithDpRank::from_worker_id(worker_1_id)) .get(&dynamo_kv_router::protocols::WorkerWithDpRank::from_worker_id(worker_1_id))
.copied() .copied()
.unwrap_or(0); .unwrap_or(0);
assert_eq!( assert_eq!(
...@@ -2101,9 +2101,9 @@ mod tests_startup_helpers { ...@@ -2101,9 +2101,9 @@ mod tests_startup_helpers {
.get_events_in_id_range(Some(last_known_id + 1), None) .get_events_in_id_range(Some(last_known_id + 1), None)
.await; .await;
let missed_events = match response { let missed_events = match response {
crate::kv_router::indexer::WorkerKvQueryResponse::Events(e) => e, dynamo_kv_router::indexer::WorkerKvQueryResponse::Events(e) => e,
crate::kv_router::indexer::WorkerKvQueryResponse::TreeDump { events: e, .. } => e, dynamo_kv_router::indexer::WorkerKvQueryResponse::TreeDump { events: e, .. } => e,
crate::kv_router::indexer::WorkerKvQueryResponse::Error(message) => { dynamo_kv_router::indexer::WorkerKvQueryResponse::Error(message) => {
panic!("Unexpected error response: {message}") panic!("Unexpected error response: {message}")
} }
other => panic!("Unexpected response: {:?}", other), other => panic!("Unexpected response: {:?}", other),
...@@ -2129,7 +2129,7 @@ mod tests_startup_helpers { ...@@ -2129,7 +2129,7 @@ mod tests_startup_helpers {
let overlap = router_indexer.find_matches(block_hashes_2).await.unwrap(); let overlap = router_indexer.find_matches(block_hashes_2).await.unwrap();
let router_overlap_after = overlap let router_overlap_after = overlap
.scores .scores
.get(&crate::kv_router::protocols::WorkerWithDpRank::from_worker_id(worker_1_id)) .get(&dynamo_kv_router::protocols::WorkerWithDpRank::from_worker_id(worker_1_id))
.copied() .copied()
.unwrap_or(0); .unwrap_or(0);
assert_eq!( assert_eq!(
...@@ -2193,7 +2193,7 @@ mod test_exponential_backoff { ...@@ -2193,7 +2193,7 @@ mod test_exponential_backoff {
#[cfg(all(test, feature = "integration"))] #[cfg(all(test, feature = "integration"))]
mod test_integration_publisher { mod test_integration_publisher {
use super::*; use super::*;
use crate::kv_router::protocols::ActiveLoad; use dynamo_kv_router::protocols::ActiveLoad;
use dynamo_runtime::distributed_test_utils::create_test_drt_async; use dynamo_runtime::distributed_test_utils::create_test_drt_async;
use dynamo_runtime::transports::event_plane::EventSubscriber; use dynamo_runtime::transports::event_plane::EventSubscriber;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use dynamo_kv_router::protocols::{TokensWithHashes, WorkerWithDpRank};
use dynamo_runtime::{ use dynamo_runtime::{
pipeline::{ pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
...@@ -21,7 +22,6 @@ use crate::{ ...@@ -21,7 +22,6 @@ use crate::{
CacheControlClient, KvRouter, CacheControlClient, KvRouter,
cache_control::{PinState, create_cache_control_client, spawn_pin_prefix}, cache_control::{PinState, create_cache_control_client, spawn_pin_prefix},
metrics::RouterRequestMetrics, metrics::RouterRequestMetrics,
protocols::{TokensWithHashes, WorkerWithDpRank},
}, },
preprocessor::PreprocessedRequest, preprocessor::PreprocessedRequest,
protocols::common::{ protocols::common::{
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::kv_router::protocols::RouterEvent;
use crate::recorder::Recorder; use crate::recorder::Recorder;
use dynamo_kv_router::protocols::RouterEvent;
// Type alias for backward compatibility // Type alias for backward compatibility
pub type KvRecorder = Recorder<RouterEvent>; pub type KvRecorder = Recorder<RouterEvent>;
...@@ -10,9 +10,8 @@ pub type KvRecorder = Recorder<RouterEvent>; ...@@ -10,9 +10,8 @@ pub type KvRecorder = Recorder<RouterEvent>;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::kv_router::indexer::KvIndexer; use dynamo_kv_router::indexer::{KvIndexer, KvIndexerMetrics};
use crate::kv_router::indexer::KvIndexerMetrics; use dynamo_kv_router::protocols::*;
use crate::kv_router::protocols::*;
use std::time::Duration; use std::time::Duration;
use tempfile::tempdir; use tempfile::tempdir;
use tokio::fs; use tokio::fs;
......
...@@ -7,11 +7,8 @@ pub use dynamo_kv_router::scheduling::{ ...@@ -7,11 +7,8 @@ pub use dynamo_kv_router::scheduling::{
}; };
pub use dynamo_kv_router::selector::DefaultWorkerSelector; pub use dynamo_kv_router::selector::DefaultWorkerSelector;
use super::KvRouterConfig;
use super::RouterConfigOverride;
use super::WorkerSelector; use super::WorkerSelector;
use super::metrics::ROUTER_QUEUE_METRICS; use super::metrics::ROUTER_QUEUE_METRICS;
use super::protocols::{OverlapScores, WorkerId};
use super::queue::SchedulerQueue; use super::queue::SchedulerQueue;
use super::sequence::{ use super::sequence::{
ActiveSequencesMulti, SequenceError, SequenceRequest, create_multi_worker_sequences, ActiveSequencesMulti, SequenceError, SequenceRequest, create_multi_worker_sequences,
...@@ -19,6 +16,10 @@ use super::sequence::{ ...@@ -19,6 +16,10 @@ use super::sequence::{
use crate::discovery::RuntimeConfigWatch; use crate::discovery::RuntimeConfigWatch;
use crate::local_model::runtime_config::ModelRuntimeConfig; use crate::local_model::runtime_config::ModelRuntimeConfig;
use anyhow::Result; use anyhow::Result;
use dynamo_kv_router::{
config::{KvRouterConfig, RouterConfigOverride},
protocols::{OverlapScores, WorkerId},
};
use dynamo_runtime::component::Component; use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider; use dynamo_runtime::traits::DistributedRuntimeProvider;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment