// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; use k8s_openapi::api::core::v1::Pod; use k8s_openapi::api::discovery::v1::EndpointSlice; use std::collections::hash_map::DefaultHasher; use std::fs; use std::hash::{Hash, Hasher}; use std::path::Path; use crate::config::environment_names::discovery; const INSTANCE_ID_MASK: u64 = 0x001F_FFFF_FFFF_FFFFu64; const MAIN_CONTAINER_NAME: &str = "main"; /// Kube discovery mode. /// /// - `Pod`: default. One identity per pod. /// - `Container`: each container independently registers with the discovery plane. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) enum KubeDiscoveryMode { Pod, Container, } impl KubeDiscoveryMode { pub fn from_env() -> Result { match std::env::var(discovery::DYN_KUBE_DISCOVERY_MODE).as_deref() { Ok("container") => Ok(Self::Container), Ok("pod") | Err(_) => Ok(Self::Pod), Ok(other) => anyhow::bail!( "Invalid DYN_KUBE_DISCOVERY_MODE value '{}'. Valid values: 'pod', 'container'", other ), } } } /// A resolved discovery target identifying either a pod or a specific container within a pod. #[derive(Debug, Clone, PartialEq, Eq)] pub(super) enum KubeDiscoveryTarget { Pod(String), Container(String, String), } impl KubeDiscoveryTarget { /// CR name for this target, used as the DynamoWorkerMetadata resource name. pub fn cr_name(&self) -> String { match self { Self::Pod(pod_name) => pod_name.clone(), Self::Container(pod_name, container_name) if container_name == MAIN_CONTAINER_NAME => { pod_name.clone() } Self::Container(pod_name, container_name) => { format!("{}-{}", pod_name, container_name) } } } /// Deterministic instance ID derived from cr_name. pub fn instance_id(&self) -> u64 { let mut hasher = DefaultHasher::new(); self.cr_name().hash(&mut hasher); hasher.finish() & INSTANCE_ID_MASK } pub fn pod_name(&self) -> &str { match self { Self::Pod(pod_name) | Self::Container(pod_name, _) => pod_name, } } } /// Hash a pod name to get a consistent instance ID (pod-level). /// /// Used by C bindings (EPP) for pod-level worker ID mapping. pub fn hash_pod_name(pod_name: &str) -> u64 { let mut hasher = DefaultHasher::new(); pod_name.hash(&mut hasher); hasher.finish() & INSTANCE_ID_MASK } /// Extract (instance_id, pod_name) tuples from an EndpointSlice for ready endpoints. pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String)> { let mut result = Vec::new(); for endpoint in &slice.endpoints { let is_ready = endpoint .conditions .as_ref() .and_then(|c| c.ready) .unwrap_or(false); if !is_ready { continue; } let pod_name = match endpoint.target_ref.as_ref() { Some(target_ref) => target_ref.name.as_deref().unwrap_or(""), None => continue, }; if pod_name.is_empty() { continue; } let target = KubeDiscoveryTarget::Pod(pod_name.to_string()); result.push((target.instance_id(), target.cr_name())); } result } /// Extract (instance_id, cr_name) tuples from a Pod for each ready container. pub(super) fn extract_ready_containers(pod: &Pod) -> Vec<(u64, String)> { let pod_name = match pod.metadata.name.as_deref() { Some(name) => name, None => return vec![], }; let container_statuses = match pod .status .as_ref() .and_then(|s| s.container_statuses.as_ref()) { Some(statuses) => statuses, None => return vec![], }; container_statuses .iter() .filter(|cs| cs.ready) .map(|cs| { let target = KubeDiscoveryTarget::Container(pod_name.to_string(), cs.name.clone()); (target.instance_id(), target.cr_name()) }) .collect() } /// Pod information extracted from environment. #[derive(Debug, Clone)] pub(super) struct PodInfo { pub pod_name: String, pub pod_namespace: String, pub pod_uid: String, pub system_port: u16, /// Kube discovery mode for this process, read from DYN_KUBE_DISCOVERY_MODE. pub mode: KubeDiscoveryMode, /// Discovery target for this process, derived from mode + pod/container identity. pub target: KubeDiscoveryTarget, } const DEFAULT_PODINFO_PATH: &str = "/etc/podinfo"; impl PodInfo { fn read_from_file_or_env(file_path: &Path, env_var: &str) -> Option { if let Ok(content) = fs::read_to_string(file_path) { let value = content.trim().to_string(); if !value.is_empty() { return Some(value); } } std::env::var(env_var).ok() } pub fn from_env() -> Result { let podinfo_path = Path::new(DEFAULT_PODINFO_PATH); let pod_name = Self::read_from_file_or_env(&podinfo_path.join("pod_name"), "POD_NAME") .ok_or_else(|| anyhow::anyhow!("POD_NAME not available from file or environment"))?; let pod_uid = Self::read_from_file_or_env(&podinfo_path.join("pod_uid"), "POD_UID") .ok_or_else(|| anyhow::anyhow!("POD_UID not available from file or environment"))?; let pod_namespace = Self::read_from_file_or_env(&podinfo_path.join("pod_namespace"), "POD_NAMESPACE") .unwrap_or_else(|| { tracing::warn!("POD_NAMESPACE not set, defaulting to 'default'"); "default".to_string() }); let mode = KubeDiscoveryMode::from_env()?; let target = match mode { KubeDiscoveryMode::Pod => KubeDiscoveryTarget::Pod(pod_name.clone()), KubeDiscoveryMode::Container => { let container_name = std::env::var("CONTAINER_NAME").map_err(|_| { anyhow::anyhow!( "CONTAINER_NAME is required when DYN_KUBE_DISCOVERY_MODE=container" ) })?; KubeDiscoveryTarget::Container(pod_name.clone(), container_name) } }; if podinfo_path.join("pod_name").exists() { tracing::info!( "Pod identity loaded from Downward API volume mount at {}", DEFAULT_PODINFO_PATH ); } else { tracing::info!("Pod identity loaded from environment variables"); } let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); let system_port = config.system_port as u16; Ok(Self { pod_name, pod_namespace, pod_uid, system_port, mode, target, }) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_pod_mode_backward_compat() { // Pod mode must produce the same instance_id as hash_pod_name // so existing deployments see no identity change on upgrade. let target = KubeDiscoveryTarget::Pod("worker-0".into()); assert_eq!(target.instance_id(), hash_pod_name("worker-0")); assert_eq!(target.cr_name(), "worker-0"); } #[test] fn test_container_mode_main_uses_pod_identity() { // A container named "main" uses pod-level identity so that // container-mode frontends can discover pod-mode workers. let target = KubeDiscoveryTarget::Container("worker-0".into(), "main".into()); assert_eq!(target.instance_id(), hash_pod_name("worker-0")); assert_eq!(target.cr_name(), "worker-0"); } #[test] fn test_container_mode_engine_gets_unique_identity() { // Non-main containers get per-container identity so that // failover engine containers are independently discoverable. let e0 = KubeDiscoveryTarget::Container("worker-0".into(), "engine-0".into()); let e1 = KubeDiscoveryTarget::Container("worker-0".into(), "engine-1".into()); assert_eq!(e0.cr_name(), "worker-0-engine-0"); assert_eq!(e1.cr_name(), "worker-0-engine-1"); assert_ne!(e0.instance_id(), e1.instance_id()); assert_ne!(e0.instance_id(), hash_pod_name("worker-0")); } }