// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use derive_getters::Dissolve;
use tokio_util::sync::CancellationToken;

use super::*;

pub use async_nats::service::endpoint::Stats as EndpointStats;

#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    #[builder(private)]
    endpoint: Endpoint,

18
    // todo: move lease to component/service
Ryan Olson's avatar
Ryan Olson committed
19
20
21
22
23
24
25
26
    /// Lease
    #[educe(Debug(ignore))]
    #[builder(default)]
    lease: Option<Lease>,

    /// Endpoint handler
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,
27
28
29
30
31

    /// Stats handler
    #[educe(Debug(ignore))]
    #[builder(default, private)]
    _stats_handler: Option<EndpointStatsHandler>,
32

33
34
35
36
    /// Additional labels for metrics
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

37
38
39
    /// Whether to wait for inflight requests to complete during shutdown
    #[builder(default = "true")]
    graceful_shutdown: bool,
40
41
42
43
44
45
46

    /// Health check payload for this endpoint
    /// This payload will be sent to the endpoint during health checks
    /// to verify it's responding properly
    #[educe(Debug(ignore))]
    #[builder(default, setter(into, strip_option))]
    health_check_payload: Option<serde_json::Value>,
Ryan Olson's avatar
Ryan Olson committed
47
48
49
50
51
52
53
}

impl EndpointConfigBuilder {
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
        Self::default().endpoint(endpoint)
    }

54
55
    pub fn stats_handler<F>(self, handler: F) -> Self
    where
56
        F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
57
58
59
60
    {
        self._stats_handler(Some(Box::new(handler)))
    }

Ryan Olson's avatar
Ryan Olson committed
61
    pub async fn start(self) -> Result<()> {
62
63
64
65
66
67
68
69
70
        let (
            endpoint,
            lease,
            handler,
            stats_handler,
            metrics_labels,
            graceful_shutdown,
            health_check_payload,
        ) = self.build_internal()?.dissolve();
71
72
        let lease = lease.or(endpoint.drt().primary_lease());
        let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
Ryan Olson's avatar
Ryan Olson committed
73

74
75
76
77
        tracing::debug!(
            "Starting endpoint: {}",
            endpoint.etcd_path_with_lease_id(lease_id)
        );
Ryan Olson's avatar
Ryan Olson committed
78

79
80
81
82
83
        let service_name = endpoint.component.service_name();

        // acquire the registry lock
        let registry = endpoint.drt().component_registry.inner.lock().await;

84
85
86
        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
            .as_ref()
            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
87
        // Add metrics to the handler. The endpoint provides additional information to the handler.
88
        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
89

90
91
        // get the group
        let group = registry
Ryan Olson's avatar
Ryan Olson committed
92
            .services
93
            .get(&service_name)
Ryan Olson's avatar
Ryan Olson committed
94
            .map(|service| service.group(endpoint.component.service_name()))
Ryan Olson's avatar
Ryan Olson committed
95
96
            .ok_or(error!("Service not found"))?;

97
98
99
100
101
102
103
104
105
106
107
108
109
        // get the stats handler map
        let handler_map = registry
            .stats_handlers
            .get(&service_name)
            .cloned()
            .expect("no stats handler registry; this is unexpected");

        drop(registry);

        // insert the stats handler
        if let Some(stats_handler) = stats_handler {
            handler_map
                .lock()
110
                .insert(endpoint.subject_to(lease_id), stats_handler);
111
        }
Ryan Olson's avatar
Ryan Olson committed
112
113
114

        // creates an endpoint for the service
        let service_endpoint = group
115
            .endpoint(&endpoint.name_with_id(lease_id))
Ryan Olson's avatar
Ryan Olson committed
116
117
118
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;

119
120
121
122
123
124
125
126
127
128
129
130
        // Create a token that responds to both runtime shutdown and lease expiration
        let runtime_shutdown_token = endpoint.drt().child_token();

        // Extract all values needed from endpoint before any spawns
        let namespace_name = endpoint.component.namespace.name.clone();
        let component_name = endpoint.component.name.clone();
        let endpoint_name = endpoint.name.clone();
        let system_health = endpoint.drt().system_health.clone();
        let subject = endpoint.subject_to(lease_id);
        let etcd_path = endpoint.etcd_path_with_lease_id(lease_id);
        let etcd_client = endpoint.component.drt.etcd_client.clone();

131
132
133
134
135
136
137
138
139
        // Register health check target in SystemHealth if provided
        if let Some(health_check_payload) = &health_check_payload {
            let instance = Instance {
                component: component_name.clone(),
                endpoint: endpoint_name.clone(),
                namespace: namespace_name.clone(),
                instance_id: lease_id,
                transport: TransportType::NatsTcp(subject.clone()),
            };
140
            tracing::debug!(endpoint_name = %endpoint_name, "Registering endpoint health check target");
141
            let guard = system_health.lock();
142
143
144
145
146
147
            guard.register_health_check_target(
                &endpoint_name,
                instance,
                health_check_payload.clone(),
            );
            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint_name) {
148
149
150
151
                handler.set_endpoint_health_check_notifier(notifier)?;
            }
        }

