// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::CancellationToken;
use crate::discovery::{DiscoveryMetadata, MetadataSnapshot};
use anyhow::Result;
use futures::StreamExt;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
    Api, Client as KubeClient,
    runtime::{WatchStreamExt, reflector, watcher, watcher::Config},
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
15
16
use tokio::sync::Notify;
use tokio::time::{Duration, timeout};
17

18
19
use super::crd::DynamoWorkerMetadata;
use super::utils::{PodInfo, extract_endpoint_info};
20

21
/// Quiet period after the first watch notification before a snapshot is rebuilt,
/// so bursts of Kubernetes events collapse into one update (0 s + 500 000 000 ns = 500 ms).
const DEBOUNCE_DURATION: Duration = Duration::new(0, 500_000_000);
22

23
/// Discovers and aggregates metadata from DynamoWorkerMetadata CRs in the cluster
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#[derive(Clone)]
pub(super) struct DiscoveryDaemon {
    kube_client: KubeClient,
    // This pod's info
    pod_info: PodInfo,
    cancel_token: CancellationToken,
}

impl DiscoveryDaemon {
    pub fn new(
        kube_client: KubeClient,
        pod_info: PodInfo,
        cancel_token: CancellationToken,
    ) -> Result<Self> {
        Ok(Self {
            kube_client,
            pod_info,
            cancel_token,
        })
    }

    /// Run the discovery daemon
46
47
48
49
50
51
    ///
    /// Watches both EndpointSlices (to know which pods are ready) and
    /// DynamoWorkerMetadata CRs (to get the metadata for each pod).
    /// A pod is included in the snapshot only if:
    /// 1. It appears as ready in an EndpointSlice
    /// 2. It has a corresponding DynamoWorkerMetadata CR
52
53
54
55
56
57
    pub async fn run(
        self,
        watch_tx: tokio::sync::watch::Sender<Arc<MetadataSnapshot>>,
    ) -> Result<()> {
        tracing::info!("Discovery daemon starting");

58
59
60
61
        // Create notify for watch-driven updates (shared by both reflectors)
        let notify = Arc::new(Notify::new());

        // --- EndpointSlice Reflector ---
62
63
64
        let endpoint_slices: Api<EndpointSlice> =
            Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace);

65
        let (ep_reader, ep_writer) = reflector::store();
66

67
        let ep_watch_config = Config::default()
68
69
            .labels("nvidia.com/dynamo-discovery-backend=kubernetes")
            .labels("nvidia.com/dynamo-discovery-enabled=true");
70
71

        tracing::info!(
72
            "Daemon watching EndpointSlices with labels: nvidia.com/dynamo-discovery-backend=kubernetes, nvidia.com/dynamo-discovery-enabled=true"
73
74
        );

75
76
        let notify_ep = notify.clone();
        let ep_reflector_stream = reflector(ep_writer, watcher(endpoint_slices, ep_watch_config))
77
78
            .default_backoff()
            .touched_objects()
79
            .for_each(move |res| {
80
81
82
83
                match res {
                    Ok(obj) => {
                        tracing::debug!(
                            slice_name = obj.metadata.name.as_deref().unwrap_or("unknown"),
84
                            "EndpointSlice reflector updated"
85
                        );
86
                        notify_ep.notify_one();
87
88
                    }
                    Err(e) => {
89
                        tracing::warn!("EndpointSlice reflector error: {e}");
90
                        notify_ep.notify_one();
91
92
                    }
                }
93
                // for_each expects a Future; ready(()) is an immediately-complete one
94
95
96
                futures::future::ready(())
            });

97
98
99
100
101
102
103
        tokio::spawn(ep_reflector_stream);

        // --- DynamoWorkerMetadata CR Reflector ---
        let metadata_crs: Api<DynamoWorkerMetadata> =
            Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace);

        let (cr_reader, cr_writer) = reflector::store();
104

105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
        // Watch all DynamoWorkerMetadata CRs in the namespace
        let cr_watch_config = Config::default();

        tracing::info!(
            "Daemon watching DynamoWorkerMetadata CRs in namespace: {}",
            self.pod_info.pod_namespace
        );

        let notify_cr = notify.clone();
        let cr_reflector_stream = reflector(cr_writer, watcher(metadata_crs, cr_watch_config))
            .default_backoff()
            .touched_objects()
            .for_each(move |res| {
                match res {
                    Ok(obj) => {
                        tracing::debug!(
                            cr_name = obj.metadata.name.as_deref().unwrap_or("unknown"),
                            "DynamoWorkerMetadata CR reflector updated"
                        );
                        notify_cr.notify_one();
                    }
                    Err(e) => {
127
                        tracing::warn!("DynamoWorkerMetadata CR reflector error: {e}");
128
129
130
131
132
133
134
135
136
137
                        notify_cr.notify_one();
                    }
                }
                // for_each expects a Future; ready(()) is an immediately-complete one
                futures::future::ready(())
            });

        tokio::spawn(cr_reflector_stream);

        // Event-driven loop with debouncing
138
        let mut sequence = 0u64;
