// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use std::sync::Arc; use std::sync::atomic::Ordering; use anyhow::Result; use tokio::sync::oneshot; use dynamo_kv_router::{PrefillLoadEstimator, config::KvRouterConfig}; use dynamo_runtime::{ component::{Client, Endpoint}, pipeline::{PushRouter, RouterMode}, protocols::annotated::Annotated, }; use super::{InnerPrefillRouter, PrefillRouter}; use crate::{ discovery::ModelManager, kv_router::KvPushRouter, protocols::common::{ llm_backend::{LLMEngineOutput, PreprocessedRequest}, timing::WORKER_TYPE_PREFILL, }, }; impl PrefillRouter { /// Create a disabled prefill router that will never activate (passthrough only) pub fn disabled( model_manager: Arc, router_mode: RouterMode, enforce_disagg: bool, ) -> Arc { Arc::new(Self { prefill_router: std::sync::OnceLock::new(), model_manager, endpoint_id: std::sync::OnceLock::new(), cancel_token: tokio_util::sync::CancellationToken::new(), router_mode, enforce_disagg, prefill_load_estimator: None, model_name: String::new(), // Not used for disabled router namespace: String::new(), // Not used for disabled router is_eagle: false, deactivated: std::sync::atomic::AtomicBool::new(false), activated: std::sync::atomic::AtomicBool::new(false), }) } #[expect(clippy::too_many_arguments)] pub fn new( activation_rx: oneshot::Receiver, model_manager: Arc, router_mode: RouterMode, kv_cache_block_size: u32, kv_router_config: Option, prefill_load_estimator: Option>, enforce_disagg: bool, model_name: String, namespace: String, is_eagle: bool, ) -> Arc { let prefill_router = std::sync::OnceLock::new(); let cancel_token = tokio_util::sync::CancellationToken::new(); let router = Arc::new(Self { prefill_router, model_manager: model_manager.clone(), endpoint_id: std::sync::OnceLock::new(), cancel_token: cancel_token.clone(), router_mode, enforce_disagg, prefill_load_estimator, model_name, namespace, is_eagle, deactivated: std::sync::atomic::AtomicBool::new(false), activated: std::sync::atomic::AtomicBool::new(false), }); // Spawn background task to wait for activation let router_clone = router.clone(); tokio::spawn(async move { tokio::select! { result = activation_rx => { let Ok(endpoint) = result else { tracing::debug!("Prefill router activation channel closed without receiving endpoint"); return; }; if let Err(e) = router_clone.activate( endpoint, model_manager, kv_cache_block_size, kv_router_config, router_clone.prefill_load_estimator.clone(), ).await { tracing::error!(error = %e, "Failed to activate prefill router"); } } _ = cancel_token.cancelled() => { tracing::debug!("Prefill router activation cancelled"); } } }); router } /// Activate the prefill router with the provided endpoint async fn activate( &self, endpoint: Endpoint, model_manager: Arc, kv_cache_block_size: u32, kv_router_config: Option, prefill_load_estimator: Option>, ) -> Result<()> { tracing::info!( router_mode = ?self.router_mode, "Activating prefill router" ); // Store endpoint_id for later use in resolve_prefill_worker let _ = self.endpoint_id.set(endpoint.id()); // Start runtime config watcher for this endpoint (needed for get_disaggregated_endpoint) // This must be done before creating the router so bootstrap info is available model_manager .get_or_create_runtime_config_watcher(&endpoint) .await?; let inner_router = if self.router_mode.is_kv_routing() { // Create KV chooser using the endpoint (this is a prefill router) let kv_chooser = model_manager .kv_chooser_for( &endpoint, kv_cache_block_size, kv_router_config, prefill_load_estimator, WORKER_TYPE_PREFILL, Some(self.model_name.clone()), self.is_eagle, ) .await?; // Extract client from kv_chooser to ensure shared state let client = kv_chooser.client().clone(); self.register_prefill_client(model_manager.as_ref(), &client); // Build the PushRouter for prefill with KV mode using the shared client let push_router = PushRouter::>::from_client_with_monitor( client, RouterMode::KV, None, // worker_monitor ) .await?; // Wrap it in KvPushRouter InnerPrefillRouter::KvRouter(Arc::new(KvPushRouter::new(push_router, kv_chooser))) } else { // Create client for simple router let client = endpoint.client().await?; self.register_prefill_client(model_manager.as_ref(), &client); // Create simple push router with the frontend's router mode // Note: Per-worker metrics (active_prefill_tokens, active_decode_blocks) are only // available in KV routing mode where the router has actual bookkeeping. let push_router = PushRouter::>::from_client_with_monitor( client, self.router_mode, None, // worker_monitor ) .await?; InnerPrefillRouter::SimpleRouter(Arc::new(push_router)) }; // Set the router (ignore error if already set) let _ = self.prefill_router.set(inner_router); self.activated.store(true, Ordering::Release); tracing::info!( router_mode = ?self.router_mode, "Prefill router activated successfully" ); Ok(()) } fn register_prefill_client(&self, model_manager: &ModelManager, client: &Client) { if let Some(monitor) = model_manager.get_worker_monitor_for_namespace(&self.model_name, &self.namespace) { monitor.set_prefill_client(client.clone()); } } // -- Prefill death handling -- /// Deactivate the prefill router. Called when all prefill workers are removed. /// After deactivation, requests fall back to aggregated mode (or fail if enforce_disagg). /// The inner router is preserved so that when workers rejoin (same endpoint/discovery), /// the Client's discovery subscription picks them up automatically. pub fn deactivate(&self) { self.deactivated.store(true, Ordering::Release); tracing::info!( model_name = %self.model_name, namespace = %self.namespace, enforce_disagg = self.enforce_disagg, "Prefill router deactivated (all prefill workers removed)" ); } /// Reactivate a deactivated router. Called when prefill workers rejoin. /// The inner router's Client re-discovers workers via its discovery subscription. /// /// Note: there is a brief race between flipping `deactivated=false` (making /// `can_serve_requests()` return true) and the Client actually rediscovering /// workers. Requests arriving in this window may fail at prefill resolution. /// This is bounded by discovery propagation time (typically sub-second). /// /// Also note: reactivation reuses the existing inner router built from the /// original endpoint. If prefill rejoins under a different endpoint identity /// (e.g., reconfigured deployment), the stale Client would not discover the /// new workers. This is acceptable for normal restart scenarios where the /// endpoint identity is stable. pub fn reactivate(&self) { self.deactivated.store(false, Ordering::Release); tracing::info!( model_name = %self.model_name, namespace = %self.namespace, "Prefill router reactivated (prefill workers rejoined)" ); } /// Whether this router is currently deactivated (prefill workers died). pub fn is_deactivated(&self) -> bool { self.deactivated.load(Ordering::Acquire) } /// Whether this router can serve requests in its current state. /// - !enforce_disagg (aggregated passthrough): always servable unless deactivated /// - enforce_disagg: only servable when prefill has activated AND is not deactivated, /// so a cold-started strict-disagg model isn't listed before prefill rendezvoused. pub fn can_serve_requests(&self) -> bool { if self.is_deactivated() { return !self.enforce_disagg; } if !self.enforce_disagg { return true; } self.activated.load(Ordering::Acquire) } /// Mark this router as activated for testing purposes. /// In production, `activate()` sets this flag when the inner router is populated. #[cfg(test)] pub(crate) fn mark_activated_for_test(&self) { self.activated.store(true, Ordering::Release); } }