// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use derive_getters::Dissolve;
use tokio_util::sync::CancellationToken;

use super::*;

pub use async_nats::service::endpoint::Stats as EndpointStats;

#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    #[builder(private)]
    endpoint: Endpoint,

18
    // todo: move lease to component/service
Ryan Olson's avatar
Ryan Olson committed
19
20
21
22
23
24
25
26
    /// Lease
    #[educe(Debug(ignore))]
    #[builder(default)]
    lease: Option<Lease>,

    /// Endpoint handler
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,
27
28
29
30
31

    /// Stats handler
    #[educe(Debug(ignore))]
    #[builder(default, private)]
    _stats_handler: Option<EndpointStatsHandler>,
32

33
34
35
36
    /// Additional labels for metrics
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

37
38
39
    /// Whether to wait for inflight requests to complete during shutdown
    #[builder(default = "true")]
    graceful_shutdown: bool,
40
41
42
43
44
45
46

    /// Health check payload for this endpoint
    /// This payload will be sent to the endpoint during health checks
    /// to verify it's responding properly
    #[educe(Debug(ignore))]
    #[builder(default, setter(into, strip_option))]
    health_check_payload: Option<serde_json::Value>,
Ryan Olson's avatar
Ryan Olson committed
47
48
49
50
51
52
53
}

impl EndpointConfigBuilder {
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
        Self::default().endpoint(endpoint)
    }

54
55
    pub fn stats_handler<F>(self, handler: F) -> Self
    where
56
        F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
57
58
59
60
    {
        self._stats_handler(Some(Box::new(handler)))
    }

Ryan Olson's avatar
Ryan Olson committed
61
    pub async fn start(self) -> Result<()> {
62
63
64
65
66
67
68
69
70
        let (
            endpoint,
            lease,
            handler,
            stats_handler,
            metrics_labels,
            graceful_shutdown,
            health_check_payload,
        ) = self.build_internal()?.dissolve();
71
72
        let lease = lease.or(endpoint.drt().primary_lease());
        let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
Ryan Olson's avatar
Ryan Olson committed
73

74
75
76
77
        tracing::debug!(
            "Starting endpoint: {}",
            endpoint.etcd_path_with_lease_id(lease_id)
        );
Ryan Olson's avatar
Ryan Olson committed
78

79
80
81
82
83
        let service_name = endpoint.component.service_name();

        // acquire the registry lock
        let registry = endpoint.drt().component_registry.inner.lock().await;

84
85
86
        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
            .as_ref()
            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
87
        // Add metrics to the handler. The endpoint provides additional information to the handler.
88
        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
89

90
91
        // get the group
        let group = registry
Ryan Olson's avatar
Ryan Olson committed
92
            .services
93
            .get(&service_name)
Ryan Olson's avatar
Ryan Olson committed
94
            .map(|service| service.group(endpoint.component.service_name()))
Ryan Olson's avatar
Ryan Olson committed
95
96
            .ok_or(error!("Service not found"))?;

97
98
99
100
101
102
103
104
105
106
107
108
109
110
        // get the stats handler map
        let handler_map = registry
            .stats_handlers
            .get(&service_name)
            .cloned()
            .expect("no stats handler registry; this is unexpected");

        drop(registry);

        // insert the stats handler
        if let Some(stats_handler) = stats_handler {
            handler_map
                .lock()
                .unwrap()
111
                .insert(endpoint.subject_to(lease_id), stats_handler);
112
        }
Ryan Olson's avatar
Ryan Olson committed
113
114
115

        // creates an endpoint for the service
        let service_endpoint = group
116
            .endpoint(&endpoint.name_with_id(lease_id))
Ryan Olson's avatar
Ryan Olson committed
117
118
119
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;

120
121
122
123
124
125
126
127
128
129
130
131
        // Create a token that responds to both runtime shutdown and lease expiration
        let runtime_shutdown_token = endpoint.drt().child_token();

        // Extract all values needed from endpoint before any spawns
        let namespace_name = endpoint.component.namespace.name.clone();
        let component_name = endpoint.component.name.clone();
        let endpoint_name = endpoint.name.clone();
        let system_health = endpoint.drt().system_health.clone();
        let subject = endpoint.subject_to(lease_id);
        let etcd_path = endpoint.etcd_path_with_lease_id(lease_id);
        let etcd_client = endpoint.component.drt.etcd_client.clone();

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
        // Register health check target in SystemHealth if provided
        if let Some(health_check_payload) = &health_check_payload {
            let instance = Instance {
                component: component_name.clone(),
                endpoint: endpoint_name.clone(),
                namespace: namespace_name.clone(),
                instance_id: lease_id,
                transport: TransportType::NatsTcp(subject.clone()),
            };
            tracing::debug!(subject = %subject, "Registering endpoint health check target");
            let guard = system_health.lock().unwrap();
            guard.register_health_check_target(&subject, instance, health_check_payload.clone());
            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&subject) {
                handler.set_endpoint_health_check_notifier(notifier)?;
            }
        }

