"vscode:/vscode.git/clone" did not exist on "2a5eb7e7785814f5b9b6051f790233e1b1c28207"
client.rs 9.07 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
5
6
7
8
9
10
11
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};

use anyhow::Result;
use arc_swap::ArcSwap;
use futures::StreamExt;
use tokio::net::unix::pipe::Receiver;

12
use crate::discovery::{DiscoveryEvent, DiscoveryInstance};
13
use crate::{
14
15
    component::{Endpoint, Instance},
    pipeline::async_trait,
16
17
18
19
    pipeline::{
        AddressedPushRouter, AddressedRequest, AsyncEngine, Data, ManyOut, PushRouter, RouterMode,
        SingleIn,
    },
20
21
    traits::DistributedRuntimeProvider,
    transports::etcd::Client as EtcdClient,
Ryan Olson's avatar
Ryan Olson committed
22
23
};

24
25
26
27
#[derive(Clone, Debug)]
pub struct Client {
    // This is me
    pub endpoint: Endpoint,
28
29
    // These are the remotes I know about from watching key-value store
    pub instance_source: Arc<tokio::sync::watch::Receiver<Vec<Instance>>>,
30
    // These are the instance source ids less those reported as down from sending rpc
31
    instance_avail: Arc<ArcSwap<Vec<u64>>>,
32
    // These are the instance source ids less those reported as busy (above threshold)
33
    instance_free: Arc<ArcSwap<Vec<u64>>>,
34
35
36
37
    // Watch sender for available instance IDs (for sending updates)
    instance_avail_tx: Arc<tokio::sync::watch::Sender<Vec<u64>>>,
    // Watch receiver for available instance IDs (for cloning to external subscribers)
    instance_avail_rx: tokio::sync::watch::Receiver<Vec<u64>>,
38
39
}

40
impl Client {
41
42
    // Client with auto-discover instances using key-value store
    pub(crate) async fn new(endpoint: Endpoint) -> Result<Self> {
43
        tracing::trace!(
44
            "Client::new_dynamic: Creating dynamic client for endpoint: {}",
45
            endpoint.id()
46
        );
47
        let instance_source = Self::get_or_create_dynamic_instance_source(&endpoint).await?;
48

49
        let (avail_tx, avail_rx) = tokio::sync::watch::channel(vec![]);
50
        let client = Client {
51
            endpoint: endpoint.clone(),
52
            instance_source: instance_source.clone(),
53
            instance_avail: Arc::new(ArcSwap::from(Arc::new(vec![]))),
54
            instance_free: Arc::new(ArcSwap::from(Arc::new(vec![]))),
55
56
            instance_avail_tx: Arc::new(avail_tx),
            instance_avail_rx: avail_rx,
57
        };
58
        client.monitor_instance_source();
59
        Ok(client)
60
61
    }

62
    /// Instances available from watching key-value store
63
    pub fn instances(&self) -> Vec<Instance> {
64
        self.instance_source.borrow().clone()
65
66
    }

67
    pub fn instance_ids(&self) -> Vec<u64> {
68
69
70
        self.instances().into_iter().map(|ep| ep.id()).collect()
    }

71
    pub fn instance_ids_avail(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
72
73
74
        self.instance_avail.load()
    }

75
    pub fn instance_ids_free(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
76
77
78
        self.instance_free.load()
    }

79
80
81
82
83
    /// Get a watcher for available instance IDs
    pub fn instance_avail_watcher(&self) -> tokio::sync::watch::Receiver<Vec<u64>> {
        self.instance_avail_rx.clone()
    }

84
85
    /// Wait for at least one Instance to be available for this Endpoint
    pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> {
86
        tracing::trace!(
87
            "wait_for_instances: Starting wait for endpoint: {}",
88
            self.endpoint.id()
89
        );
90
91
92
93
94
95
96
97
98
99
        let mut rx = self.instance_source.as_ref().clone();
        // wait for there to be 1 or more endpoints
        let mut instances: Vec<Instance>;
        loop {
            instances = rx.borrow_and_update().to_vec();
            if instances.is_empty() {
                rx.changed().await?;
            } else {
                tracing::info!(
                    "wait_for_instances: Found {} instance(s) for endpoint: {}",
100
                    instances.len(),
101
                    self.endpoint.id()
102
                );
103
                break;
104
105
106
107
108
            }
        }
        Ok(instances)
    }

109
    /// Mark an instance as down/unavailable
110
    pub fn report_instance_down(&self, instance_id: u64) {
111
112
113
114
115
        let filtered = self
            .instance_ids_avail()
            .iter()
            .filter_map(|&id| if id == instance_id { None } else { Some(id) })
            .collect::<Vec<_>>();
116
117
118
119
        self.instance_avail.store(Arc::new(filtered.clone()));

        // Notify watch channel subscribers about the change
        let _ = self.instance_avail_tx.send(filtered);
120
121
122
123

        tracing::debug!("inhibiting instance {instance_id}");
    }

124
    /// Update the set of free instances based on busy instance IDs
125
    pub fn update_free_instances(&self, busy_instance_ids: &[u64]) {
126
        let all_instance_ids = self.instance_ids();
127
        let free_ids: Vec<u64> = all_instance_ids
128
129
130
131
132
133
            .into_iter()
            .filter(|id| !busy_instance_ids.contains(id))
            .collect();
        self.instance_free.store(Arc::new(free_ids));
    }

134
    /// Monitor the key-value instance source and update instance_avail.
135
136
137
    fn monitor_instance_source(&self) {
        let cancel_token = self.endpoint.drt().primary_token();
        let client = self.clone();
138
        let endpoint_id = self.endpoint.id();
139
        tokio::task::spawn(async move {
140
            let mut rx = client.instance_source.as_ref().clone();
141
            while !cancel_token.is_cancelled() {
142
                let instance_ids: Vec<u64> = rx
143
144
145
146
                    .borrow_and_update()
                    .iter()
                    .map(|instance| instance.id())
                    .collect();
147
148
149

                // TODO: this resets both tracked available and free instances
                client.instance_avail.store(Arc::new(instance_ids.clone()));
150
                client.instance_free.store(Arc::new(instance_ids.clone()));
151

152
153
154
                // Send update to watch channel subscribers
                let _ = client.instance_avail_tx.send(instance_ids);

155
                if let Err(err) = rx.changed().await {
156
                    tracing::error!(
157
                        "monitor_instance_source: The Sender is dropped: {err}, endpoint={endpoint_id}",
158
                    );
159
160
161
162
                    cancel_token.cancel();
                }
            }
        });
163
164
165
166
    }

