// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 //! A Model represents a named model (e.g., "llama-3-70b") that may be served by //! one or more WorkerSets. Each WorkerSet corresponds to a namespace. //! //! Requests are routed to a WorkerSet selected by weighted random (proportional to worker count). use std::sync::Arc; use dashmap::DashMap; use rand::Rng; use super::worker_monitor::LoadThresholdConfig; use super::worker_set::WorkerSet; use super::{KvWorkerMonitor, ModelManagerError}; use crate::protocols::openai::ParsingOptions; use crate::types::{ generic::tensor::TensorStreamingEngine, openai::{ audios::OpenAIAudiosStreamingEngine, chat_completions::OpenAIChatCompletionsStreamingEngine, completions::OpenAICompletionsStreamingEngine, embeddings::OpenAIEmbeddingsStreamingEngine, images::OpenAIImagesStreamingEngine, videos::OpenAIVideosStreamingEngine, }, }; /// A named model backed by one or more WorkerSets. pub struct Model { name: String, worker_sets: DashMap>, } impl Model { pub fn new(name: String) -> Self { Self { name, worker_sets: DashMap::new(), } } pub fn name(&self) -> &str { &self.name } /// Add a WorkerSet to this model. pub fn add_worker_set(&self, namespace: String, worker_set: Arc) { tracing::info!( model = %self.name, namespace = %namespace, "Adding worker set to model" ); self.worker_sets.insert(namespace, worker_set); } /// Check whether a candidate checksum is compatible with an existing WorkerSet /// identified by `ws_key`. pub fn is_checksum_compatible(&self, ws_key: &str, candidate_checksum: &str) -> bool { match self.worker_sets.get(ws_key) { Some(existing_ws) => existing_ws.mdcsum() == candidate_checksum, None => true, } } pub fn remove_worker_set(&self, namespace: &str) -> Option> { let removed = self.worker_sets.remove(namespace).map(|(_, ws)| ws); if removed.is_some() { tracing::info!( model = %self.name, namespace = %namespace, remaining_sets = self.worker_sets.len(), "Removed worker set from model" ); } removed } pub fn has_worker_set(&self, namespace: &str) -> bool { self.worker_sets.contains_key(namespace) } pub fn get_worker_set(&self, namespace: &str) -> Option> { self.worker_sets .get(namespace) .map(|entry| entry.value().clone()) } pub fn is_empty(&self) -> bool { self.worker_sets.is_empty() } pub fn worker_set_count(&self) -> usize { self.worker_sets.len() } /// Check if this model has any decode engine (chat or completions) across any WorkerSet. pub fn has_decode_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_decode_engine()) } /// Check if this model tracks prefill (any WorkerSet is a prefill set). pub fn has_prefill(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().is_prefill_set()) } /// Check if any WorkerSet has a chat engine. pub fn has_chat_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_chat_engine()) } /// Check if any WorkerSet has a completions engine. pub fn has_completions_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_completions_engine()) } /// Check if any WorkerSet has an embeddings engine. pub fn has_embeddings_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_embeddings_engine()) } /// Check if any WorkerSet has a tensor engine. pub fn has_tensor_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_tensor_engine()) } /// Check if any WorkerSet has an images engine. pub fn has_images_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_images_engine()) } /// Check if any WorkerSet has a videos engine. pub fn has_videos_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_videos_engine()) } /// Check if any WorkerSet has an audios engine. pub fn has_audios_engine(&self) -> bool { self.worker_sets .iter() .any(|entry| entry.value().has_audios_engine()) } /// Whether this model should be visible in /v1/models. pub fn is_displayable(&self) -> bool { let has_serving_engine = |ws: &WorkerSet| { ws.has_chat_engine() || ws.has_completions_engine() || ws.has_embeddings_engine() || ws.has_images_engine() || ws.has_tensor_engine() || ws.has_videos_engine() || ws.has_audios_engine() }; let has_any_serving_engine = self.worker_sets.iter().any(|entry| { let ws = entry.value(); has_serving_engine(ws.as_ref()) }); self.worker_sets.iter().any(|entry| { let ws = entry.value(); if ws.worker_count() == 0 || !ws.can_serve_requests() { return false; } has_serving_engine(ws.as_ref()) || (!has_any_serving_engine && ws.is_prefill_set()) }) } // -- Engine accessors: select a WorkerSet, return its engine -- pub fn get_chat_engine( &self, ) -> Result { self.select_worker_set_with(|ws| ws.chat_engine.clone()) .ok_or_else(|| self.engine_error(self.has_chat_engine())) } pub fn get_completions_engine( &self, ) -> Result { self.select_worker_set_with(|ws| ws.completions_engine.clone()) .ok_or_else(|| self.engine_error(self.has_completions_engine())) } pub fn get_embeddings_engine( &self, ) -> Result { self.select_worker_set_with(|ws| ws.embeddings_engine.clone()) .ok_or_else(|| self.engine_error(self.has_embeddings_engine())) } pub fn get_images_engine(&self) -> Result { self.select_worker_set_with(|ws| ws.images_engine.clone()) .ok_or_else(|| self.engine_error(self.has_images_engine())) } pub fn get_videos_engine(&self) -> Result { self.select_worker_set_with(|ws| ws.videos_engine.clone()) .ok_or_else(|| self.engine_error(self.has_videos_engine())) } pub fn get_audios_engine(&self) -> Result { self.select_worker_set_with(|ws| ws.audios_engine.clone()) .ok_or_else(|| self.engine_error(self.has_audios_engine())) } pub fn get_tensor_engine(&self) -> Result { self.select_worker_set_with(|ws| ws.tensor_engine.clone()) .ok_or_else(|| self.engine_error(self.has_tensor_engine())) } // -- Combined engine + parsing options (atomically from one WorkerSet) -- pub fn get_chat_engine_with_parsing( &self, ) -> Result<(OpenAIChatCompletionsStreamingEngine, ParsingOptions), ModelManagerError> { self.select_worker_set_with(|ws| ws.chat_engine.clone().map(|e| (e, ws.parsing_options()))) .ok_or_else(|| self.engine_error(self.has_chat_engine())) } pub fn get_completions_engine_with_parsing( &self, ) -> Result<(OpenAICompletionsStreamingEngine, ParsingOptions), ModelManagerError> { self.select_worker_set_with(|ws| { ws.completions_engine .clone() .map(|e| (e, ws.parsing_options())) }) .ok_or_else(|| self.engine_error(self.has_completions_engine())) } // -- Worker monitoring (aggregated across WorkerSets) -- /// Get load threshold config from the first WorkerSet that has a monitor. /// When `config` is Some, updates ALL monitors (each WorkerSet has its own). pub fn load_threshold_config( &self, config: Option<&LoadThresholdConfig>, ) -> Option { let mut result = None; for entry in self.worker_sets.iter() { if let Some(ref monitor) = entry.value().worker_monitor { if let Some(cfg) = config { monitor.set_load_threshold_config(cfg); } if result.is_none() { result = Some(monitor.load_threshold_config()); } } } result } /// Get the worker monitor for a specific namespace's WorkerSet. pub fn get_worker_monitor_for_namespace(&self, namespace: &str) -> Option { self.worker_sets .get(namespace) .and_then(|entry| entry.value().worker_monitor.clone()) } /// Total worker count across all WorkerSets. pub fn total_workers(&self) -> usize { self.worker_sets .iter() .map(|entry| entry.value().worker_count()) .sum() } // -- Internal helpers -- /// Return the appropriate error when no servable WorkerSet was found. /// If the engine exists but no WorkerSet can serve (zero workers, prefill not activated, /// etc.), return ModelUnavailable (maps to 503). Otherwise ModelNotFound (maps to 404). fn engine_error(&self, engine_exists: bool) -> ModelManagerError { if engine_exists { ModelManagerError::ModelUnavailable(self.name.clone()) } else { ModelManagerError::ModelNotFound(self.name.clone()) } } // -- Internal selection -- /// Select a WorkerSet and extract a value from it. /// /// When there's only one set (steady state), returns from that set directly. /// With multiple sets, uses weighted random selection proportional /// to worker count, filtering to sets that have the requested engine. /// /// The `extract` closure should return `Some(value)` if the WorkerSet has the /// desired engine, or `None` if it doesn't. fn select_worker_set_with(&self, extract: F) -> Option where F: Fn(&WorkerSet) -> Option, { // Fast path: single set (same zero-worker filtering as the multi-set path below) if self.worker_sets.len() == 1 { return self.worker_sets.iter().next().and_then(|entry| { let ws = entry.value(); if ws.worker_count() == 0 || !ws.can_serve_requests() { return None; } extract(ws) }); } // Collect eligible sets with their worker counts, skipping sets with no workers // or sets whose prefill router has died under enforce_disagg. // In-process models (no discovery watcher) return count=1, so they always participate. // Discovery models with count=0 have no available workers and are skipped. let eligible: Vec<(T, usize)> = self .worker_sets .iter() .filter_map(|entry| { let ws = entry.value(); let count = ws.worker_count(); if count == 0 || !ws.can_serve_requests() { return None; } extract(ws).map(|val| (val, count)) }) .collect(); if eligible.is_empty() { return None; } if eligible.len() == 1 { return eligible.into_iter().next().map(|(val, _)| val); } // Weighted random selection proportional to worker count let total_weight: usize = eligible.iter().map(|(_, w)| w).sum(); let mut pick = rand::rng().random_range(0..total_weight); for (val, weight) in eligible { if pick < weight { return Some(val); } pick -= weight; } // Should not reach here, but fallback to None None } } #[cfg(test)] mod tests { use super::*; use crate::model_card::ModelDeploymentCard; use tokio::sync::watch; fn make_worker_set(namespace: &str, mdcsum: &str) -> Arc { Arc::new(WorkerSet::new( namespace.to_string(), mdcsum.to_string(), ModelDeploymentCard::default(), )) } /// Create a WorkerSet backed by a watch channel so worker_count reflects the vec length. fn make_worker_set_with_count( namespace: &str, mdcsum: &str, worker_ids: Vec, ) -> (Arc, watch::Sender>) { let (tx, rx) = watch::channel(worker_ids); let mut ws = WorkerSet::new( namespace.to_string(), mdcsum.to_string(), ModelDeploymentCard::default(), ); ws.set_instance_watcher(rx); (Arc::new(ws), tx) } #[test] fn test_model_new() { let model = Model::new("llama".to_string()); assert_eq!(model.name(), "llama"); assert!(model.is_empty()); assert_eq!(model.worker_set_count(), 0); } #[test] fn test_add_remove_worker_set() { let model = Model::new("llama".to_string()); let ws = make_worker_set("ns1", "abc"); model.add_worker_set("ns1".to_string(), ws); assert!(!model.is_empty()); assert_eq!(model.worker_set_count(), 1); assert!(model.has_worker_set("ns1")); assert!(!model.has_worker_set("ns2")); let removed = model.remove_worker_set("ns1"); assert!(removed.is_some()); assert!(model.is_empty()); let removed_again = model.remove_worker_set("ns1"); assert!(removed_again.is_none()); } #[test] fn test_get_worker_set() { let model = Model::new("llama".to_string()); let ws = make_worker_set("ns1", "abc"); model.add_worker_set("ns1".to_string(), ws); let retrieved = model.get_worker_set("ns1"); assert!(retrieved.is_some()); assert_eq!(retrieved.unwrap().namespace(), "ns1"); assert!(model.get_worker_set("ns2").is_none()); } #[test] fn test_multiple_worker_sets_same_checksum() { let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); model.add_worker_set("ns2".to_string(), make_worker_set("ns2", "abc")); assert_eq!(model.worker_set_count(), 2); assert!(model.has_worker_set("ns1")); assert!(model.has_worker_set("ns2")); model.remove_worker_set("ns1"); assert_eq!(model.worker_set_count(), 1); assert!(!model.has_worker_set("ns1")); assert!(model.has_worker_set("ns2")); } #[test] fn test_multiple_worker_sets_different_checksums() { // Different namespaces are allowed to have different checksums let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); model.add_worker_set("ns2".to_string(), make_worker_set("ns2", "def")); assert_eq!(model.worker_set_count(), 2); assert!(model.has_worker_set("ns1")); assert!(model.has_worker_set("ns2")); } #[test] fn test_is_checksum_compatible_no_existing_worker_set() { let model = Model::new("llama".to_string()); // No WorkerSet exists yet — any checksum is compatible assert!(model.is_checksum_compatible("ns1", "abc")); assert!(model.is_checksum_compatible("ns1", "xyz")); } #[test] fn test_is_checksum_compatible_matching_checksum() { let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); // Same ws_key, same checksum → compatible assert!(model.is_checksum_compatible("ns1", "abc")); } #[test] fn test_is_checksum_compatible_mismatched_checksum() { let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); // Same ws_key, different checksum → incompatible assert!(!model.is_checksum_compatible("ns1", "def")); } #[test] fn test_is_checksum_compatible_different_ws_key() { let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); // Different ws_key — no existing WorkerSet for "ns2", so any checksum is fine assert!(model.is_checksum_compatible("ns2", "def")); assert!(model.is_checksum_compatible("ns2", "abc")); } #[test] fn test_no_engines_means_prefill() { let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); // WorkerSets with no engines are treated as prefill sets assert!(model.has_prefill()); assert!(!model.has_decode_engine()); assert!(!model.has_chat_engine()); assert!(!model.has_completions_engine()); assert!(!model.has_embeddings_engine()); assert!(!model.has_tensor_engine()); assert!(!model.has_images_engine()); } #[test] fn test_get_engine_returns_error_without_engines() { let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); assert!(model.get_chat_engine().is_err()); assert!(model.get_completions_engine().is_err()); assert!(model.get_embeddings_engine().is_err()); assert!(model.get_images_engine().is_err()); assert!(model.get_tensor_engine().is_err()); } #[test] fn test_select_worker_set_with_extracts_namespace() { // Test that select_worker_set_with works by going through the public API. // Since we can't create real engines in tests, we verify that selection // returns None/Err when no engines are configured, which exercises the // filtering and selection code paths. let model = Model::new("llama".to_string()); // Empty model assert!(model.get_chat_engine().is_err()); // Single set (fast path) model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); assert!(model.get_chat_engine().is_err()); // No engine → filtered out // Multiple sets (weighted path) model.add_worker_set("ns2".to_string(), make_worker_set("ns2", "abc")); assert!(model.get_chat_engine().is_err()); // Still no engines → all filtered out } #[test] fn test_total_workers_no_watcher() { // In-process WorkerSets (no watcher) default to worker_count=1 let model = Model::new("llama".to_string()); assert_eq!(model.total_workers(), 0); // empty model model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); assert_eq!(model.total_workers(), 1); model.add_worker_set("ns2".to_string(), make_worker_set("ns2", "abc")); assert_eq!(model.total_workers(), 2); } #[test] fn test_total_workers_with_watcher() { let model = Model::new("llama".to_string()); let (ws1, _tx1) = make_worker_set_with_count("ns1", "abc", vec![1, 2, 3]); let (ws2, _tx2) = make_worker_set_with_count("ns2", "abc", vec![10, 20]); model.add_worker_set("ns1".to_string(), ws1); model.add_worker_set("ns2".to_string(), ws2); assert_eq!(model.total_workers(), 5); // 3 + 2 } #[test] fn test_total_workers_updates_dynamically() { let model = Model::new("llama".to_string()); let (ws1, tx1) = make_worker_set_with_count("ns1", "abc", vec![1, 2]); model.add_worker_set("ns1".to_string(), ws1); assert_eq!(model.total_workers(), 2); // Workers leave tx1.send(vec![1]).unwrap(); assert_eq!(model.total_workers(), 1); // All workers gone tx1.send(vec![]).unwrap(); assert_eq!(model.total_workers(), 0); } #[test] fn test_zero_worker_single_set_filtered() { // Single WorkerSet with 0 workers should be filtered by select_worker_set_with. // We test via select_worker_set_with's internal behavior: even though the set // exists and is_prefill_set() returns true, engine accessors should fail because // the zero-worker filter runs before the extract closure. let model = Model::new("llama".to_string()); let (ws, _tx) = make_worker_set_with_count("ns1", "abc", vec![]); model.add_worker_set("ns1".to_string(), ws); // WorkerSet exists but has 0 workers → selection filtered out → Err assert!(model.get_chat_engine().is_err()); assert!(model.get_completions_engine().is_err()); } #[test] fn test_zero_worker_multi_set_filtered() { // With multiple sets, only those with workers > 0 participate in selection. let model = Model::new("llama".to_string()); let (ws1, _tx1) = make_worker_set_with_count("ns1", "abc", vec![]); let (ws2, _tx2) = make_worker_set_with_count("ns2", "abc", vec![]); model.add_worker_set("ns1".to_string(), ws1); model.add_worker_set("ns2".to_string(), ws2); // Both have 0 workers → all filtered → Err assert!(model.get_chat_engine().is_err()); } // -- Disaggregated prefill death tests -- use crate::kv_router::PrefillRouter; /// Build a WorkerSet with a deactivated PrefillRouter simulating "was activated, now dead". /// worker_count defaults to 1 (no instance_count_rx -> in-process default). fn make_worker_set_with_dead_prefill(namespace: &str, enforce_disagg: bool) -> Arc { let mut ws = WorkerSet::new( namespace.to_string(), "abc".to_string(), crate::model_card::ModelDeploymentCard::default(), ); let pr = PrefillRouter::disabled( std::sync::Arc::new(crate::discovery::ModelManager::new()), dynamo_runtime::pipeline::RouterMode::RoundRobin, enforce_disagg, ); pr.deactivate(); ws.prefill_router = Some(pr); Arc::new(ws) } /// Baseline: a WorkerSet without a PrefillRouter is always displayable /// (worker_count=1, is_prefill_set=true, no can_serve_requests block). #[test] fn test_is_displayable_true_basic() { let model = Model::new("llama".to_string()); model.add_worker_set("ns1".to_string(), make_worker_set("ns1", "abc")); assert!( model.is_displayable(), "model with an unconstrained WorkerSet must be displayable" ); } /// When the prefill engine dies and enforce_disagg is set, the model must be /// hidden from /v1/models. #[test] fn test_is_displayable_false_when_prefill_dies_enforce_disagg() { let model = Model::new("llama".to_string()); model.add_worker_set( "ns1".to_string(), make_worker_set_with_dead_prefill("ns1", true), ); assert!( !model.is_displayable(), "model must be hidden when prefill died and enforce_disagg=true" ); } /// When enforce_disagg is false the deployment can fall back to aggregated mode, /// so the model should remain visible in /v1/models. #[test] fn test_is_displayable_true_when_prefill_dies_no_enforce() { let model = Model::new("llama".to_string()); model.add_worker_set( "ns1".to_string(), make_worker_set_with_dead_prefill("ns1", false), ); assert!( model.is_displayable(), "model must remain visible when prefill died but enforce_disagg=false (fallback)" ); } /// A single WorkerSet with a deactivated prefill router (enforce_disagg=true) must be /// skipped by select_worker_set_with(), causing engine accessors to return Err. #[test] fn test_dead_prefill_single_set_not_selectable() { let model = Model::new("llama".to_string()); model.add_worker_set( "ns1".to_string(), make_worker_set_with_dead_prefill("ns1", true), ); assert!(model.get_chat_engine().is_err()); assert!(model.get_completions_engine().is_err()); } /// With two WorkerSets -- one healthy, one with dead prefill -- the healthy set /// keeps the model displayable. Removing the healthy set hides the model. #[test] fn test_dead_prefill_multi_set_skips_dead_namespace() { let model = Model::new("llama".to_string()); // Healthy set (no prefill constraint) model.add_worker_set("healthy".to_string(), make_worker_set("healthy", "abc")); // Dead set (deactivated prefill + enforce_disagg) model.add_worker_set( "dead".to_string(), make_worker_set_with_dead_prefill("dead", true), ); assert!( model.is_displayable(), "model must be displayable when at least one healthy set exists" ); // Removing the healthy set leaves only the dead set -- model must be hidden. model.remove_worker_set("healthy"); assert!( !model.is_displayable(), "model must be hidden when only the dead prefill set remains" ); } }