152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
        let cancel_token = if let Some(lease) = lease.as_ref() {
            // Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs
            let combined_token = CancellationToken::new();
            let combined_for_select = combined_token.clone();
            let lease_token = lease.child_token();
            // Use secondary runtime for this lightweight monitoring task
            endpoint.drt().runtime().secondary().spawn(async move {
                tokio::select! {
                    _ = lease_token.cancelled() => {
                        tracing::trace!("Lease cancelled, triggering endpoint shutdown");
                    }
                    _ = runtime_shutdown_token.cancelled() => {
                        tracing::trace!("Runtime shutdown triggered, cancelling endpoint");
                    }
                }
                combined_for_select.cancel();
            });
            combined_token
        } else {
            // No lease, just use runtime shutdown token
            runtime_shutdown_token
        };

        // Register with graceful shutdown tracker if needed
        if graceful_shutdown {
            tracing::debug!(
                "Registering endpoint '{}' with graceful shutdown tracker",
                endpoint.name
            );
            let tracker = endpoint.drt().graceful_shutdown_tracker();
            tracker.register_endpoint();
        } else {
            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
        }
Ryan Olson's avatar
Ryan Olson committed
186
187
188
189

        let push_endpoint = PushEndpoint::builder()
            .service_handler(handler)
            .cancellation_token(cancel_token.clone())
190
            .graceful_shutdown(graceful_shutdown)
Ryan Olson's avatar
Ryan Olson committed
191
192
193
194
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

        // launch in primary runtime
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
        let tracker_clone = if graceful_shutdown {
            Some(endpoint.drt().graceful_shutdown_tracker())
        } else {
            None
        };

        // Create clones for the async closure
        let namespace_name_for_task = namespace_name.clone();
        let component_name_for_task = component_name.clone();
        let endpoint_name_for_task = endpoint_name.clone();

        let task = tokio::spawn(async move {
            let result = push_endpoint
                .start(
                    service_endpoint,
                    namespace_name_for_task,
                    component_name_for_task,
                    endpoint_name_for_task,
                    lease_id,
                    system_health,
                )
                .await;

            // Unregister from graceful shutdown tracker
            if let Some(tracker) = tracker_clone {
                tracing::debug!("Unregistering endpoint from graceful shutdown tracker");
                tracker.unregister_endpoint();
            }

            result
        });
Ryan Olson's avatar
Ryan Olson committed
226
227
228
229

        // make the components service endpoint discovery in etcd

        // client.register_service()
230
        let info = Instance {
231
232
233
            component: component_name.clone(),
            endpoint: endpoint_name.clone(),
            namespace: namespace_name.clone(),
234
            instance_id: lease_id,
235
            transport: TransportType::NatsTcp(subject),
Ryan Olson's avatar
Ryan Olson committed
236
237
238
239
        };

        let info = serde_json::to_vec_pretty(&info)?;

240
        if let Some(etcd_client) = &etcd_client
241
            && let Err(e) = etcd_client
242
                .kv_create(&etcd_path, info, Some(lease_id))
243
                .await
244
        {
245
246
247
248
249
250
            tracing::error!(
                component_name,
                endpoint_name,
                error = %e,
                "Unable to register service for discovery"
            );
251
            cancel_token.cancel();
252
253
254
            return Err(error!(
                "Unable to register service for discovery. Check discovery service status"
            ));
Ryan Olson's avatar
Ryan Olson committed
255
256
257
258
259
260
        }
        task.await??;

        Ok(())
    }
}