    async fn get_or_create_dynamic_instance_source(
        endpoint: &Endpoint,
167
    ) -> Result<Arc<tokio::sync::watch::Receiver<Vec<Instance>>>> {
168
169
170
171
172
173
174
175
176
177
178
179
        let drt = endpoint.drt();
        let instance_sources = drt.instance_sources();
        let mut instance_sources = instance_sources.lock().await;

        if let Some(instance_source) = instance_sources.get(endpoint) {
            if let Some(instance_source) = instance_source.upgrade() {
                return Ok(instance_source);
            } else {
                instance_sources.remove(endpoint);
            }
        }

180
181
182
183
184
185
186
187
188
189
        let discovery = drt.discovery();
        let discovery_query = crate::discovery::DiscoveryQuery::Endpoint {
            namespace: endpoint.component.namespace.name.clone(),
            component: endpoint.component.name.clone(),
            endpoint: endpoint.name.clone(),
        };

        let mut discovery_stream = discovery
            .list_and_watch(discovery_query.clone(), None)
            .await?;
Ryan Olson's avatar
Ryan Olson committed
190
191
        let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);

192
        let secondary = endpoint.component.drt.runtime().secondary().clone();
Ryan Olson's avatar
Ryan Olson committed
193
194

        secondary.spawn(async move {
195
            tracing::trace!("endpoint_watcher: Starting for discovery query: {:?}", discovery_query);
196
            let mut map: HashMap<u64, Instance> = HashMap::new();
Ryan Olson's avatar
Ryan Olson committed
197
198

            loop {
199
                let discovery_event = tokio::select! {
Ryan Olson's avatar
Ryan Olson committed
200
201
202
                    _ = watch_tx.closed() => {
                        break;
                    }
203
204
205
206
207
208
209
210
211
                    discovery_event = discovery_stream.next() => {
                        match discovery_event {
                            Some(Ok(event)) => {
                                event
                            },
                            Some(Err(e)) => {
                                tracing::error!("endpoint_watcher: discovery stream error: {}; shutting down for discovery query: {:?}", e, discovery_query);
                                break;
                            }
Ryan Olson's avatar
Ryan Olson committed
212
213
214
215
216
217
218
                            None => {
                                break;
                            }
                        }
                    }
                };

219
                match discovery_event {
220
221
222
                    DiscoveryEvent::Added(discovery_instance) => {
                        if let DiscoveryInstance::Endpoint(instance) = discovery_instance {

223
                                map.insert(instance.instance_id, instance);
Ryan Olson's avatar
Ryan Olson committed
224
                        }
225
                    }
226
                    DiscoveryEvent::Removed(instance_id) => {
227
                        map.remove(&instance_id);
Ryan Olson's avatar
Ryan Olson committed
228
229
230
                    }
                }

231
232
                let instances: Vec<Instance> = map.values().cloned().collect();
                if watch_tx.send(instances).is_err() {
Ryan Olson's avatar
Ryan Olson committed
233
234
235
236
237
238
                    break;
                }
            }
            let _ = watch_tx.send(vec![]);
        });

239
        let instance_source = Arc::new(watch_rx);
240
241
        instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
        Ok(instance_source)
242
    }
Ryan Olson's avatar
Ryan Olson committed
243
}