149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
        let cancel_token = if let Some(lease) = lease.as_ref() {
            // Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs
            let combined_token = CancellationToken::new();
            let combined_for_select = combined_token.clone();
            let lease_token = lease.child_token();
            // Use secondary runtime for this lightweight monitoring task
            endpoint.drt().runtime().secondary().spawn(async move {
                tokio::select! {
                    _ = lease_token.cancelled() => {
                        tracing::trace!("Lease cancelled, triggering endpoint shutdown");
                    }
                    _ = runtime_shutdown_token.cancelled() => {
                        tracing::trace!("Runtime shutdown triggered, cancelling endpoint");
                    }
                }
                combined_for_select.cancel();
            });
            combined_token
        } else {
            // No lease, just use runtime shutdown token
            runtime_shutdown_token
        };

        // Register with graceful shutdown tracker if needed
        if graceful_shutdown {
            tracing::debug!(
                "Registering endpoint '{}' with graceful shutdown tracker",
                endpoint.name
            );
            let tracker = endpoint.drt().graceful_shutdown_tracker();
            tracker.register_endpoint();
        } else {
            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
        }
Ryan Olson's avatar
Ryan Olson committed
183
184
185
186

        let push_endpoint = PushEndpoint::builder()
            .service_handler(handler)
            .cancellation_token(cancel_token.clone())
187
            .graceful_shutdown(graceful_shutdown)
Ryan Olson's avatar
Ryan Olson committed
188
189
190
191
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

        // launch in primary runtime
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
        let tracker_clone = if graceful_shutdown {
            Some(endpoint.drt().graceful_shutdown_tracker())
        } else {
            None
        };

        // Create clones for the async closure
        let namespace_name_for_task = namespace_name.clone();
        let component_name_for_task = component_name.clone();
        let endpoint_name_for_task = endpoint_name.clone();

        let task = tokio::spawn(async move {
            let result = push_endpoint
                .start(
                    service_endpoint,
                    namespace_name_for_task,
                    component_name_for_task,
                    endpoint_name_for_task,
                    lease_id,
                    system_health,
                )
                .await;

            // Unregister from graceful shutdown tracker
            if let Some(tracker) = tracker_clone {
                tracing::debug!("Unregistering endpoint from graceful shutdown tracker");
                tracker.unregister_endpoint();
            }

            result
        });
Ryan Olson's avatar
Ryan Olson committed
223
224
225
226

        // make the components service endpoint discovery in etcd

        // client.register_service()
227
        let info = Instance {
228
229
230
            component: component_name,
            endpoint: endpoint_name,
            namespace: namespace_name,
231
            instance_id: lease_id,
232
            transport: TransportType::NatsTcp(subject),
Ryan Olson's avatar
Ryan Olson committed
233
234
235
236
        };

        let info = serde_json::to_vec_pretty(&info)?;

237
        if let Some(etcd_client) = &etcd_client
238
            && let Err(e) = etcd_client
239
                .kv_create(&etcd_path, info, Some(lease_id))
240
                .await
241
242
243
244
        {
            tracing::error!("Failed to register discoverable service: {:?}", e);
            cancel_token.cancel();
            return Err(error!("Failed to register discoverable service"));
Ryan Olson's avatar
Ryan Olson committed
245
246
247
248
249
250
        }
        task.await??;

        Ok(())
    }
}