client.rs 15.8 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
5
6
7
8
9
10
11
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};

use anyhow::Result;
use arc_swap::ArcSwap;
use futures::StreamExt;
use tokio::net::unix::pipe::Receiver;

12
use crate::{
13
14
    component::{Endpoint, Instance},
    pipeline::async_trait,
15
16
17
18
19
    pipeline::{
        AddressedPushRouter, AddressedRequest, AsyncEngine, Data, ManyOut, PushRouter, RouterMode,
        SingleIn,
    },
    storage::key_value_store::{KeyValueStoreManager, WatchEvent},
20
21
    traits::DistributedRuntimeProvider,
    transports::etcd::Client as EtcdClient,
Ryan Olson's avatar
Ryan Olson committed
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
};

/// Observable state of an instance map; each state carries a nonce.
///
/// Intended to be emitted on a watch channel so that the critical state
/// transitions (empty / non-empty / finished) can be observed.
///
/// NOTE(review): this type is not referenced anywhere in the visible code;
/// remove it if it is truly unused.
enum MapState {
    /// The map is empty; the value is the nonce.
    Empty(u64),

    /// The map is not empty; the values are `(nonce, count)`.
    NonEmpty(u64, u64),

    /// The watcher has finished; no more events will be emitted.
    Finished,
}

/// A change to a watched set of endpoints.
///
/// NOTE(review): not referenced anywhere in the visible code. The `String`
/// payload is presumably the watched key and the `u64` an instance id;
/// confirm before relying on this.
enum EndpointEvent {
    /// Presumably: an endpoint key was written, with its associated id.
    Put(String, u64),

    /// Presumably: an endpoint key was deleted.
    Delete(String),
}

43
44
45
46
#[derive(Clone, Debug)]
pub struct Client {
    // This is me
    pub endpoint: Endpoint,
47
    // These are the remotes I know about from watching etcd
48
    pub instance_source: Arc<InstanceSource>,
49
    // These are the instance source ids less those reported as down from sending rpc
50
    instance_avail: Arc<ArcSwap<Vec<u64>>>,
51
    // These are the instance source ids less those reported as busy (above threshold)
52
    instance_free: Arc<ArcSwap<Vec<u64>>>,
53
54
55
}

#[derive(Clone, Debug)]
56
pub enum InstanceSource {
57
    Static,
58
    Dynamic(tokio::sync::watch::Receiver<Vec<Instance>>),
Ryan Olson's avatar
Ryan Olson committed
59
60
}

61
impl Client {
62
63
64
65
    // Client will only talk to a single static endpoint
    pub(crate) async fn new_static(endpoint: Endpoint) -> Result<Self> {
        Ok(Client {
            endpoint,
66
            instance_source: Arc::new(InstanceSource::Static),
67
            instance_avail: Arc::new(ArcSwap::from(Arc::new(vec![]))),
68
            instance_free: Arc::new(ArcSwap::from(Arc::new(vec![]))),
69
70
        })
    }
Ryan Olson's avatar
Ryan Olson committed
71

72
    // Client with auto-discover instances using etcd
73
    pub(crate) async fn new_dynamic(endpoint: Endpoint) -> Result<Self> {
74
75
76
77
        tracing::debug!(
            "Client::new_dynamic: Creating dynamic client for endpoint: {}",
            endpoint.path()
        );
78
79
        const INSTANCE_REFRESH_PERIOD: Duration = Duration::from_secs(1);

80
        let instance_source = Self::get_or_create_dynamic_instance_source(&endpoint).await?;
81
82
83
84
        tracing::debug!(
            "Client::new_dynamic: Got instance source for endpoint: {}",
            endpoint.path()
        );
85

86
        let client = Client {
87
            endpoint: endpoint.clone(),
88
            instance_source: instance_source.clone(),
89
            instance_avail: Arc::new(ArcSwap::from(Arc::new(vec![]))),
90
            instance_free: Arc::new(ArcSwap::from(Arc::new(vec![]))),
91
        };
92
93
94
95
        tracing::debug!(
            "Client::new_dynamic: Starting instance source monitor for endpoint: {}",
            endpoint.path()
        );
96
        client.monitor_instance_source();
97
98
99
100
        tracing::debug!(
            "Client::new_dynamic: Successfully created dynamic client for endpoint: {}",
            endpoint.path()
        );
101
        Ok(client)
102
103
104
105
106
107
108
109
110
111
112
    }

    pub fn path(&self) -> String {
        self.endpoint.path()
    }

    /// The root etcd path we watch in etcd to discover new instances to route to.
    pub fn etcd_root(&self) -> String {
        self.endpoint.etcd_root()
    }

113
    /// Instances available from watching etcd
114
    pub fn instances(&self) -> Vec<Instance> {
115
116
117
118
        match self.instance_source.as_ref() {
            InstanceSource::Static => vec![],
            InstanceSource::Dynamic(watch_rx) => watch_rx.borrow().clone(),
        }
119
120
    }

121
    pub fn instance_ids(&self) -> Vec<u64> {
122
123
124
        self.instances().into_iter().map(|ep| ep.id()).collect()
    }

125
    pub fn instance_ids_avail(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
126
127
128
        self.instance_avail.load()
    }

129
    pub fn instance_ids_free(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
130
131
132
        self.instance_free.load()
    }

133
134
    /// Wait for at least one Instance to be available for this Endpoint
    pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> {
135
136
137
138
        tracing::debug!(
            "wait_for_instances: Starting wait for endpoint: {}",
            self.endpoint.path()
        );
139
140
141
        let mut instances: Vec<Instance> = vec![];
        if let InstanceSource::Dynamic(mut rx) = self.instance_source.as_ref().clone() {
            // wait for there to be 1 or more endpoints
142
            let mut iteration = 0;
143
144
            loop {
                instances = rx.borrow_and_update().to_vec();
145
146
147
148
149
150
                tracing::debug!(
                    "wait_for_instances: iteration={}, current_instance_count={}, endpoint={}",
                    iteration,
                    instances.len(),
                    self.endpoint.path()
                );
151
                if instances.is_empty() {
152
153
154
155
                    tracing::debug!(
                        "wait_for_instances: No instances yet, waiting for change notification for endpoint: {}",
                        self.endpoint.path()
                    );
156
                    rx.changed().await?;
157
158
159
160
                    tracing::debug!(
                        "wait_for_instances: Change notification received for endpoint: {}",
                        self.endpoint.path()
                    );
161
                } else {
162
163
164
165
166
                    tracing::info!(
                        "wait_for_instances: Found {} instance(s) for endpoint: {}",
                        instances.len(),
                        self.endpoint.path()
                    );
167
168
                    break;
                }
169
                iteration += 1;
170
            }
171
172
173
174
175
        } else {
            tracing::debug!(
                "wait_for_instances: Static instance source, no dynamic discovery for endpoint: {}",
                self.endpoint.path()
            );
176
177
178
179
        }
        Ok(instances)
    }

180
181
182
    /// Is this component know at startup and not discovered via etcd?
    pub fn is_static(&self) -> bool {
        matches!(self.instance_source.as_ref(), InstanceSource::Static)
183
184
185
    }

    /// Mark an instance as down/unavailable
186
    pub fn report_instance_down(&self, instance_id: u64) {
187
188
189
190
191
192
        let filtered = self
            .instance_ids_avail()
            .iter()
            .filter_map(|&id| if id == instance_id { None } else { Some(id) })
            .collect::<Vec<_>>();
        self.instance_avail.store(Arc::new(filtered));
193
194
195
196

        tracing::debug!("inhibiting instance {instance_id}");
    }

197
    /// Update the set of free instances based on busy instance IDs
198
    pub fn update_free_instances(&self, busy_instance_ids: &[u64]) {
199
        let all_instance_ids = self.instance_ids();
200
        let free_ids: Vec<u64> = all_instance_ids
201
202
203
204
205
206
            .into_iter()
            .filter(|id| !busy_instance_ids.contains(id))
            .collect();
        self.instance_free.store(Arc::new(free_ids));
    }

207
208
209
210
    /// Monitor the ETCD instance source and update instance_avail.
    fn monitor_instance_source(&self) {
        let cancel_token = self.endpoint.drt().primary_token();
        let client = self.clone();
211
212
213
214
215
        let endpoint_path = self.endpoint.path();
        tracing::debug!(
            "monitor_instance_source: Starting monitor for endpoint: {}",
            endpoint_path
        );
216
217
218
        tokio::task::spawn(async move {
            let mut rx = match client.instance_source.as_ref() {
                InstanceSource::Static => {
219
220
221
                    tracing::error!(
                        "monitor_instance_source: Static instance source is not watchable"
                    );
222
223
224
225
                    return;
                }
                InstanceSource::Dynamic(rx) => rx.clone(),
            };
226
            let mut iteration = 0;
227
            while !cancel_token.is_cancelled() {
228
                let instance_ids: Vec<u64> = rx
229
230
231
232
                    .borrow_and_update()
                    .iter()
                    .map(|instance| instance.id())
                    .collect();
233

234
235
236
237
238
239
240
241
                tracing::debug!(
                    "monitor_instance_source: iteration={}, instance_count={}, instance_ids={:?}, endpoint={}",
                    iteration,
                    instance_ids.len(),
                    instance_ids,
                    endpoint_path
                );

242
243
                // TODO: this resets both tracked available and free instances
                client.instance_avail.store(Arc::new(instance_ids.clone()));
244
                client.instance_free.store(Arc::new(instance_ids.clone()));
245

246
247
248
249
                tracing::debug!(
                    "monitor_instance_source: instance source updated, endpoint={}",
                    endpoint_path
                );
250
251

                if let Err(err) = rx.changed().await {
252
253
254
255
256
                    tracing::error!(
                        "monitor_instance_source: The Sender is dropped: {}, endpoint={}",
                        err,
                        endpoint_path
                    );
257
258
                    cancel_token.cancel();
                }
259
                iteration += 1;
260
            }
261
262
263
264
            tracing::debug!(
                "monitor_instance_source: Monitor loop exiting for endpoint: {}",
                endpoint_path
            );
265
        });
266
267
268
269
270
271
272
273
274
    }

    async fn get_or_create_dynamic_instance_source(
        endpoint: &Endpoint,
    ) -> Result<Arc<InstanceSource>> {
        let drt = endpoint.drt();
        let instance_sources = drt.instance_sources();
        let mut instance_sources = instance_sources.lock().await;

275
276
277
278
279
        tracing::debug!(
            "get_or_create_dynamic_instance_source: Checking cache for endpoint: {}",
            endpoint.path()
        );

280
281
        if let Some(instance_source) = instance_sources.get(endpoint) {
            if let Some(instance_source) = instance_source.upgrade() {
282
283
284
285
                tracing::debug!(
                    "get_or_create_dynamic_instance_source: Found cached instance source for endpoint: {}",
                    endpoint.path()
                );
286
287
                return Ok(instance_source);
            } else {
288
289
290
291
                tracing::debug!(
                    "get_or_create_dynamic_instance_source: Cached instance source was dropped, removing for endpoint: {}",
                    endpoint.path()
                );
292
293
294
295
                instance_sources.remove(endpoint);
            }
        }

296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
        tracing::debug!(
            "get_or_create_dynamic_instance_source: Creating new instance source for endpoint: {}",
            endpoint.path()
        );

        let discovery = drt.discovery();
        let discovery_query = crate::discovery::DiscoveryQuery::Endpoint {
            namespace: endpoint.component.namespace.name.clone(),
            component: endpoint.component.name.clone(),
            endpoint: endpoint.name.clone(),
        };

        tracing::debug!(
            "get_or_create_dynamic_instance_source: Calling discovery.list_and_watch for query: {:?}",
            discovery_query
        );

        let mut discovery_stream = discovery
            .list_and_watch(discovery_query.clone(), None)
            .await?;

        tracing::debug!(
            "get_or_create_dynamic_instance_source: Got discovery stream for query: {:?}",
            discovery_query
        );

Ryan Olson's avatar
Ryan Olson committed
322
323
        let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);

324
        let secondary = endpoint.component.drt.runtime().secondary().clone();
Ryan Olson's avatar
Ryan Olson committed
325
326

