client.rs 6.87 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15
16

use crate::pipeline::{
17
18
    AddressedPushRouter, AddressedRequest, AsyncEngine, Data, ManyOut, PushRouter, RouterMode,
    SingleIn,
Ryan Olson's avatar
Ryan Olson committed
19
20
21
22
23
24
25
26
27
};
use rand::Rng;
use std::collections::HashMap;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use tokio::{net::unix::pipe::Receiver, sync::Mutex};

28
use crate::{pipeline::async_trait, transports::etcd::WatchEvent};
Ryan Olson's avatar
Ryan Olson committed
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

use super::*;

/// Each state has a nonce associated with it.
/// The state is meant to be emitted on a watch channel, so we can observe the
/// critical state transitions.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this enum — confirm whether it is still needed.
enum MapState {
    /// The map is empty; value = nonce
    Empty(u64),

    /// The map is not-empty; values are (nonce, count)
    NonEmpty(u64, u64),

    /// The watcher has finished, no more events will be emitted
    Finished,
}

/// A change to the set of known endpoints.
///
/// NOTE(review): not referenced in the visible part of this file; the payload
/// meanings below are inferred from the field types only — confirm.
enum EndpointEvent {
    /// An endpoint was added or updated; presumably (key, endpoint id) — TODO confirm.
    Put(String, i64),
    /// An endpoint was removed; presumably identified by its key — TODO confirm.
    Delete(String),
}

51
52
53
54
55
56
/// Client handle for a single [`Endpoint`], paired with the source of the
/// remote endpoints it knows about.
#[derive(Clone, Debug)]
pub struct Client {
    /// The endpoint this client represents ("this is me").
    pub endpoint: Endpoint,
    /// The remote endpoints this client knows about.
    pub endpoints: EndpointSource,
}

#[derive(Clone, Debug)]
60
pub enum EndpointSource {
61
    Static,
62
    Dynamic(tokio::sync::watch::Receiver<Vec<ComponentEndpointInfo>>),
Ryan Olson's avatar
Ryan Olson committed
63
64
}

65
impl Client {
66
67
68
69
70
71
72
    // Client will only talk to a single static endpoint
    pub(crate) async fn new_static(endpoint: Endpoint) -> Result<Self> {
        Ok(Client {
            endpoint,
            endpoints: EndpointSource::Static,
        })
    }
Ryan Olson's avatar
Ryan Olson committed
73

74
75
    // Client with auto-discover endpoints using etcd
    pub(crate) async fn new_dynamic(endpoint: Endpoint) -> Result<Self> {
Ryan Olson's avatar
Ryan Olson committed
76
        // create live endpoint watcher
77
78
79
80
        let Some(etcd_client) = &endpoint.component.drt.etcd_client else {
            anyhow::bail!("Attempt to create a dynamic client on a static endpoint");
        };
        let prefix_watcher = etcd_client
Ryan Olson's avatar
Ryan Olson committed
81
82
83
84
85
86
87
88
89
90
91
92
93
            .kv_get_and_watch_prefix(endpoint.etcd_path())
            .await?;

        let (prefix, _watcher, mut kv_event_rx) = prefix_watcher.dissolve();

        let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);

        let secondary = endpoint.component.drt.runtime.secondary().clone();

