"lib/bindings/vscode:/vscode.git/clone" did not exist on "93ca9df1d1e07b5dc6a5707f97707da2e0133896"
Unverified Commit 4106b90f authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

fix: support dynamic metadata updates in Kubernetes backed discovery (#4988)


Signed-off-by: default avatarmohammedabdulwahhab <furkhan324@berkeley.edu>
parent d2faf0e6
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
mod crd;
mod daemon;
mod utils;
pub use crd::{DynamoWorkerMetadata, DynamoWorkerMetadataSpec};
pub use utils::hash_pod_name;
use crd::{apply_cr, build_cr};
use daemon::DiscoveryDaemon;
use utils::PodInfo;
......@@ -27,6 +30,8 @@ pub struct KubeDiscoveryClient {
instance_id: u64,
metadata: Arc<RwLock<DiscoveryMetadata>>,
metadata_watch: tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>,
kube_client: KubeClient,
pod_info: PodInfo,
}
impl KubeDiscoveryClient {
......@@ -43,10 +48,11 @@ impl KubeDiscoveryClient {
let instance_id = hash_pod_name(&pod_info.pod_name);
tracing::info!(
"Initializing KubeDiscoveryClient: pod_name={}, instance_id={:x}, namespace={}",
"Initializing KubeDiscoveryClient: pod_name={}, instance_id={:x}, namespace={}, pod_uid={}",
pod_info.pod_name,
instance_id,
pod_info.pod_namespace
pod_info.pod_namespace,
pod_info.pod_uid
);
let kube_client = KubeClient::try_default()
......@@ -57,7 +63,7 @@ impl KubeDiscoveryClient {
let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));
// Create and spawn daemon
let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?;
let daemon = DiscoveryDaemon::new(kube_client.clone(), pod_info.clone(), cancel_token)?;
tokio::spawn(async move {
if let Err(e) = daemon.run(watch_tx).await {
......@@ -71,6 +77,8 @@ impl KubeDiscoveryClient {
instance_id,
metadata,
metadata_watch: watch_rx,
kube_client,
pod_info,
})
}
}
......@@ -91,12 +99,17 @@ impl Discovery for KubeDiscoveryClient {
instance_id
);
// Write to local metadata
// Write to local metadata and persist to CR
// IMPORTANT: Hold the write lock across the CR write to prevent race conditions
let mut metadata = self.metadata.write().await;
// Clone state for rollback in case CR persistence fails
let original_state = metadata.clone();
match &instance {
DiscoveryInstance::Endpoint(inst) => {
tracing::info!(
"Registered endpoint: namespace={}, component={}, endpoint={}, instance_id={:x}",
"Registering endpoint: namespace={}, component={}, endpoint={}, instance_id={:x}",
inst.namespace,
inst.component,
inst.endpoint,
......@@ -111,7 +124,7 @@ impl Discovery for KubeDiscoveryClient {
..
} => {
tracing::info!(
"Registered model card: namespace={}, component={}, endpoint={}, instance_id={:x}",
"Registering model card: namespace={}, component={}, endpoint={}, instance_id={:x}",
namespace,
component,
endpoint,
......@@ -121,23 +134,79 @@ impl Discovery for KubeDiscoveryClient {
}
}
// Build and apply the CR with the updated metadata
// This persists the metadata to Kubernetes for other pods to discover
let cr = build_cr(&self.pod_info.pod_name, &self.pod_info.pod_uid, &metadata)?;
if let Err(e) = apply_cr(&self.kube_client, &self.pod_info.pod_namespace, &cr).await {
// Rollback local state on CR persistence failure
tracing::warn!(
"Failed to persist metadata to CR, rolling back local state: {}",
e
);
*metadata = original_state;
return Err(e);
}
tracing::debug!("Persisted metadata to DynamoWorkerMetadata CR");
Ok(instance)
}
async fn unregister(&self, instance: DiscoveryInstance) -> Result<()> {
// TODO: need to handle meta data change propagation to other pods
// Current implementation delete the entry from local metadata but
// it doesn't invalidate the cached service metadata on other pods
let instance_id = self.instance_id();
// Write to local metadata and persist to CR
// IMPORTANT: Hold the write lock across the CR write to prevent race conditions
let mut metadata = self.metadata.write().await;
// Clone state for rollback in case CR persistence fails
let original_state = metadata.clone();
match &instance {
DiscoveryInstance::Endpoint(_inst) => {
DiscoveryInstance::Endpoint(inst) => {
tracing::info!(
"Unregistering endpoint: namespace={}, component={}, endpoint={}, instance_id={:x}",
inst.namespace,
inst.component,
inst.endpoint,
instance_id
);
metadata.unregister_endpoint(&instance)?;
}
DiscoveryInstance::Model { .. } => {
DiscoveryInstance::Model {
namespace,
component,
endpoint,
..
} => {
tracing::info!(
"Unregistering model card: namespace={}, component={}, endpoint={}, instance_id={:x}",
namespace,
component,
endpoint,
instance_id
);
metadata.unregister_model_card(&instance)?;
}
}
// Build and apply the CR with the updated metadata
// This persists the removal to Kubernetes for other pods to see
let cr = build_cr(&self.pod_info.pod_name, &self.pod_info.pod_uid, &metadata)?;
if let Err(e) = apply_cr(&self.kube_client, &self.pod_info.pod_namespace, &cr).await {
// Rollback local state on CR persistence failure
tracing::warn!(
"Failed to persist metadata removal to CR, rolling back local state: {}",
e
);
*metadata = original_state;
return Err(e);
}
tracing::debug!("Persisted metadata removal to DynamoWorkerMetadata CR");
Ok(())
}
......@@ -188,15 +257,59 @@ impl Discovery for KubeDiscoveryClient {
// Spawn task to process snapshots
tokio::spawn(async move {
let mut known_instances = HashSet::<u64>::new();
// Initialize known_instances from current snapshot state
// This is critical: watch_rx.changed() only fires on FUTURE changes,
// so we must capture the current state first to detect removals correctly
let initial_snapshot = watch_rx.borrow_and_update().clone();
let mut known_instances: HashSet<u64> = initial_snapshot
.instances
.iter()
.filter_map(|(&instance_id, metadata)| {
let filtered = metadata.filter(&query);
if !filtered.is_empty() {
Some(instance_id)
} else {
None
}
})
.collect();
tracing::debug!(
stream_id = %stream_id,
initial_instances = known_instances.len(),
"Watch started for query={:?}",
query
);
// Emit initial Added events for all existing instances (the "list" part of list_and_watch)
for &instance_id in &known_instances {
if let Some(metadata) = initial_snapshot.instances.get(&instance_id) {
let instances = metadata.filter(&query);
for instance in instances {
tracing::info!(
stream_id = %stream_id,
instance_id = format!("{:x}", instance.instance_id()),
"Emitting initial Added event"
);
if event_tx.send(Ok(DiscoveryEvent::Added(instance))).is_err() {
tracing::debug!(
stream_id = %stream_id,
"Watch receiver dropped during initial sync"
);
return;
}
}
}
}
loop {
tracing::trace!(
stream_id = %stream_id,
known_count = known_instances.len(),
"Watch loop waiting for changes"
);
// Wait for next snapshot or cancellation
let watch_result = if let Some(ref token) = cancel_token {
tokio::select! {
......@@ -218,6 +331,14 @@ impl Discovery for KubeDiscoveryClient {
// Get latest snapshot
let snapshot = watch_rx.borrow_and_update().clone();
tracing::debug!(
stream_id = %stream_id,
seq = snapshot.sequence,
snapshot_instances = snapshot.instances.len(),
known_instances = known_instances.len(),
"Watch received snapshot update"
);
// Filter snapshot by query
let current_instances: HashSet<u64> = snapshot
.instances
......@@ -232,6 +353,13 @@ impl Discovery for KubeDiscoveryClient {
})
.collect();
tracing::trace!(
stream_id = %stream_id,
current_ids = ?current_instances.iter().map(|id| format!("{:x}", id)).collect::<Vec<_>>(),
known_ids = ?known_instances.iter().map(|id| format!("{:x}", id)).collect::<Vec<_>>(),
"Comparing instance sets"
);
// Compute diff
let added: Vec<u64> = current_instances
.difference(&known_instances)
......@@ -243,8 +371,16 @@ impl Discovery for KubeDiscoveryClient {
.copied()
.collect();
// Only log if there are changes
if !added.is_empty() || !removed.is_empty() {
// Log diff results (even if empty, for debugging)
if added.is_empty() && removed.is_empty() {
tracing::debug!(
stream_id = %stream_id,
seq = snapshot.sequence,
current_count = current_instances.len(),
known_count = known_instances.len(),
"Watch snapshot received but no diff detected"
);
} else {
tracing::debug!(
stream_id = %stream_id,
seq = snapshot.sequence,
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Custom Resource Definition for DynamoWorkerMetadata
//!
//! This module defines the Rust types for the DynamoWorkerMetadata CRD,
//! which stores discovery metadata for Dynamo worker pods in Kubernetes.
//!
//! The CRD schema is defined in the Helm chart at:
//! `deploy/cloud/helm/crds/templates/nvidia.com_dynamoworkermetadatas.yaml`
use anyhow::Result;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::{
Api, Client as KubeClient, CustomResource,
api::{Patch, PatchParams},
};
use serde::{Deserialize, Serialize};
use crate::discovery::DiscoveryMetadata;
/// Field manager name for server-side apply - identifies this client as the owner of fields it sets
const FIELD_MANAGER: &str = "dynamo-worker";
/// Spec for DynamoWorkerMetadata custom resource
/// The `data` field stores the serialized `DiscoveryMetadata` as a JSON blob.
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize)]
#[kube(
group = "nvidia.com",
version = "v1alpha1",
kind = "DynamoWorkerMetadata",
namespaced,
schema = "disabled"
)]
pub struct DynamoWorkerMetadataSpec {
/// Raw JSON blob containing the DiscoveryMetadata
pub data: serde_json::Value,
}
impl DynamoWorkerMetadataSpec {
pub fn new(data: serde_json::Value) -> Self {
Self { data }
}
}
/// Build a DynamoWorkerMetadata CR with owner reference set to the pod
/// # Arguments
/// * `pod_name` - Name of the pod (used as CR name and in owner reference)
/// * `pod_uid` - UID of the pod (for owner reference - enables garbage collection)
/// * `metadata` - The DiscoveryMetadata to serialize into the CR's data field
///
/// # Returns
/// A `DynamoWorkerMetadata` CR ready to be applied to the cluster
pub fn build_cr(
pod_name: &str,
pod_uid: &str,
metadata: &DiscoveryMetadata,
) -> Result<DynamoWorkerMetadata> {
let data = serde_json::to_value(metadata)?;
let spec = DynamoWorkerMetadataSpec::new(data);
let mut cr = DynamoWorkerMetadata::new(pod_name, spec);
// Set owner reference to the pod for automatic garbage collection
cr.metadata.owner_references = Some(vec![OwnerReference {
api_version: "v1".to_string(),
kind: "Pod".to_string(),
name: pod_name.to_string(),
uid: pod_uid.to_string(),
// Mark pod as the controlling owner - CR will be garbage collected when pod is deleted
controller: Some(true),
// Don't block pod deletion - allow CR cleanup to happen asynchronously
block_owner_deletion: Some(false),
}]);
Ok(cr)
}
/// Apply (create or update) a DynamoWorkerMetadata CR using server-side apply
///
/// This function uses Kubernetes server-side apply which:
/// - Creates the CR if it doesn't exist
/// - Updates the CR if it does exist
/// - Is idempotent and safe to call multiple times
///
/// # Arguments
/// * `kube_client` - Kubernetes client
/// * `namespace` - Namespace to create/update the CR in
/// * `cr` - The DynamoWorkerMetadata CR to apply
pub async fn apply_cr(
kube_client: &KubeClient,
namespace: &str,
cr: &DynamoWorkerMetadata,
) -> Result<()> {
let api: Api<DynamoWorkerMetadata> = Api::namespaced(kube_client.clone(), namespace);
let cr_name = cr
.metadata
.name
.as_ref()
.ok_or_else(|| anyhow::anyhow!("CR must have a name"))?;
// force() allows us to take ownership of this field even if another controller owns it
// in practice the CR will only have one writer (the pod owner)
let params = PatchParams::apply(FIELD_MANAGER).force();
api.patch(cr_name, &params, &Patch::Apply(cr))
.await
.map_err(|e| anyhow::anyhow!("Failed to apply DynamoWorkerMetadata CR: {}", e))?;
tracing::debug!(
"Applied DynamoWorkerMetadata CR: name={}, namespace={}",
cr_name,
namespace
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use kube::Resource;
#[test]
fn test_crd_metadata() {
// Verify the CRD metadata is correct
assert_eq!(DynamoWorkerMetadata::group(&()), "nvidia.com");
assert_eq!(DynamoWorkerMetadata::version(&()), "v1alpha1");
assert_eq!(DynamoWorkerMetadata::kind(&()), "DynamoWorkerMetadata");
assert_eq!(DynamoWorkerMetadata::plural(&()), "dynamoworkermetadatas");
}
#[test]
fn test_serialization_roundtrip() {
let data = serde_json::json!({
"endpoints": {
"ns/comp/ep": {
"type": "Endpoint",
"namespace": "ns",
"component": "comp",
"endpoint": "ep",
"instance_id": 12345,
"transport": { "Nats": "nats://localhost:4222" }
}
},
"model_cards": {}
});
let spec = DynamoWorkerMetadataSpec::new(data.clone());
let cr = DynamoWorkerMetadata::new("test-pod", spec);
let json = serde_json::to_string(&cr).expect("Failed to serialize CR");
let deserialized: DynamoWorkerMetadata =
serde_json::from_str(&json).expect("Failed to deserialize CR");
assert_eq!(deserialized.spec.data, data);
}
}
......@@ -12,23 +12,18 @@ use kube::{
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::Notify;
use tokio::time::{Duration, timeout};
use super::utils::{PodInfo, extract_endpoint_info, hash_pod_name};
use super::crd::DynamoWorkerMetadata;
use super::utils::{PodInfo, extract_endpoint_info};
const SNAPSHOT_POLL_INTERVAL_MS: u64 = 5000;
const MAX_CONCURRENT_FETCHES: usize = 20;
const METADATA_FETCH_TIMEOUT_SECS: u64 = 5;
const DEBOUNCE_DURATION: Duration = Duration::from_millis(500);
/// Discovers and aggregates metadata from pods in the cluster
/// Discovers and aggregates metadata from DynamoWorkerMetadata CRs in the cluster
#[derive(Clone)]
pub(super) struct DiscoveryDaemon {
/// Kubernetes client
kube_client: KubeClient,
/// HTTP client for fetching remote metadata
http_client: reqwest::Client,
/// Cache of remote pod metadata (instance_id -> metadata)
cache: Arc<RwLock<HashMap<u64, Arc<DiscoveryMetadata>>>>,
// This pod's info
pod_info: PodInfo,
cancel_token: CancellationToken,
......@@ -40,35 +35,36 @@ impl DiscoveryDaemon {
pod_info: PodInfo,
cancel_token: CancellationToken,
) -> Result<Self> {
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(METADATA_FETCH_TIMEOUT_SECS))
.build()
.map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?;
Ok(Self {
kube_client,
http_client,
cache: Arc::new(RwLock::new(HashMap::new())),
pod_info,
cancel_token,
})
}
/// Run the discovery daemon
///
/// Watches both EndpointSlices (to know which pods are ready) and
/// DynamoWorkerMetadata CRs (to get the metadata for each pod).
/// A pod is included in the snapshot only if:
/// 1. It appears as ready in an EndpointSlice
/// 2. It has a corresponding DynamoWorkerMetadata CR
pub async fn run(
self,
watch_tx: tokio::sync::watch::Sender<Arc<MetadataSnapshot>>,
) -> Result<()> {
tracing::info!("Discovery daemon starting");
// Create reflector for ALL EndpointSlices in our namespace
// Create notify for watch-driven updates (shared by both reflectors)
let notify = Arc::new(Notify::new());
// --- EndpointSlice Reflector ---
let endpoint_slices: Api<EndpointSlice> =
Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace);
let (reader, writer) = reflector::store();
let (ep_reader, ep_writer) = reflector::store();
// Apply label selector to only watch discovery-enabled EndpointSlices
let watch_config = Config::default()
let ep_watch_config = Config::default()
.labels("nvidia.com/dynamo-discovery-backend=kubernetes")
.labels("nvidia.com/dynamo-discovery-enabled=true");
......@@ -76,83 +72,93 @@ impl DiscoveryDaemon {
"Daemon watching EndpointSlices with labels: nvidia.com/dynamo-discovery-backend=kubernetes, nvidia.com/dynamo-discovery-enabled=true"
);
// Spawn reflector task (runs independently)
let reflector_stream = reflector(writer, watcher(endpoint_slices, watch_config))
let notify_ep = notify.clone();
let ep_reflector_stream = reflector(ep_writer, watcher(endpoint_slices, ep_watch_config))
.default_backoff()
.touched_objects()
.for_each(|res| {
.for_each(move |res| {
match res {
Ok(obj) => {
tracing::debug!(
slice_name = obj.metadata.name.as_deref().unwrap_or("unknown"),
"Daemon reflector updated EndpointSlice"
"EndpointSlice reflector updated"
);
notify_ep.notify_one();
}
Err(e) => {
tracing::warn!("Daemon reflector error: {}", e);
tracing::warn!("EndpointSlice reflector error: {}", e);
notify_ep.notify_one();
}
}
// for_each expects a Future; ready(()) is an immediately-complete one
futures::future::ready(())
});
tokio::spawn(reflector_stream);
tokio::spawn(ep_reflector_stream);
// --- DynamoWorkerMetadata CR Reflector ---
let metadata_crs: Api<DynamoWorkerMetadata> =
Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace);
let (cr_reader, cr_writer) = reflector::store();
// Polling loop
// Watch all DynamoWorkerMetadata CRs in the namespace
let cr_watch_config = Config::default();
tracing::info!(
"Daemon watching DynamoWorkerMetadata CRs in namespace: {}",
self.pod_info.pod_namespace
);
let notify_cr = notify.clone();
let cr_reflector_stream = reflector(cr_writer, watcher(metadata_crs, cr_watch_config))
.default_backoff()
.touched_objects()
.for_each(move |res| {
match res {
Ok(obj) => {
tracing::debug!(
cr_name = obj.metadata.name.as_deref().unwrap_or("unknown"),
"DynamoWorkerMetadata CR reflector updated"
);
notify_cr.notify_one();
}
Err(e) => {
tracing::warn!("DynamoWorkerMetadata CR reflector error: {}", e);
notify_cr.notify_one();
}
}
// for_each expects a Future; ready(()) is an immediately-complete one
futures::future::ready(())
});
tokio::spawn(cr_reflector_stream);
// Event-driven loop with debouncing
let mut sequence = 0u64;
let mut prev_instance_ids: HashSet<u64> = HashSet::new();
let mut interval =
tokio::time::interval(std::time::Duration::from_millis(SNAPSHOT_POLL_INTERVAL_MS));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut prev_snapshot = MetadataSnapshot::empty();
loop {
tokio::select! {
_ = interval.tick() => {
match self.aggregate_snapshot(&reader, sequence).await {
_ = notify.notified() => {
// Debounce: K8s can emit many events in quick succession
// Wait briefly to batch them into a single snapshot update.
tokio::time::sleep(DEBOUNCE_DURATION).await;
// Drain any permit that accumulated during the sleep
let _ = timeout(Duration::ZERO, notify.notified()).await;
tracing::trace!("Debounce window elapsed, processing snapshot");
match self.aggregate_snapshot(&ep_reader, &cr_reader, sequence).await {
Ok(snapshot) => {
// Compare instance IDs to detect changes
let current_instance_ids: HashSet<u64> =
snapshot.instances.keys().copied().collect();
let instances_changed = current_instance_ids != prev_instance_ids;
if instances_changed {
// Compute what was added and removed
let added: Vec<u64> = current_instance_ids
.difference(&prev_instance_ids)
.copied()
.collect();
let removed: Vec<u64> = prev_instance_ids
.difference(&current_instance_ids)
.copied()
.collect();
tracing::info!(
"Daemon snapshot (seq={}): instances changed, total={}, added=[{}], removed=[{}]",
sequence,
current_instance_ids.len(),
added.iter().map(|id| format!("{:x}", id)).collect::<Vec<_>>().join(", "),
removed.iter().map(|id| format!("{:x}", id)).collect::<Vec<_>>().join(", ")
);
// Prune cache for removed instances
if !removed.is_empty() {
self.prune_cache(&removed).await;
}
if snapshot.has_changes_from(&prev_snapshot) {
prev_snapshot = snapshot.clone();
// Broadcast the snapshot (only when changed)
if watch_tx.send(Arc::new(snapshot)).is_err() {
tracing::debug!("No watch subscribers, daemon stopping");
break;
}
prev_instance_ids = current_instance_ids;
} else {
tracing::trace!(
"Daemon snapshot (seq={}): no changes, {} instances",
sequence,
current_instance_ids.len()
);
}
sequence += 1;
......@@ -174,57 +180,85 @@ impl DiscoveryDaemon {
Ok(())
}
/// Aggregate metadata from all pods into a snapshot
/// Aggregate metadata from EndpointSlices and DynamoWorkerMetadata CRs into a snapshot
///
/// A pod is included in the snapshot only if:
/// 1. It appears as ready in an EndpointSlice
/// 2. It has a corresponding DynamoWorkerMetadata CR (CR name = pod name)
async fn aggregate_snapshot(
&self,
reader: &reflector::Store<EndpointSlice>,
ep_reader: &reflector::Store<EndpointSlice>,
cr_reader: &reflector::Store<DynamoWorkerMetadata>,
sequence: u64,
) -> Result<MetadataSnapshot> {
let start = std::time::Instant::now();
// Extract ALL ready endpoints (instance_id, pod_name, pod_ip, system_port) directly from reflector
let all_endpoints: Vec<(u64, String, String, u16)> = reader
// Extract ready pods from EndpointSlices: (instance_id, pod_name)
let ready_pods: Vec<(u64, String)> = ep_reader
.state()
.iter()
.flat_map(|arc_slice| extract_endpoint_info(arc_slice.as_ref()))
.collect();
tracing::trace!(
"Daemon found {} ready endpoints to fetch",
all_endpoints.len()
"Daemon found {} ready pods from EndpointSlices",
ready_pods.len()
);
// Concurrent fetch: Fetch metadata for all endpoints in parallel
let fetch_futures =
all_endpoints
.into_iter()
.map(|(instance_id, pod_name, pod_ip, system_port)| {
let daemon = self.clone();
async move {
match daemon.fetch_metadata(&pod_name, &pod_ip, system_port).await {
Ok(metadata) => Some((instance_id, metadata)),
Err(e) => {
tracing::warn!(
"Failed to fetch metadata for pod {} (instance_id={:x}): {}",
pod_name,
instance_id,
e
);
None
}
}
}
});
// Single read of CR state to extract metadata and generations atomically
// We store (metadata, generation) tuples keyed by CR name (= pod name)
let cr_state = cr_reader.state();
let mut cr_map: HashMap<String, (Arc<DiscoveryMetadata>, i64)> = HashMap::new();
for arc_cr in cr_state.iter() {
let Some(cr_name) = arc_cr.metadata.name.as_ref() else {
continue;
};
let generation = arc_cr.metadata.generation.unwrap_or(0);
// Deserialize the data field to DiscoveryMetadata
match serde_json::from_value::<DiscoveryMetadata>(arc_cr.spec.data.clone()) {
Ok(metadata) => {
tracing::trace!("Loaded metadata from CR '{}'", cr_name);
cr_map.insert(cr_name.clone(), (Arc::new(metadata), generation));
}
Err(e) => {
tracing::warn!(
"Failed to deserialize metadata from CR '{}': {}",
cr_name,
e
);
}
}
}
// Execute fetches concurrently with bounded parallelism
let results: Vec<_> = futures::stream::iter(fetch_futures)
.buffer_unordered(MAX_CONCURRENT_FETCHES)
.collect()
.await;
tracing::trace!("Daemon loaded {} DynamoWorkerMetadata CRs", cr_map.len());
// Build the snapshot
let instances: HashMap<u64, Arc<DiscoveryMetadata>> =
results.into_iter().flatten().collect();
// Correlate: ready pod + CR exists = include in snapshot
// Both instances and generations are keyed by instance_id with matching keys
let mut instances: HashMap<u64, Arc<DiscoveryMetadata>> = HashMap::new();
let mut generations: HashMap<u64, i64> = HashMap::new();
for (instance_id, pod_name) in ready_pods {
// CR name is the pod name
if let Some((metadata, generation)) = cr_map.get(&pod_name) {
instances.insert(instance_id, metadata.clone());
generations.insert(instance_id, *generation);
tracing::trace!(
"Included pod '{}' (instance_id={:x}, generation={}) in snapshot",
pod_name,
instance_id,
generation
);
} else {
tracing::trace!(
"Skipping pod '{}' (instance_id={:x}): no DynamoWorkerMetadata CR found",
pod_name,
instance_id
);
}
}
let elapsed = start.elapsed();
......@@ -237,83 +271,9 @@ impl DiscoveryDaemon {
Ok(MetadataSnapshot {
instances,
generations,
sequence,
timestamp: std::time::Instant::now(),
})
}
/// Fetch metadata for a single pod (with caching)
async fn fetch_metadata(
&self,
pod_name: &str,
pod_ip: &str,
system_port: u16,
) -> Result<Arc<DiscoveryMetadata>> {
let instance_id = hash_pod_name(pod_name);
// Check cache
{
let cache = self.cache.read().await;
if let Some(cached) = cache.get(&instance_id) {
tracing::trace!(
"Cache hit for pod_name={}, instance_id={:x}",
pod_name,
instance_id
);
return Ok(cached.clone());
}
}
// Cache miss: fetch from HTTP
let url = format!("http://{}:{}/metadata", pod_ip, system_port);
tracing::debug!("Fetching metadata from {url}");
let response = self
.http_client
.get(&url)
.send()
.await
.map_err(|e| anyhow::anyhow!("Failed to fetch metadata from {}: {}", url, e))?;
let metadata: DiscoveryMetadata = response
.json()
.await
.map_err(|e| anyhow::anyhow!("Failed to parse metadata from {}: {}", url, e))?;
let metadata = Arc::new(metadata);
// Cache it
{
let mut cache = self.cache.write().await;
// Check again in case another task inserted while we were fetching
if let Some(existing) = cache.get(&instance_id) {
tracing::debug!(
"Another task cached metadata for instance_id={:x} while we were fetching",
instance_id
);
return Ok(existing.clone());
}
cache.insert(instance_id, metadata.clone());
tracing::debug!(
"Cached metadata for pod_name={}, instance_id={:x}",
pod_name,
instance_id
);
}
Ok(metadata)
}
/// Prune cache entries for removed instances
async fn prune_cache(&self, removed_ids: &[u64]) {
let mut cache = self.cache.write().await;
for id in removed_ids {
if cache.remove(id).is_some() {
tracing::debug!("Pruned cache for removed instance_id={:x}", id);
}
}
}
}
......@@ -13,38 +13,12 @@ pub fn hash_pod_name(pod_name: &str) -> u64 {
hasher.finish()
}
/// Extract the system port from an EndpointSlice's ports
/// Looks for a port with name "system", returns None if not found
fn extract_system_port(slice: &EndpointSlice) -> Option<u16> {
slice.ports.as_ref().and_then(|ports| {
ports
.iter()
.find(|p| p.name.as_deref() == Some("system"))
.and_then(|p| p.port.map(|port| port as u16))
})
}
/// Extract endpoint information from an EndpointSlice
/// Returns (instance_id, pod_name, pod_ip, system_port) tuples for ready endpoints
pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String, String, u16)> {
// Extract system port from EndpointSlice ports
let system_port = match extract_system_port(slice) {
Some(port) => port,
None => {
let slice_name = slice.metadata.name.as_deref().unwrap_or("unknown");
tracing::warn!(
"EndpointSlice '{}' did not have a system port defined",
slice_name
);
return Vec::new();
}
};
/// Returns (instance_id, pod_name) tuples for ready endpoints
pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String)> {
let mut result = Vec::new();
let endpoints = &slice.endpoints;
for endpoint in endpoints {
// Check if endpoint is ready
for endpoint in &slice.endpoints {
let is_ready = endpoint
.conditions
.as_ref()
......@@ -55,7 +29,6 @@ pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String,
continue;
}
// Get pod name from targetRef
let pod_name = match endpoint.target_ref.as_ref() {
Some(target_ref) => target_ref.name.as_deref().unwrap_or(""),
None => continue,
......@@ -67,10 +40,7 @@ pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String,
let instance_id = hash_pod_name(pod_name);
// Get first IP only (avoid duplicate instance IDs)
if let Some(ip) = endpoint.addresses.first() {
result.push((instance_id, pod_name.to_string(), ip.clone(), system_port));
}
result.push((instance_id, pod_name.to_string()));
}
result
......@@ -81,15 +51,24 @@ pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String,
pub(super) struct PodInfo {
pub pod_name: String,
pub pod_namespace: String,
pub pod_uid: String,
pub system_port: u16,
}
impl PodInfo {
/// Discover pod information from environment variables
///
/// Required environment variables:
/// - `POD_NAME`: Name of the pod (required)
/// - `POD_UID`: UID of the pod (required for CR owner reference)
/// - `POD_NAMESPACE`: Namespace of the pod (defaults to "default")
pub fn from_env() -> Result<Self> {
let pod_name = std::env::var("POD_NAME")
.map_err(|_| anyhow::anyhow!("POD_NAME environment variable not set"))?;
let pod_uid = std::env::var("POD_UID")
.map_err(|_| anyhow::anyhow!("POD_UID environment variable not set"))?;
let pod_namespace = std::env::var("POD_NAMESPACE").unwrap_or_else(|_| {
tracing::warn!("POD_NAMESPACE not set, defaulting to 'default'");
"default".to_string()
......@@ -102,6 +81,7 @@ impl PodInfo {
Ok(Self {
pod_name,
pod_namespace,
pod_uid,
system_port,
})
}
......
......@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use super::{DiscoveryInstance, DiscoveryQuery};
......@@ -223,6 +223,9 @@ fn filter_instances(
pub struct MetadataSnapshot {
/// Map of instance_id -> metadata
pub instances: HashMap<u64, Arc<DiscoveryMetadata>>,
/// Map of instance_id -> CR generation for change detection
/// Keys match `instances` keys exactly - only ready pods with CRs are included
pub generations: HashMap<u64, i64>,
/// Sequence number for debugging
pub sequence: u64,
/// Timestamp for observability
......@@ -233,11 +236,56 @@ impl MetadataSnapshot {
pub fn empty() -> Self {
Self {
instances: HashMap::new(),
generations: HashMap::new(),
sequence: 0,
timestamp: std::time::Instant::now(),
}
}
/// Compare with previous snapshot and return true if changed.
/// Logs diagnostic info about what changed.
/// This is done on the basis of the generation of the DynamoWorkerMetadata CRs that are owned by ready workers
pub fn has_changes_from(&self, prev: &MetadataSnapshot) -> bool {
if self.generations == prev.generations {
tracing::trace!(
"Snapshot (seq={}): no changes, {} instances",
self.sequence,
self.instances.len()
);
return false;
}
// Compute diff for logging
let curr_ids: HashSet<u64> = self.generations.keys().copied().collect();
let prev_ids: HashSet<u64> = prev.generations.keys().copied().collect();
let added: Vec<_> = curr_ids
.difference(&prev_ids)
.map(|id| format!("{:x}", id))
.collect();
let removed: Vec<_> = prev_ids
.difference(&curr_ids)
.map(|id| format!("{:x}", id))
.collect();
let updated: Vec<_> = self
.generations
.iter()
.filter(|(k, v)| prev.generations.get(*k).is_some_and(|pv| pv != *v))
.map(|(k, _)| format!("{:x}", k))
.collect();
tracing::info!(
"Snapshot (seq={}): {} instances, added={:?}, removed={:?}, updated={:?}",
self.sequence,
self.instances.len(),
added,
removed,
updated
);
true
}
/// Filter all instances in the snapshot by query
pub fn filter(&self, query: &DiscoveryQuery) -> Vec<DiscoveryInstance> {
self.instances
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment