"vscode:/vscode.git/clone" did not exist on "bbbb6b2a013b3fb27ddd7d7132a9dd6026f3e7ad"
Unverified Commit b24b2e7e authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] use dashmap for radix tree instead of hash for multi model (#10814)

parent 7135db5d
...@@ -63,8 +63,9 @@ use super::{get_healthy_worker_indices, CacheAwareConfig, LoadBalancingPolicy}; ...@@ -63,8 +63,9 @@ use super::{get_healthy_worker_indices, CacheAwareConfig, LoadBalancingPolicy};
use crate::core::Worker; use crate::core::Worker;
use crate::metrics::RouterMetrics; use crate::metrics::RouterMetrics;
use crate::tree::Tree; use crate::tree::Tree;
use std::collections::HashMap; use dashmap::DashMap;
use std::sync::{Arc, Mutex}; use rand::Rng;
use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use tracing::debug; use tracing::debug;
...@@ -77,7 +78,7 @@ use tracing::debug; ...@@ -77,7 +78,7 @@ use tracing::debug;
#[derive(Debug)] #[derive(Debug)]
pub struct CacheAwarePolicy { pub struct CacheAwarePolicy {
config: CacheAwareConfig, config: CacheAwareConfig,
trees: Arc<Mutex<HashMap<String, Tree>>>, // model_id -> Tree trees: Arc<DashMap<String, Arc<Tree>>>,
eviction_handle: Option<thread::JoinHandle<()>>, eviction_handle: Option<thread::JoinHandle<()>>,
} }
...@@ -87,7 +88,7 @@ impl CacheAwarePolicy { ...@@ -87,7 +88,7 @@ impl CacheAwarePolicy {
} }
pub fn with_config(config: CacheAwareConfig) -> Self { pub fn with_config(config: CacheAwareConfig) -> Self {
let trees = Arc::new(Mutex::new(HashMap::<String, Tree>::new())); let trees = Arc::new(DashMap::<String, Arc<Tree>>::new());
// Start background eviction thread if configured // Start background eviction thread if configured
let eviction_handle = if config.eviction_interval_secs > 0 { let eviction_handle = if config.eviction_interval_secs > 0 {
...@@ -98,16 +99,16 @@ impl CacheAwarePolicy { ...@@ -98,16 +99,16 @@ impl CacheAwarePolicy {
Some(thread::spawn(move || loop { Some(thread::spawn(move || loop {
thread::sleep(Duration::from_secs(interval)); thread::sleep(Duration::from_secs(interval));
if let Ok(mut trees_guard) = trees_clone.lock() {
// Evict for all model trees // Evict for all model trees
for (model_id, tree) in trees_guard.iter_mut() { for tree_ref in trees_clone.iter() {
let model_id = tree_ref.key();
let tree = tree_ref.value();
tree.evict_tenant_by_size(max_tree_size); tree.evict_tenant_by_size(max_tree_size);
debug!( debug!(
"Cache eviction completed for model {}, max_size: {}", "Cache eviction completed for model {}, max_size: {}",
model_id, max_tree_size model_id, max_tree_size
); );
} }
}
})) }))
} else { } else {
None None
...@@ -122,9 +123,9 @@ impl CacheAwarePolicy { ...@@ -122,9 +123,9 @@ impl CacheAwarePolicy {
/// Initialize the tree with worker URLs (used only during initial setup) /// Initialize the tree with worker URLs (used only during initial setup)
pub fn init_workers(&self, workers: &[Arc<dyn Worker>]) { pub fn init_workers(&self, workers: &[Arc<dyn Worker>]) {
if let Ok(mut trees) = self.trees.lock() {
// Group workers by model // Group workers by model
let mut model_workers: HashMap<String, Vec<&Arc<dyn Worker>>> = HashMap::new(); let mut model_workers: std::collections::HashMap<String, Vec<&Arc<dyn Worker>>> =
std::collections::HashMap::new();
for worker in workers { for worker in workers {
// Use "default" for unknown/empty model_ids for backward compatibility // Use "default" for unknown/empty model_ids for backward compatibility
let model_id = worker.model_id(); let model_id = worker.model_id();
...@@ -141,17 +142,18 @@ impl CacheAwarePolicy { ...@@ -141,17 +142,18 @@ impl CacheAwarePolicy {
// Initialize tree for each model // Initialize tree for each model
for (tree_key, model_workers) in model_workers { for (tree_key, model_workers) in model_workers {
let tree = trees.entry(tree_key).or_insert_with(Tree::new); let tree = self
.trees
.entry(tree_key)
.or_insert_with(|| Arc::new(Tree::new()));
for worker in model_workers { for worker in model_workers {
tree.insert("", worker.url()); tree.insert("", worker.url());
} }
} }
} }
}
/// Add a single worker to the tree (incremental update) /// Add a single worker to the tree (incremental update)
pub fn add_worker(&self, worker: &dyn Worker) { pub fn add_worker(&self, worker: &dyn Worker) {
if let Ok(mut trees) = self.trees.lock() {
// For backward compatibility: if model_id is "unknown" or empty, // For backward compatibility: if model_id is "unknown" or empty,
// use a default tree. This preserves existing behavior for single-model routers. // use a default tree. This preserves existing behavior for single-model routers.
let model_id = worker.model_id(); let model_id = worker.model_id();
...@@ -160,22 +162,24 @@ impl CacheAwarePolicy { ...@@ -160,22 +162,24 @@ impl CacheAwarePolicy {
} else { } else {
model_id model_id
}; };
let tree = trees.entry(tree_key.to_string()).or_insert_with(Tree::new); let tree = self
.trees
.entry(tree_key.to_string())
.or_insert_with(|| Arc::new(Tree::new()));
tree.insert("", worker.url()); tree.insert("", worker.url());
} }
}
/// Add a worker by URL and model (for backward compatibility) /// Add a worker by URL and model (for backward compatibility)
pub fn add_worker_by_url(&self, url: &str, model_id: &str) { pub fn add_worker_by_url(&self, url: &str, model_id: &str) {
if let Ok(mut trees) = self.trees.lock() { let tree = self
let tree = trees.entry(model_id.to_string()).or_insert_with(Tree::new); .trees
.entry(model_id.to_string())
.or_insert_with(|| Arc::new(Tree::new()));
tree.insert("", url); tree.insert("", url);
} }
}
/// Remove a worker from the tree /// Remove a worker from the tree
pub fn remove_worker(&self, worker: &dyn Worker) { pub fn remove_worker(&self, worker: &dyn Worker) {
if let Ok(mut trees) = self.trees.lock() {
// Use same logic as add_worker for consistency // Use same logic as add_worker for consistency
let model_id = worker.model_id(); let model_id = worker.model_id();
let tree_key = if model_id.is_empty() || model_id == "unknown" { let tree_key = if model_id.is_empty() || model_id == "unknown" {
...@@ -183,26 +187,24 @@ impl CacheAwarePolicy { ...@@ -183,26 +187,24 @@ impl CacheAwarePolicy {
} else { } else {
model_id model_id
}; };
if let Some(tree) = trees.get_mut(tree_key) { if let Some(tree) = self.trees.get(tree_key) {
tree.remove_tenant(worker.url()); tree.remove_tenant(worker.url());
} }
} }
}
/// Remove a worker by URL (removes from all model trees for backward compatibility) /// Remove a worker by URL (removes from all model trees for backward compatibility)
pub fn remove_worker_by_url(&self, url: &str) { pub fn remove_worker_by_url(&self, url: &str) {
if let Ok(mut trees) = self.trees.lock() {
// Remove from all trees since we don't know which model it belongs to // Remove from all trees since we don't know which model it belongs to
for (_model_id, tree) in trees.iter_mut() { for tree_ref in self.trees.iter() {
tree.remove_tenant(url); tree_ref.value().remove_tenant(url);
}
} }
} }
/// Run cache eviction to prevent unbounded growth /// Run cache eviction to prevent unbounded growth
pub fn evict_cache(&self, max_size: usize) { pub fn evict_cache(&self, max_size: usize) {
if let Ok(mut trees) = self.trees.lock() { for tree_ref in self.trees.iter() {
for (model_id, tree) in trees.iter_mut() { let model_id = tree_ref.key();
let tree = tree_ref.value();
tree.evict_tenant_by_size(max_size); tree.evict_tenant_by_size(max_size);
debug!( debug!(
"Cache eviction for model {}, max_size: {}", "Cache eviction for model {}, max_size: {}",
...@@ -210,7 +212,6 @@ impl CacheAwarePolicy { ...@@ -210,7 +212,6 @@ impl CacheAwarePolicy {
); );
} }
} }
}
} }
impl LoadBalancingPolicy for CacheAwarePolicy { impl LoadBalancingPolicy for CacheAwarePolicy {
...@@ -266,20 +267,18 @@ impl LoadBalancingPolicy for CacheAwarePolicy { ...@@ -266,20 +267,18 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
// Even in imbalanced mode, update the tree to maintain cache state // Even in imbalanced mode, update the tree to maintain cache state
if let Some(text) = request_text { if let Some(text) = request_text {
if let Ok(mut trees) = self.trees.lock() { // Get the tree reference without locking the entire HashMap
// Avoid allocation if tree already exists // DashMap only locks the specific shard containing this key
let tree = if let Some(tree) = trees.get_mut(model_id) { let tree = self.trees.get(model_id).map(|entry| entry.value().clone());
tree
} else { if let Some(tree) = tree {
// Create new tree and initialize with all workers // Now we can work with the tree without holding the HashMap lock
let new_tree = Tree::new();
// Initialize with all healthy workers like OLD version does
for &idx in &healthy_indices {
new_tree.insert("", workers[idx].url());
}
trees.entry(model_id.to_string()).or_insert(new_tree)
};
tree.insert(text, workers[min_load_idx].url()); tree.insert(text, workers[min_load_idx].url());
} else {
debug!(
"Warning: No tree found for model '{}', skipping cache update",
model_id
);
} }
} }
...@@ -294,19 +293,12 @@ impl LoadBalancingPolicy for CacheAwarePolicy { ...@@ -294,19 +293,12 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
// Use cache-aware routing when balanced // Use cache-aware routing when balanced
let text = request_text.unwrap_or(""); let text = request_text.unwrap_or("");
if let Ok(mut trees) = self.trees.lock() { // Get the tree reference without locking the entire HashMap
// Avoid allocation if tree already exists // DashMap only locks the specific shard containing this key
let tree = if let Some(tree) = trees.get_mut(model_id) { let tree = self.trees.get(model_id).map(|entry| entry.value().clone());
tree
} else { if let Some(tree) = tree {
// Create new tree and initialize with all workers // Now we work with the tree without holding the HashMap lock
let new_tree = Tree::new();
// Initialize with all healthy workers like OLD version does
for &idx in &healthy_indices {
new_tree.insert("", workers[idx].url());
}
trees.entry(model_id.to_string()).or_insert(new_tree)
};
let (matched_text, matched_worker) = tree.prefix_match(text); let (matched_text, matched_worker) = tree.prefix_match(text);
let match_rate = if text.is_empty() { let match_rate = if text.is_empty() {
0.0 0.0
...@@ -324,7 +316,7 @@ impl LoadBalancingPolicy for CacheAwarePolicy { ...@@ -324,7 +316,7 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
// Find the index of the selected worker // Find the index of the selected worker
if let Some(selected_idx) = workers.iter().position(|w| w.url() == selected_url) { if let Some(selected_idx) = workers.iter().position(|w| w.url() == selected_url) {
// Only proceed if the worker is healthy - use direct check like OLD version // Only proceed if the worker is healthy
if workers[selected_idx].is_healthy() { if workers[selected_idx].is_healthy() {
// Update the tree with this request // Update the tree with this request
tree.insert(text, &selected_url); tree.insert(text, &selected_url);
...@@ -342,11 +334,18 @@ impl LoadBalancingPolicy for CacheAwarePolicy { ...@@ -342,11 +334,18 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
} }
// Fallback to first healthy worker // Fallback to first healthy worker
return healthy_indices.first().copied();
}
// Fallback to first healthy worker if tree operations fail
healthy_indices.first().copied() healthy_indices.first().copied()
} else {
// No tree for this model, log warning and use random selection
debug!(
"Warning: No tree found for model '{}', using random worker selection",
model_id
);
// Return a random healthy worker
let mut rng = rand::rng();
let random_idx = rng.random_range(0..healthy_indices.len());
Some(healthy_indices[random_idx])
}
} }
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment