// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use derive_getters::Dissolve;
use tokio_util::sync::CancellationToken;

use super::*;

pub use async_nats::service::endpoint::Stats as EndpointStats;

#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    #[builder(private)]
    endpoint: Endpoint,

30
    // todo: move lease to component/service
Ryan Olson's avatar
Ryan Olson committed
31
32
33
34
35
36
37
38
    /// Lease
    #[educe(Debug(ignore))]
    #[builder(default)]
    lease: Option<Lease>,

    /// Endpoint handler
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,
39
40
41
42
43

    /// Stats handler
    #[educe(Debug(ignore))]
    #[builder(default, private)]
    _stats_handler: Option<EndpointStatsHandler>,
44

45
46
47
48
    /// Additional labels for metrics
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

49
50
51
    /// Whether to wait for inflight requests to complete during shutdown
    #[builder(default = "true")]
    graceful_shutdown: bool,
Ryan Olson's avatar
Ryan Olson committed
52
53
54
55
56
57
58
}

impl EndpointConfigBuilder {
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
        Self::default().endpoint(endpoint)
    }

59
60
    pub fn stats_handler<F>(self, handler: F) -> Self
    where
61
        F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
62
63
64
65
    {
        self._stats_handler(Some(Box::new(handler)))
    }

Ryan Olson's avatar
Ryan Olson committed
66
    pub async fn start(self) -> Result<()> {
67
        let (endpoint, lease, handler, stats_handler, metrics_labels, graceful_shutdown) =
68
            self.build_internal()?.dissolve();
69
70
        let lease = lease.or(endpoint.drt().primary_lease());
        let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
Ryan Olson's avatar
Ryan Olson committed
71

72
73
74
75
        tracing::debug!(
            "Starting endpoint: {}",
            endpoint.etcd_path_with_lease_id(lease_id)
        );
Ryan Olson's avatar
Ryan Olson committed
76

77
78
79
80
81
        let service_name = endpoint.component.service_name();

        // acquire the registry lock
        let registry = endpoint.drt().component_registry.inner.lock().await;

82
83
84
        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
            .as_ref()
            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
85
        // Add metrics to the handler. The endpoint provides additional information to the handler.
86
        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
87

88
89
        // get the group
        let group = registry
Ryan Olson's avatar
Ryan Olson committed
90
            .services
91
            .get(&service_name)
Ryan Olson's avatar
Ryan Olson committed
92
            .map(|service| service.group(endpoint.component.service_name()))
Ryan Olson's avatar
Ryan Olson committed
93
94
            .ok_or(error!("Service not found"))?;

95
96
97
98
99
100
101
102
103
104
105
106
107
108
        // get the stats handler map
        let handler_map = registry
            .stats_handlers
            .get(&service_name)
            .cloned()
            .expect("no stats handler registry; this is unexpected");

        drop(registry);

        // insert the stats handler
        if let Some(stats_handler) = stats_handler {
            handler_map
                .lock()
                .unwrap()
109
                .insert(endpoint.subject_to(lease_id), stats_handler);
110
        }
Ryan Olson's avatar
Ryan Olson committed
111
112
113

        // creates an endpoint for the service
        let service_endpoint = group
114
            .endpoint(&endpoint.name_with_id(lease_id))
Ryan Olson's avatar
Ryan Olson committed
115
116
117
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;

118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
        // Create a token that responds to both runtime shutdown and lease expiration
        let runtime_shutdown_token = endpoint.drt().child_token();

        // Extract all values needed from endpoint before any spawns
        let namespace_name = endpoint.component.namespace.name.clone();
        let component_name = endpoint.component.name.clone();
        let endpoint_name = endpoint.name.clone();
        let system_health = endpoint.drt().system_health.clone();
        let subject = endpoint.subject_to(lease_id);
        let etcd_path = endpoint.etcd_path_with_lease_id(lease_id);
        let etcd_client = endpoint.component.drt.etcd_client.clone();

        let cancel_token = if let Some(lease) = lease.as_ref() {
            // Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs
            let combined_token = CancellationToken::new();
            let combined_for_select = combined_token.clone();
            let lease_token = lease.child_token();
            // Use secondary runtime for this lightweight monitoring task
            endpoint.drt().runtime().secondary().spawn(async move {
                tokio::select! {
                    _ = lease_token.cancelled() => {
                        tracing::trace!("Lease cancelled, triggering endpoint shutdown");
                    }
                    _ = runtime_shutdown_token.cancelled() => {
                        tracing::trace!("Runtime shutdown triggered, cancelling endpoint");
                    }
                }
                combined_for_select.cancel();
            });
            combined_token
        } else {
            // No lease, just use runtime shutdown token
            runtime_shutdown_token
        };

        // Register with graceful shutdown tracker if needed
        if graceful_shutdown {
            tracing::debug!(
                "Registering endpoint '{}' with graceful shutdown tracker",
                endpoint.name
            );
            let tracker = endpoint.drt().graceful_shutdown_tracker();
            tracker.register_endpoint();
        } else {
            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
        }
Ryan Olson's avatar
Ryan Olson committed
164
165
166
167

        let push_endpoint = PushEndpoint::builder()
            .service_handler(handler)
            .cancellation_token(cancel_token.clone())
168
            .graceful_shutdown(graceful_shutdown)
Ryan Olson's avatar
Ryan Olson committed
169
170
171
172
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

        // launch in primary runtime
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
        let tracker_clone = if graceful_shutdown {
            Some(endpoint.drt().graceful_shutdown_tracker())
        } else {
            None
        };

        // Create clones for the async closure
        let namespace_name_for_task = namespace_name.clone();
        let component_name_for_task = component_name.clone();
        let endpoint_name_for_task = endpoint_name.clone();

        let task = tokio::spawn(async move {
            let result = push_endpoint
                .start(
                    service_endpoint,
                    namespace_name_for_task,
                    component_name_for_task,
                    endpoint_name_for_task,
                    lease_id,
                    system_health,
                )
                .await;

            // Unregister from graceful shutdown tracker
            if let Some(tracker) = tracker_clone {
                tracing::debug!("Unregistering endpoint from graceful shutdown tracker");
                tracker.unregister_endpoint();
            }

            result
        });
Ryan Olson's avatar
Ryan Olson committed
204
205
206
207

        // make the components service endpoint discovery in etcd

        // client.register_service()
208
        let info = Instance {
209
210
211
            component: component_name,
            endpoint: endpoint_name,
            namespace: namespace_name,
212
            instance_id: lease_id,
213
            transport: TransportType::NatsTcp(subject),
Ryan Olson's avatar
Ryan Olson committed
214
215
216
217
        };

        let info = serde_json::to_vec_pretty(&info)?;

218
        if let Some(etcd_client) = &etcd_client
219
            && let Err(e) = etcd_client
220
                .kv_create(&etcd_path, info, Some(lease_id))
221
                .await
222
223
224
225
        {
            tracing::error!("Failed to register discoverable service: {:?}", e);
            cancel_token.cancel();
            return Err(error!("Failed to register discoverable service"));
Ryan Olson's avatar
Ryan Olson committed
226
227
228
229
230
231
        }
        task.await??;

        Ok(())
    }
}