client.rs 6.75 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15
16

use crate::pipeline::{
17
18
    AddressedPushRouter, AddressedRequest, AsyncEngine, Data, ManyOut, PushRouter, RouterMode,
    SingleIn,
Ryan Olson's avatar
Ryan Olson committed
19
20
21
22
23
24
25
26
27
};
use rand::Rng;
use std::collections::HashMap;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use tokio::{net::unix::pipe::Receiver, sync::Mutex};

28
use crate::{pipeline::async_trait, transports::etcd::WatchEvent};
Ryan Olson's avatar
Ryan Olson committed
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

use super::*;

/// Each state has a nonce associated with it.
/// The state is intended to be emitted on a watch channel, so that the
/// critical state transitions can be observed.
///
/// NOTE(review): this type is not referenced by any code visible in this file;
/// confirm whether it is still needed.
enum MapState {
    /// The map is empty; value = nonce
    Empty(u64),

    /// The map is not empty; values are (nonce, count)
    /// (presumably `count` is the number of entries in the map — TODO confirm)
    NonEmpty(u64, u64),

    /// The watcher has finished; no more events will be emitted
    Finished,
}

/// A change to the set of known endpoints: an entry was put or deleted.
///
/// NOTE(review): this type is not referenced by any code visible in this file;
/// confirm whether it is still needed.
enum EndpointEvent {
    /// An entry was put. The `String` is presumably the etcd key and the `i64`
    /// presumably the instance id — TODO confirm against the producer.
    Put(String, i64),
    /// An entry was deleted; the `String` is presumably the etcd key — TODO confirm.
    Delete(String),
}

51
52
53
54
55
#[derive(Clone, Debug)]
pub struct Client {
    // This is me
    pub endpoint: Endpoint,
    // These are the remotes I know about
56
    pub instances: InstanceSource,
57
58
59
}

#[derive(Clone, Debug)]
60
pub enum InstanceSource {
61
    Static,
62
    Dynamic(tokio::sync::watch::Receiver<Vec<Instance>>),
Ryan Olson's avatar
Ryan Olson committed
63
64
}

65
impl Client {
66
67
68
69
    // Client will only talk to a single static endpoint
    pub(crate) async fn new_static(endpoint: Endpoint) -> Result<Self> {
        Ok(Client {
            endpoint,
70
            instances: InstanceSource::Static,
71
72
        })
    }
Ryan Olson's avatar
Ryan Olson committed
73

74
    // Client with auto-discover instances using etcd
75
    pub(crate) async fn new_dynamic(endpoint: Endpoint) -> Result<Self> {
Ryan Olson's avatar
Ryan Olson committed
76
        // create live endpoint watcher
77
78
79
80
        let Some(etcd_client) = &endpoint.component.drt.etcd_client else {
            anyhow::bail!("Attempt to create a dynamic client on a static endpoint");
        };
        let prefix_watcher = etcd_client
81
            .kv_get_and_watch_prefix(endpoint.etcd_root())
Ryan Olson's avatar
Ryan Olson committed
82
83
84
85
86
87
88
89
90
91
92
93
            .await?;

        let (prefix, _watcher, mut kv_event_rx) = prefix_watcher.dissolve();

        let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);

        let secondary = endpoint.component.drt.runtime.secondary().clone();

        // this task should be included in the registry
        // currently this is created once per client, but this object/task should only be instantiated
        // once per worker/instance
        secondary.spawn(async move {
94
            tracing::debug!("Starting endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
95
96
97
98
99
            let mut map = HashMap::new();

            loop {
                let kv_event = tokio::select! {
                    _ = watch_tx.closed() => {
100
                        tracing::debug!("all watchers have closed; shutting down endpoint watcher for prefix: {prefix}");
Ryan Olson's avatar
Ryan Olson committed
101
102
103
104
105
106
                        break;
                    }
                    kv_event = kv_event_rx.recv() => {
                        match kv_event {
                            Some(kv_event) => kv_event,
                            None => {
107
                                tracing::debug!("watch stream has closed; shutting down endpoint watcher for prefix: {prefix}");
Ryan Olson's avatar
Ryan Olson committed
108
109
110
111
112
113
114
115
116
                                break;
                            }
                        }
                    }
                };

                match kv_event {
                    WatchEvent::Put(kv) => {
                        let key = String::from_utf8(kv.key().to_vec());
117
                        let val = serde_json::from_slice::<Instance>(kv.value());
Ryan Olson's avatar
Ryan Olson committed
118
                        if let (Ok(key), Ok(val)) = (key, val) {
119
                            map.insert(key.clone(), val);
Ryan Olson's avatar
Ryan Olson committed
120
                        } else {
121
                            tracing::error!("Unable to parse put endpoint event; shutting down endpoint watcher for prefix: {prefix}");
Ryan Olson's avatar
Ryan Olson committed
122
123
124
125
126
127
128
                            break;
                        }
                    }
                    WatchEvent::Delete(kv) => {
                        match String::from_utf8(kv.key().to_vec()) {
                            Ok(key) => { map.remove(&key); }
                            Err(_) => {
129
                                tracing::error!("Unable to parse delete endpoint event; shutting down endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
130
131
132
133
134
135
                                break;
                            }
                        }
                    }
                }

136
                let instances: Vec<Instance> = map.values().cloned().collect();
Ryan Olson's avatar
Ryan Olson committed
137

138
                if watch_tx.send(instances).is_err() {
139
                    tracing::debug!("Unable to send watch updates; shutting down endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
140
141
142
143
144
                    break;
                }

            }

145
            tracing::debug!("Completed endpoint watcher for prefix: {prefix}");
Ryan Olson's avatar
Ryan Olson committed
146
147
148
149
150
            let _ = watch_tx.send(vec![]);
        });

        Ok(Client {
            endpoint,
151
            instances: InstanceSource::Dynamic(watch_rx),
Ryan Olson's avatar
Ryan Olson committed
152
153
154
        })
    }

155
156
157
158
    pub fn path(&self) -> String {
        self.endpoint.path()
    }

159
160
161
    /// The root etcd path we watch in etcd to discover new instances to route to.
    pub fn etcd_root(&self) -> String {
        self.endpoint.etcd_root()
162
163
    }

164
165
166
167
    pub fn instances(&self) -> Vec<Instance> {
        match &self.instances {
            InstanceSource::Static => vec![],
            InstanceSource::Dynamic(watch_rx) => watch_rx.borrow().clone(),
168
        }
Ryan Olson's avatar
Ryan Olson committed
169
170
    }

171
172
    pub fn instance_ids(&self) -> Vec<i64> {
        self.instances().into_iter().map(|ep| ep.id()).collect()
173
174
    }

175
176
177
178
    /// Wait for at least one Instance to be available for this Endpoint
    pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> {
        let mut instances: Vec<Instance> = vec![];
        if let InstanceSource::Dynamic(mut rx) = self.instances.clone() {
179
180
            // wait for there to be 1 or more endpoints
            loop {
181
182
                instances = rx.borrow_and_update().to_vec();
                if instances.is_empty() {
183
184
185
186
                    rx.changed().await?;
                } else {
                    break;
                }
Ryan Olson's avatar
Ryan Olson committed
187
188
            }
        }
189
        Ok(instances)
Ryan Olson's avatar
Ryan Olson committed
190
191
    }

192
193
    /// Is this component know at startup and not discovered via etcd?
    pub fn is_static(&self) -> bool {
194
        matches!(self.instances, InstanceSource::Static)
195
    }
Ryan Olson's avatar
Ryan Olson committed
196
}