Unverified Commit d849f7ec authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: expose router configurations to dynamo-run (#1259)

parent 3f6a7472
......@@ -56,7 +56,7 @@ async fn app(runtime: Runtime) -> Result<()> {
// the cli when operating on an `http` component will validate the namespace.component is
// registered with HttpServiceComponentDefinition
let watch_obj = ModelWatcher::new(distributed.clone(), manager, RouterMode::Random);
let watch_obj = ModelWatcher::new(distributed.clone(), manager, RouterMode::Random, None);
if let Some(etcd_client) = distributed.etcd_client() {
let models_watcher: PrefixWatcher =
......
# Running Dynamo (`dynamo run`)
* [Quickstart with pip and vllm](#quickstart-with-pip-and-vllm)
* [Automatically download a model from Hugging Face](#use-model-from-hugging-face)
* [Run a model from local file](#run-a-model-from-local-file)
* [Distributed system](#distributed-system)
* [Network names](#network-names)
* [KV-aware routing](#kv-aware-routing)
* [Full usage details](#full-usage-details)
* [Setup](#setup)
* [mistral.rs](#mistralrs)
* [llama.cpp](#llamacpp)
* [Sglang](#sglang)
* [Vllm](#vllm)
* [TensorRT-LLM](#trtllm)
* [Echo Engines](#echo-engines)
* [Writing your own engine in Python](#writing-your-own-engine-in-python)
* [Batch mode](#batch-mode)
* [Defaults](#defaults)
* [Extra engine arguments](#extra-engine-arguments)
- [Running Dynamo (`dynamo run`)](#running-dynamo-dynamo-run)
- [Quickstart with pip and vllm](#quickstart-with-pip-and-vllm)
- [Use model from Hugging Face](#use-model-from-hugging-face)
- [Run a model from local file](#run-a-model-from-local-file)
- [Download model from Hugging Face](#download-model-from-hugging-face)
- [Run model from local file](#run-model-from-local-file)
- [Distributed System](#distributed-system)
- [Network names](#network-names)
- [KV-aware routing](#kv-aware-routing)
- [Full usage details](#full-usage-details)
- [Getting Started](#getting-started)
- [Setup](#setup)
- [Step 1: Install libraries](#step-1-install-libraries)
- [Step 2: Install Rust](#step-2-install-rust)
- [Step 3: Build](#step-3-build)
- [Defaults](#defaults)
- [Running Inference with Pre-built Engines](#running-inference-with-pre-built-engines)
- [mistralrs](#mistralrs)
- [llamacpp](#llamacpp)
- [sglang](#sglang)
- [vllm](#vllm)
- [trtllm](#trtllm)
- [Step 1: Build the environment](#step-1-build-the-environment)
- [Step 2: Run the environment](#step-2-run-the-environment)
- [Step 3: Execute `dynamo run` command](#step-3-execute-dynamo-run-command)
- [Echo Engines](#echo-engines)
- [echo\_core](#echo_core)
- [echo\_full](#echo_full)
- [Configuration](#configuration)
- [Batch mode](#batch-mode)
- [Extra engine arguments](#extra-engine-arguments)
- [Writing your own engine in Python](#writing-your-own-engine-in-python)
This guide explains the `dynamo run` command.
......@@ -28,7 +41,7 @@ It supports these engines: mistralrs, llamacpp, sglang, vllm, and tensorrt-llm.
Usage:
```
dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=echo_core|echo_full|mistralrs|llamacpp|sglang|vllm|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]
dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=echo_core|echo_full|mistralrs|llamacpp|sglang|vllm|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0]
```
Example: `dynamo run Qwen/Qwen3-0.6B`
......
......@@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use clap::ValueEnum;
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_runtime::pipeline::RouterMode as RuntimeRouterMode;
/// Required options depend on the in and out choices
......@@ -105,6 +106,21 @@ pub struct Flags {
#[arg(long, default_value = "round-robin")]
pub router_mode: RouterMode,
/// KV Router: Weight for overlap score in worker selection.
/// Higher values prioritize KV cache reuse. Default: 2.0
#[arg(long)]
pub kv_overlap_score_weight: Option<f64>,
/// KV Router: Weight for GPU cache usage in worker selection.
/// Higher values avoid workers with nearly full KV caches. Default: 1.0
#[arg(long)]
pub kv_gpu_cache_usage_weight: Option<f64>,
/// KV Router: Weight for waiting requests in worker selection.
/// Higher values avoid workers with queued requests. Default: 1.0
#[arg(long)]
pub kv_waiting_requests_weight: Option<f64>,
/// Max model context length. Reduce this if you don't have enough VRAM for the full model
/// context length (e.g. Llama 4).
/// Defaults to the model's max, which is usually model_max_length in tokenizer_config.json.
......@@ -138,6 +154,15 @@ pub struct Flags {
}
impl Flags {
/// Build the KV router configuration from the three `--kv-*-weight` flags.
///
/// Each flag value is forwarded unchanged to `KvRouterConfig::new`; a weight that is
/// `None` here is replaced by that constructor's default.
pub fn kv_router_config(&self) -> KvRouterConfig {
    let overlap = self.kv_overlap_score_weight;
    let gpu_cache_usage = self.kv_gpu_cache_usage_weight;
    let waiting_requests = self.kv_waiting_requests_weight;
    KvRouterConfig::new(overlap, gpu_cache_usage, waiting_requests)
}
/// Convert the flags back to a command line. Only the non-null values are emitted, and
/// that includes the defaults. Includes the canonicalized model path and normalized model name.
///
......@@ -175,6 +200,18 @@ impl Flags {
out.push("--extra-engine-args".to_string());
out.push(extra_engine_args.display().to_string());
}
if let Some(weight) = self.kv_overlap_score_weight {
out.push("--kv-overlap-score-weight".to_string());
out.push(weight.to_string());
}
if let Some(weight) = self.kv_gpu_cache_usage_weight {
out.push("--kv-gpu-cache-usage-weight".to_string());
out.push(weight.to_string());
}
if let Some(weight) = self.kv_waiting_requests_weight {
out.push("--kv-waiting-requests-weight".to_string());
out.push(weight.to_string());
}
out.extend(self.last.clone());
out
}
......
......@@ -50,6 +50,7 @@ pub async fn prepare_engine(
distributed_runtime,
model_manager.clone(),
dynamo_runtime::pipeline::RouterMode::RoundRobin,
None,
));
let models_watcher = etcd_client.kv_get_and_watch_prefix(MODEL_ROOT_PATH).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
......
......@@ -5,6 +5,7 @@ use std::sync::Arc;
use crate::input::common;
use crate::{EngineConfig, Flags};
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::{
discovery::{ModelManager, ModelWatcher, MODEL_ROOT_PATH},
engines::StreamingEngineAdapter,
......@@ -46,6 +47,7 @@ pub async fn run(
etcd_client.clone(),
MODEL_ROOT_PATH,
flags.router_mode.into(),
Some(flags.kv_router_config()),
)
.await?;
}
......@@ -102,8 +104,9 @@ async fn run_watcher(
etcd_client: etcd::Client,
network_prefix: &str,
router_mode: RouterMode,
kv_router_config: Option<KvRouterConfig>,
) -> anyhow::Result<()> {
let watch_obj = ModelWatcher::new(runtime, model_manager, router_mode);
let watch_obj = ModelWatcher::new(runtime, model_manager, router_mode, kv_router_config);
tracing::info!("Watching for remote model at {network_prefix}");
let models_watcher = etcd_client.kv_get_and_watch_prefix(network_prefix).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
......
......@@ -30,7 +30,7 @@ Example:
- OR: ./dynamo-run /data/models/Llama-3.2-1B-Instruct-Q4_K_M.gguf
"#;
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]";
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0]";
fn main() -> anyhow::Result<()> {
// Set log level based on verbosity flag
......
......@@ -258,6 +258,7 @@ async fn list_models(
distributed.clone(),
Arc::new(ModelManager::new()),
RouterMode::Random,
None,
);
let mut models = Vec::new();
......@@ -313,6 +314,7 @@ async fn remove_model(
distributed.clone(),
Arc::new(ModelManager::new()),
RouterMode::Random,
None,
);
let Some(etcd_client) = distributed.etcd_client() else {
anyhow::bail!("llmctl is only useful with dynamic workers");
......
......@@ -5,7 +5,7 @@ use dynamo_runtime::component::Component;
use crate::discovery::ModelEntry;
use crate::kv_router::scheduler::DefaultWorkerSelector;
use crate::kv_router::{scheduler::DefaultWorkerSelector, KvRouterConfig};
use crate::{
kv_router::KvRouter,
types::openai::{
......@@ -183,6 +183,7 @@ impl ModelManager {
model_name: &str,
component: &Component,
kv_cache_block_size: usize,
kv_router_config: Option<KvRouterConfig>,
) -> anyhow::Result<Arc<KvRouter>> {
if let Some(kv_chooser) = self.get_kv_chooser(model_name) {
// Check if the existing router has a different block size
......@@ -197,7 +198,7 @@ impl ModelManager {
}
return Ok(kv_chooser);
}
self.create_kv_chooser(model_name, component, kv_cache_block_size)
self.create_kv_chooser(model_name, component, kv_cache_block_size, kv_router_config)
.await
}
......@@ -211,8 +212,9 @@ impl ModelManager {
model_name: &str,
component: &Component,
kv_cache_block_size: usize,
kv_router_config: Option<KvRouterConfig>,
) -> anyhow::Result<Arc<KvRouter>> {
let selector = Box::new(DefaultWorkerSelector {});
let selector = Box::new(DefaultWorkerSelector::new(kv_router_config));
let chooser = KvRouter::new(component.clone(), kv_cache_block_size, Some(selector)).await?;
let new_kv_chooser = Arc::new(chooser);
self.kv_choosers
......
......@@ -18,7 +18,7 @@ use dynamo_runtime::{
use crate::{
backend::Backend,
kv_router::KvPushRouter,
kv_router::{KvPushRouter, KvRouterConfig},
model_type::ModelType,
preprocessor::{OpenAIPreprocessor, PreprocessedRequest},
protocols::common::llm_backend::LLMEngineOutput,
......@@ -36,6 +36,7 @@ pub struct ModelWatcher {
drt: DistributedRuntime,
router_mode: RouterMode,
notify_on_model: Notify,
kv_router_config: Option<KvRouterConfig>,
}
impl ModelWatcher {
......@@ -43,12 +44,14 @@ impl ModelWatcher {
runtime: DistributedRuntime,
model_manager: Arc<ModelManager>,
router_mode: RouterMode,
kv_router_config: Option<KvRouterConfig>,
) -> ModelWatcher {
Self {
manager: model_manager,
drt: runtime,
router_mode,
notify_on_model: Notify::new(),
kv_router_config,
}
}
......@@ -209,7 +212,12 @@ impl ModelWatcher {
RouterMode::KV => {
let chooser = self
.manager
.kv_chooser_for(&model_entry.name, &component, card.kv_cache_block_size)
.kv_chooser_for(
&model_entry.name,
&component,
card.kv_cache_block_size,
self.kv_router_config.clone(),
)
.await?;
let kv_push_router = KvPushRouter::new(router, chooser);
ServiceBackend::from_engine(Arc::new(kv_push_router))
......@@ -245,7 +253,12 @@ impl ModelWatcher {
RouterMode::KV => {
let chooser = self
.manager
.kv_chooser_for(&model_entry.name, &component, card.kv_cache_block_size)
.kv_chooser_for(
&model_entry.name,
&component,
card.kv_cache_block_size,
self.kv_router_config.clone(),
)
.await?;
let kv_push_router = KvPushRouter::new(router, chooser);
ServiceBackend::from_engine(Arc::new(kv_push_router))
......
......@@ -54,6 +54,51 @@ pub trait WorkerSelector {
) -> Result<WorkerSelectionResult, KvSchedulerError>;
}
/// KV Router configuration parameters.
///
/// The three weights are combined by the default worker selector into a per-worker logit:
/// `overlap_score_weight * score - gpu_cache_usage_weight * gpu_cache_usage
/// - waiting_requests_weight * normalized_waiting`.
#[derive(Debug, Clone, PartialEq)]
pub struct KvRouterConfig {
    /// Weight for overlap score in worker selection.
    /// Higher values prioritize KV cache reuse. Default: 2.0
    pub overlap_score_weight: f64,

    /// Weight for GPU cache usage in worker selection.
    /// Higher values avoid workers with nearly full KV caches. Default: 1.0
    pub gpu_cache_usage_weight: f64,

    /// Weight for waiting requests in worker selection.
    /// Higher values avoid workers with queued requests. Default: 1.0
    pub waiting_requests_weight: f64,
}

impl Default for KvRouterConfig {
    /// Returns the default weights: 2.0 for overlap, 1.0 for GPU cache usage and 1.0 for
    /// waiting requests.
    fn default() -> Self {
        Self {
            overlap_score_weight: 2.0,
            gpu_cache_usage_weight: 1.0,
            waiting_requests_weight: 1.0,
        }
    }
}

impl KvRouterConfig {
    /// Create a new `KvRouterConfig` with optional weight values.
    ///
    /// A weight that is `None` takes its default value. A weight that is not finite
    /// (NaN or +/- infinity) is also replaced by its default: such values parse
    /// successfully from the command line but would make every weighted score NaN or
    /// infinite. Finite values, including zero and negative ones, are kept as given.
    pub fn new(
        overlap_score_weight: Option<f64>,
        gpu_cache_usage_weight: Option<f64>,
        waiting_requests_weight: Option<f64>,
    ) -> Self {
        let default = Self::default();
        // Keep only usable (finite) weights; anything else falls back to the default.
        let pick = |weight: Option<f64>, fallback: f64| -> f64 {
            weight.filter(|w| w.is_finite()).unwrap_or(fallback)
        };
        Self {
            overlap_score_weight: pick(overlap_score_weight, default.overlap_score_weight),
            gpu_cache_usage_weight: pick(gpu_cache_usage_weight, default.gpu_cache_usage_weight),
            waiting_requests_weight: pick(
                waiting_requests_weight,
                default.waiting_requests_weight,
            ),
        }
    }
}
/// A KvRouter only decides which worker you should use. It doesn't send you there.
/// TODO: Rename this to indicate it only selects a worker, it does not route.
pub struct KvRouter {
......
......@@ -20,14 +20,14 @@ use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut;
use std::collections::HashMap;
use super::protocols::WorkerSelectionResult;
use super::WorkerSelector;
use crate::kv_router::indexer::OverlapScores;
pub use crate::kv_router::protocols::ForwardPassMetrics;
use crate::kv_router::scoring::ProcessedEndpoints;
use crate::kv_router::KvRouterConfig;
use crate::kv_router::KV_HIT_RATE_SUBJECT;
use super::protocols::WorkerSelectionResult;
use super::WorkerSelector;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KVHitRateEvent {
pub worker_id: i64,
......@@ -96,7 +96,7 @@ impl KvScheduler {
endpoints_rx: tokio::sync::watch::Receiver<ProcessedEndpoints>,
selector: Option<Box<dyn WorkerSelector + Send + Sync>>,
) -> Result<Self, KvSchedulerError> {
let selector = selector.unwrap_or(Box::new(DefaultWorkerSelector));
let selector = selector.unwrap_or(Box::new(DefaultWorkerSelector::default()));
let mut endpoints_rx = endpoints_rx;
let mut endpoints: ProcessedEndpoints = endpoints_rx.borrow_and_update().clone();
......@@ -231,8 +231,18 @@ pub fn process_worker_selection(
}
// Default implementation matching the Python _cost_function
#[derive(Default)]
pub struct DefaultWorkerSelector;
/// Worker selector whose scoring weights come from a `KvRouterConfig`.
#[derive(Debug, Clone, Default)]
pub struct DefaultWorkerSelector {
    /// Weights applied when scoring each candidate worker.
    pub kv_router_config: KvRouterConfig,
}

impl DefaultWorkerSelector {
    /// Build a selector from an optional configuration.
    ///
    /// `None` produces the same selector as `DefaultWorkerSelector::default()`, i.e. one
    /// holding `KvRouterConfig::default()`.
    pub fn new(kv_router_config: Option<KvRouterConfig>) -> Self {
        match kv_router_config {
            Some(config) => Self {
                kv_router_config: config,
            },
            None => Self::default(),
        }
    }
}
impl WorkerSelector for DefaultWorkerSelector {
fn select_worker(
......@@ -285,10 +295,15 @@ impl WorkerSelector for DefaultWorkerSelector {
};
// Calculate logit using same formula as Python
let logit = 2.0 * score - gpu_cache_usage - normalized_waiting;
let logit = self.kv_router_config.overlap_score_weight * score
- self.kv_router_config.gpu_cache_usage_weight * gpu_cache_usage
- self.kv_router_config.waiting_requests_weight * normalized_waiting;
tracing::trace!(
"Formula for {worker_id}: {logit:.3} = 2.0 * {score:.3} - {gpu_cache_usage:.3} - {normalized_waiting:.3}",
"Formula for {worker_id}: {logit:.3} = {:.1} * {score:.3} - {:.1} * {gpu_cache_usage:.3} - {:.1} * {normalized_waiting:.3}",
self.kv_router_config.overlap_score_weight,
self.kv_router_config.gpu_cache_usage_weight,
self.kv_router_config.waiting_requests_weight,
);
// Track best workers
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment