use crate::core::WorkerManager;
use crate::protocols::worker_spec::WorkerConfigRequest;
use crate::server::AppContext;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::{
    api::Api,
    runtime::watcher::{watcher, Config},
    runtime::WatchStreamExt,
    Client,
};
use rustls;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::task;
use tokio::time;
use tracing::{debug, error, info, warn};

/// Configuration for Kubernetes-based service discovery.
#[derive(Debug, Clone)]
pub struct ServiceDiscoveryConfig {
    pub enabled: bool,
    pub selector: HashMap<String, String>,
    pub check_interval: Duration,
    pub port: u16,
    pub namespace: Option<String>,
    // PD mode specific configuration
    pub pd_mode: bool,
    pub prefill_selector: HashMap<String, String>,
    pub decode_selector: HashMap<String, String>,
    // Bootstrap port annotation specific to mooncake implementation
    pub bootstrap_port_annotation: String,
}

impl Default for ServiceDiscoveryConfig {
    fn default() -> Self {
        ServiceDiscoveryConfig {
            enabled: false,
            selector: HashMap::new(),
            check_interval: Duration::from_secs(60),
            port: 8000,
            namespace: None,
            pd_mode: false,
            prefill_selector: HashMap::new(),
            decode_selector: HashMap::new(),
            bootstrap_port_annotation: "sglang.ai/bootstrap-port".to_string(),
        }
    }
}

/// Role of a discovered pod when prefill/decode (PD) mode is enabled.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PodType {
    Prefill,
    Decode,
    Regular,
}

/// Snapshot of the pod fields the discovery loop tracks.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PodInfo {
    pub name: String,
    pub ip: String,
    pub status: String,
    pub is_ready: bool,
    pub pod_type: Option<PodType>,
    pub bootstrap_port: Option<u16>,
}

impl PodInfo {
    fn matches_selector(pod: &Pod, selector: &HashMap<String, String>) -> bool {
        if selector.is_empty() {
            return false;
        }
        pod.metadata
            .labels
            .as_ref()
            .is_some_and(|labels| selector.iter().all(|(k, v)| labels.get(k) == Some(v)))
    }

    pub fn should_include(pod: &Pod, config: &ServiceDiscoveryConfig) -> bool {
        if config.pd_mode {
            if config.prefill_selector.is_empty() && config.decode_selector.is_empty() {
                warn!("PD mode enabled but both prefill_selector and decode_selector are empty");
                return false;
            }
            Self::matches_selector(pod, &config.prefill_selector)
                || Self::matches_selector(pod, &config.decode_selector)
        } else {
            if config.selector.is_empty() {
                warn!("Regular mode enabled but selector is empty");
                return false;
            }
            Self::matches_selector(pod, &config.selector)
        }
    }

    pub fn from_pod(pod: &Pod, config: Option<&ServiceDiscoveryConfig>) -> Option<Self> {
        let name = pod.metadata.name.clone()?;
        let status = pod.status.clone()?;
        let pod_ip = status.pod_ip?;

        let is_ready = if let Some(conditions) = &status.conditions {
            conditions
                .iter()
                .any(|condition| condition.type_ == "Ready" && condition.status == "True")
        } else {
            false
        };

        let pod_status = status.phase.unwrap_or_else(|| "Unknown".to_string());

        let pod_type = if let Some(config) = config {
            if config.pd_mode {
                if Self::matches_selector(pod, &config.prefill_selector) {
                    Some(PodType::Prefill)
                } else if Self::matches_selector(pod, &config.decode_selector) {
                    Some(PodType::Decode)
                } else {
                    Some(PodType::Regular)
                }
            } else {
                Some(PodType::Regular)
            }
        } else {
            None
        };

        let bootstrap_port = if matches!(pod_type, Some(PodType::Prefill)) {
            if let Some(config) = config {
                pod.metadata
                    .annotations
                    .as_ref()
                    .and_then(|annotations| annotations.get(&config.bootstrap_port_annotation))
                    .and_then(|port_str| port_str.parse::<u16>().ok())
            } else {
                None
            }
        } else {
            None
        };

        Some(PodInfo {
            name,
            ip: pod_ip,
            status: pod_status,
            is_ready,
            pod_type,
            bootstrap_port,
        })
    }

    pub fn is_healthy(&self) -> bool {
        self.is_ready && self.status == "Running"
    }