        secondary.spawn(async move {
327
328
329
            tracing::debug!("endpoint_watcher: Starting for discovery query: {:?}", discovery_query);
            let mut map: HashMap<u64, Instance> = HashMap::new();
            let mut event_count = 0;
Ryan Olson's avatar
Ryan Olson committed
330
331

            loop {
332
                let discovery_event = tokio::select! {
Ryan Olson's avatar
Ryan Olson committed
333
                    _ = watch_tx.closed() => {
334
                        tracing::debug!("endpoint_watcher: all watchers have closed; shutting down for discovery query: {:?}", discovery_query);
Ryan Olson's avatar
Ryan Olson committed
335
336
                        break;
                    }
337
338
339
340
341
342
343
344
345
346
347
                    discovery_event = discovery_stream.next() => {
                        tracing::debug!("endpoint_watcher: Received stream event for discovery query: {:?}", discovery_query);
                        match discovery_event {
                            Some(Ok(event)) => {
                                tracing::debug!("endpoint_watcher: Got Ok event: {:?}", event);
                                event
                            },
                            Some(Err(e)) => {
                                tracing::error!("endpoint_watcher: discovery stream error: {}; shutting down for discovery query: {:?}", e, discovery_query);
                                break;
                            }
Ryan Olson's avatar
Ryan Olson committed
348
                            None => {
349
                                tracing::debug!("endpoint_watcher: watch stream has closed; shutting down for discovery query: {:?}", discovery_query);
Ryan Olson's avatar
Ryan Olson committed
350
351
352
353
354
355
                                break;
                            }
                        }
                    }
                };

356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
                event_count += 1;
                tracing::debug!("endpoint_watcher: Processing event #{} for discovery query: {:?}", event_count, discovery_query);

                match discovery_event {
                    crate::discovery::DiscoveryEvent::Added(discovery_instance) => {
                        match discovery_instance {
                            crate::discovery::DiscoveryInstance::Endpoint(instance) => {
                                tracing::debug!(
                                    "endpoint_watcher: Added endpoint instance_id={}, namespace={}, component={}, endpoint={}",
                                    instance.instance_id,
                                    instance.namespace,
                                    instance.component,
                                    instance.endpoint
                                );
                                map.insert(instance.instance_id, instance);
                            }
                            _ => {
                                tracing::debug!("endpoint_watcher: Ignoring non-endpoint instance (Model, etc.) for discovery query: {:?}", discovery_query);
Ryan Olson's avatar
Ryan Olson committed
374
375
                            }
                        }
376
377
378
379
380
381
382
383
                    }
                    crate::discovery::DiscoveryEvent::Removed(instance_id) => {
                        tracing::debug!(
                            "endpoint_watcher: Removed instance_id={} for discovery query: {:?}",
                            instance_id,
                            discovery_query
                        );
                        map.remove(&instance_id);
Ryan Olson's avatar
Ryan Olson committed
384
385
386
                    }
                }

387
                let instances: Vec<Instance> = map.values().cloned().collect();
388
389
390
391
392
                tracing::debug!(
                    "endpoint_watcher: Current map size={}, sending update for discovery query: {:?}",
                    instances.len(),
                    discovery_query
                );
Ryan Olson's avatar
Ryan Olson committed
393

394
                if watch_tx.send(instances).is_err() {
395
                    tracing::debug!("endpoint_watcher: Unable to send watch updates; shutting down for discovery query: {:?}", discovery_query);
Ryan Olson's avatar
Ryan Olson committed
396
397
398
399
                    break;
                }
            }

400
            tracing::debug!("endpoint_watcher: Completed for discovery query: {:?}, total events processed: {}", discovery_query, event_count);
Ryan Olson's avatar
Ryan Olson committed
401
402
403
            let _ = watch_tx.send(vec![]);
        });

404
405
        let instance_source = Arc::new(InstanceSource::Dynamic(watch_rx));
        instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
406
407
408
409
        tracing::debug!(
            "get_or_create_dynamic_instance_source: Successfully created and cached instance source for endpoint: {}",
            endpoint.path()
        );
410
        Ok(instance_source)
411
    }
Ryan Olson's avatar
Ryan Olson committed
412
}