139
        let mut prev_snapshot = MetadataSnapshot::empty();
140
141
142

        loop {
            tokio::select! {
143
144
145
146
147
148
149
150
151
152
153
                _ = notify.notified() => {
                    // Debounce: K8s can emit many events in quick succession
                    // Wait briefly to batch them into a single snapshot update.
                    tokio::time::sleep(DEBOUNCE_DURATION).await;

                    // Drain any permit that accumulated during the sleep
                    let _ = timeout(Duration::ZERO, notify.notified()).await;

                    tracing::trace!("Debounce window elapsed, processing snapshot");

                    match self.aggregate_snapshot(&ep_reader, &cr_reader, sequence).await {
154
                        Ok(snapshot) => {
155
156
                            if snapshot.has_changes_from(&prev_snapshot) {
                                prev_snapshot = snapshot.clone();
157
158
159
160
161
162
163
164
165
166

                                if watch_tx.send(Arc::new(snapshot)).is_err() {
                                    tracing::debug!("No watch subscribers, daemon stopping");
                                    break;
                                }
                            }

                            sequence += 1;
                        }
                        Err(e) => {
167
                            tracing::error!("Failed to aggregate snapshot: {e}");
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
                            // Continue on errors - don't crash daemon
                        }
                    }
                }
                _ = self.cancel_token.cancelled() => {
                    tracing::info!("Discovery daemon received cancellation");
                    break;
                }
            }
        }

        tracing::info!("Discovery daemon stopped");
        Ok(())
    }

183
184
185
186
187
    /// Aggregate metadata from EndpointSlices and DynamoWorkerMetadata CRs into a snapshot
    ///
    /// A pod is included in the snapshot only if:
    /// 1. It appears as ready in an EndpointSlice
    /// 2. It has a corresponding DynamoWorkerMetadata CR (CR name = pod name)
188
189
    async fn aggregate_snapshot(
        &self,
190
191
        ep_reader: &reflector::Store<EndpointSlice>,
        cr_reader: &reflector::Store<DynamoWorkerMetadata>,
192
193
194
195
        sequence: u64,
    ) -> Result<MetadataSnapshot> {
        let start = std::time::Instant::now();

196
197
        // Extract ready pods from EndpointSlices: (instance_id, pod_name)
        let ready_pods: Vec<(u64, String)> = ep_reader
198
199
200
201
202
203
            .state()
            .iter()
            .flat_map(|arc_slice| extract_endpoint_info(arc_slice.as_ref()))
            .collect();

        tracing::trace!(
204
205
            "Daemon found {} ready pods from EndpointSlices",
            ready_pods.len()
206
207
        );

208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
        // Single read of CR state to extract metadata and generations atomically
        // We store (metadata, generation) tuples keyed by CR name (= pod name)
        let cr_state = cr_reader.state();
        let mut cr_map: HashMap<String, (Arc<DiscoveryMetadata>, i64)> = HashMap::new();

        for arc_cr in cr_state.iter() {
            let Some(cr_name) = arc_cr.metadata.name.as_ref() else {
                continue;
            };

            let generation = arc_cr.metadata.generation.unwrap_or(0);

            // Deserialize the data field to DiscoveryMetadata
            match serde_json::from_value::<DiscoveryMetadata>(arc_cr.spec.data.clone()) {
                Ok(metadata) => {
223
                    tracing::trace!("Loaded metadata from CR '{cr_name}'");
224
225
226
227
228
229
230
231
232
233
234
                    cr_map.insert(cr_name.clone(), (Arc::new(metadata), generation));
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to deserialize metadata from CR '{}': {}",
                        cr_name,
                        e
                    );
                }
            }
        }
235

236
        tracing::trace!("Daemon loaded {} DynamoWorkerMetadata CRs", cr_map.len());
237

238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
        // Correlate: ready pod + CR exists = include in snapshot
        // Both instances and generations are keyed by instance_id with matching keys
        let mut instances: HashMap<u64, Arc<DiscoveryMetadata>> = HashMap::new();
        let mut generations: HashMap<u64, i64> = HashMap::new();

        for (instance_id, pod_name) in ready_pods {
            // CR name is the pod name
            if let Some((metadata, generation)) = cr_map.get(&pod_name) {
                instances.insert(instance_id, metadata.clone());
                generations.insert(instance_id, *generation);
                tracing::trace!(
                    "Included pod '{}' (instance_id={:x}, generation={}) in snapshot",
                    pod_name,
                    instance_id,
                    generation
                );
            } else {
                tracing::trace!(
                    "Skipping pod '{}' (instance_id={:x}): no DynamoWorkerMetadata CR found",
                    pod_name,
                    instance_id
                );
            }
        }
262
263
264
265
266
267
268
269
270
271
272
273

        let elapsed = start.elapsed();

        tracing::trace!(
            "Daemon snapshot complete (seq={}): {} instances in {:?}",
            sequence,
            instances.len(),
            elapsed
        );

        Ok(MetadataSnapshot {
            instances,
274
            generations,
275
276
277
278
279
            sequence,
            timestamp: std::time::Instant::now(),
        })
    }
}