    pub fn worker_url(&self, port: u16) -> String {
format!("http://{}:{}", self.ip, port) } } pub async fn start_service_discovery( config: ServiceDiscoveryConfig, app_context: Arc, ) -> Result, kube::Error> { if !config.enabled { return Err(kube::Error::Api(kube::error::ErrorResponse { status: "Disabled".to_string(), message: "Service discovery is disabled".to_string(), reason: "ConfigurationError".to_string(), code: 400, })); } let _ = rustls::crypto::ring::default_provider().install_default(); let client = Client::try_default().await?; // Log the appropriate selectors based on mode if config.pd_mode { let prefill_selector = config .prefill_selector .iter() .map(|(k, v)| format!("{}={}", k, v)) .collect::>() .join(","); let decode_selector = config .decode_selector .iter() .map(|(k, v)| format!("{}={}", k, v)) .collect::>() .join(","); info!( "Starting K8s service discovery | PD mode | prefill: '{}' | decode: '{}'", prefill_selector, decode_selector ); } else { let label_selector = config .selector .iter() .map(|(k, v)| format!("{}={}", k, v)) .collect::>() .join(","); info!( "Starting K8s service discovery | selector: '{}'", label_selector ); } let handle = task::spawn(async move { let tracked_pods = Arc::new(Mutex::new(HashSet::new())); let pods: Api = if let Some(namespace) = &config.namespace { Api::namespaced(client, namespace) } else { Api::all(client) }; debug!("K8s service discovery initialized"); let config_arc = Arc::new(config.clone()); let port = config.port; let mut retry_delay = Duration::from_secs(1); const MAX_RETRY_DELAY: Duration = Duration::from_secs(300); loop { let watcher_config = Config::default(); let watcher_stream = watcher(pods.clone(), watcher_config).applied_objects(); let config_clone = Arc::clone(&config_arc); let tracked_pods_clone = Arc::clone(&tracked_pods); let filtered_stream = watcher_stream.filter_map(move |obj_res| { let config_inner = Arc::clone(&config_clone); async move { match obj_res { Ok(pod) => { if PodInfo::should_include(&pod, &config_inner) { Some(Ok(pod)) } else { None } } Err(e) => Some(Err(e)), } } }); let tracked_pods_clone2 = Arc::clone(&tracked_pods_clone); let app_context_clone = Arc::clone(&app_context); let config_clone2 = Arc::clone(&config_arc); match filtered_stream .try_for_each(move |pod| { let tracked_pods_inner = Arc::clone(&tracked_pods_clone2); let app_context_inner = Arc::clone(&app_context_clone); let config_inner = Arc::clone(&config_clone2); async move { let pod_info = PodInfo::from_pod(&pod, Some(&config_inner)); if let Some(pod_info) = pod_info { if pod.metadata.deletion_timestamp.is_some() { handle_pod_deletion( &pod_info, tracked_pods_inner, app_context_inner, port, ) .await; } else { handle_pod_event( &pod_info, tracked_pods_inner, app_context_inner, port, config_inner.pd_mode, ) .await; } } Ok(()) } }) .await { Ok(_) => { retry_delay = Duration::from_secs(1); } Err(err) => { error!("Error in Kubernetes watcher: {}", err); warn!( "Retrying in {} seconds with exponential backoff", retry_delay.as_secs() ); time::sleep(retry_delay).await; retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); } } warn!( "Kubernetes watcher exited, restarting in {} seconds", config_arc.check_interval.as_secs() ); time::sleep(config_arc.check_interval).await; } }); Ok(handle) } async fn handle_pod_event( pod_info: &PodInfo, tracked_pods: Arc>>, app_context: Arc, port: u16, pd_mode: bool, ) { let worker_url = pod_info.worker_url(port); if pod_info.is_healthy() { let should_add = { let mut tracker = match tracked_pods.lock() { Ok(tracker) => tracker, Err(e) => { error!("Failed 
                    return;
                }
            };
            if tracker.contains(pod_info) {
                false
            } else {
                tracker.insert(pod_info.clone());
                true
            }
        };

        if should_add {
            info!(
                "Adding pod: {} | type: {:?} | url: {}",
                pod_info.name, pod_info.pod_type, worker_url
            );

            let worker_type = if pd_mode {
                match &pod_info.pod_type {
                    Some(PodType::Prefill) => Some("prefill".to_string()),
                    Some(PodType::Decode) => Some("decode".to_string()),
                    Some(PodType::Regular) | None => None,
                }
            } else {
                None
            };

            let bootstrap_port = if pd_mode {
                match &pod_info.pod_type {
                    Some(PodType::Prefill) => pod_info.bootstrap_port,
                    _ => None,
                }
            } else {
                None
            };

            let config = WorkerConfigRequest {
                url: worker_url.clone(),
                model_id: None,
                worker_type,
                priority: None,
                cost: None,
                labels: HashMap::new(),
                bootstrap_port,
                tokenizer_path: None,
                reasoning_parser: None,
                tool_parser: None,
                chat_template: None,
                api_key: None,
            };

            let result = WorkerManager::add_worker_from_config(&config, &app_context).await;

            match result {
                Ok(_) => {
                    debug!("Worker added: {}", worker_url);
                }
                Err(e) => {
                    error!("Failed to add worker {} to router: {}", worker_url, e);
                    // Stop tracking the pod so a later event can retry the add.
                    if let Ok(mut tracker) = tracked_pods.lock() {
                        tracker.remove(pod_info);
                    }
                }
            }
        }
    }
}

/// Removes the worker for a pod that is being deleted, if it was tracked.
async fn handle_pod_deletion(
    pod_info: &PodInfo,
    tracked_pods: Arc<Mutex<HashSet<PodInfo>>>,
    app_context: Arc<AppContext>,
    port: u16,
) {
    let worker_url = pod_info.worker_url(port);

    let was_tracked = {
        let mut tracked = match tracked_pods.lock() {
            Ok(tracked) => tracked,
            Err(e) => {
                error!("Failed to acquire tracked_pods lock during deletion: {}", e);
                return;
            }
        };
        tracked.remove(pod_info)
    };

    if was_tracked {
        info!(
            "Removing pod: {} | type: {:?} | url: {}",
            pod_info.name, pod_info.pod_type, worker_url
        );
        if let Err(e) = WorkerManager::remove_worker(&worker_url, &app_context) {
            error!("Failed to remove worker {}: {}", worker_url, e);
        }
    } else {
        debug!(
            "Pod deletion event for untracked/already removed pod: {} (type: {:?}). Worker URL: {}",
            pod_info.name, pod_info.pod_type, worker_url
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use k8s_openapi::api::core::v1::{Pod, PodCondition, PodSpec, PodStatus};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;

    fn create_k8s_pod(
        name: Option<&str>,
        ip: Option<&str>,
        phase: Option<&str>,
        ready_status: Option<&str>,
        deletion_timestamp: Option<Time>,