        // this task should be included in the registry
        // currently this is created once per client, but this object/task should only be instantiated
        // once per worker/instance
        secondary.spawn(async move {
94
            tracing::debug!("Starting endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
95
96
97
98
99
            let mut map = HashMap::new();

            loop {
                let kv_event = tokio::select! {
                    _ = watch_tx.closed() => {
100
                        tracing::debug!("all watchers have closed; shutting down endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
101
102
103
104
105
106
                        break;
                    }
                    kv_event = kv_event_rx.recv() => {
                        match kv_event {
                            Some(kv_event) => kv_event,
                            None => {
107
                                tracing::debug!("watch stream has closed; shutting down endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
108
109
110
111
112
113
114
115
116
117
118
                                break;
                            }
                        }
                    }
                };

                match kv_event {
                    WatchEvent::Put(kv) => {
                        let key = String::from_utf8(kv.key().to_vec());
                        let val = serde_json::from_slice::<ComponentEndpointInfo>(kv.value());
                        if let (Ok(key), Ok(val)) = (key, val) {
119
                            map.insert(key.clone(), val);
Ryan Olson's avatar
Ryan Olson committed
120
                        } else {
121
                            tracing::error!("Unable to parse put endpoint event; shutting down endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
122
123
124
125
126
127
128
                            break;
                        }
                    }
                    WatchEvent::Delete(kv) => {
                        match String::from_utf8(kv.key().to_vec()) {
                            Ok(key) => { map.remove(&key); }
                            Err(_) => {
129
                                tracing::error!("Unable to parse delete endpoint event; shutting down endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
130
131
132
133
134
135
                                break;
                            }
                        }
                    }
                }

136
                let endpoints: Vec<ComponentEndpointInfo> = map.values().cloned().collect();
Ryan Olson's avatar
Ryan Olson committed
137

138
                if watch_tx.send(endpoints).is_err() {
139
                    tracing::debug!("Unable to send watch updates; shutting down endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
140
141
142
143
144
                    break;
                }

            }

145
            tracing::debug!("Completed endpoint watcher for prefix: {}", prefix);
Ryan Olson's avatar
Ryan Olson committed
146
147
148
149
150
            let _ = watch_tx.send(vec![]);
        });

        Ok(Client {
            endpoint,
151
            endpoints: EndpointSource::Dynamic(watch_rx),
Ryan Olson's avatar
Ryan Olson committed
152
153
154
        })
    }

155
    /// String identifying `<namespace>/<component>/<endpoint>`
156
157
158
159
    pub fn path(&self) -> String {
        self.endpoint.path()
    }

160
    /// String identifying `<namespace>/component/<component>/<endpoint>`
161
162
163
164
    pub fn etcd_path(&self) -> String {
        self.endpoint.etcd_path()
    }

165
    pub fn endpoints(&self) -> Vec<ComponentEndpointInfo> {
166
        match &self.endpoints {
167
            EndpointSource::Static => vec![],
168
169
            EndpointSource::Dynamic(watch_rx) => watch_rx.borrow().clone(),
        }
Ryan Olson's avatar
Ryan Olson committed
170
171
    }

172
173
    pub fn endpoint_ids(&self) -> Vec<i64> {
        self.endpoints().into_iter().map(|ep| ep.id()).collect()
174
175
    }

Ryan Olson's avatar
Ryan Olson committed
176
    /// Wait for at least one [`Endpoint`] to be available
177
178
    pub async fn wait_for_endpoints(&self) -> Result<Vec<ComponentEndpointInfo>> {
        let mut endpoints: Vec<ComponentEndpointInfo> = vec![];
179
180
181
        if let EndpointSource::Dynamic(mut rx) = self.endpoints.clone() {
            // wait for there to be 1 or more endpoints
            loop {
182
183
                endpoints = rx.borrow_and_update().to_vec();
                if endpoints.is_empty() {
184
185
186
187
                    rx.changed().await?;
                } else {
                    break;
                }
Ryan Olson's avatar
Ryan Olson committed
188
189
            }
        }
190
        Ok(endpoints)
Ryan Olson's avatar
Ryan Olson committed
191
192
    }

193
194
195
196
    /// Is this component known at startup and not discovered via etcd?
    ///
    /// True exactly when the endpoint source is [`EndpointSource::Static`].
    pub fn is_static(&self) -> bool {
        match &self.endpoints {
            EndpointSource::Static => true,
            EndpointSource::Dynamic(_) => false,
        }
    }
Ryan Olson's avatar
Ryan Olson committed
197
}