// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use derive_getters::Dissolve;
use tokio_util::sync::CancellationToken;

use super::*;

pub use async_nats::service::endpoint::Stats as EndpointStats;

#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    #[builder(private)]
    endpoint: Endpoint,

18
    // todo: move lease to component/service
Ryan Olson's avatar
Ryan Olson committed
19
20
21
22
23
24
25
26
    /// Lease
    #[educe(Debug(ignore))]
    #[builder(default)]
    lease: Option<Lease>,

    /// Endpoint handler
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,
27
28
29
30
31

    /// Stats handler
    #[educe(Debug(ignore))]
    #[builder(default, private)]
    _stats_handler: Option<EndpointStatsHandler>,
32

33
34
35
36
    /// Additional labels for metrics
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

37
38
39
    /// Whether to wait for inflight requests to complete during shutdown
    #[builder(default = "true")]
    graceful_shutdown: bool,
Ryan Olson's avatar
Ryan Olson committed
40
41
42
43
44
45
46
}

impl EndpointConfigBuilder {
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
        Self::default().endpoint(endpoint)
    }

47
48
    pub fn stats_handler<F>(self, handler: F) -> Self
    where
49
        F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
50
51
52
53
    {
        self._stats_handler(Some(Box::new(handler)))
    }

Ryan Olson's avatar
Ryan Olson committed
54
    pub async fn start(self) -> Result<()> {
55
        let (endpoint, lease, handler, stats_handler, metrics_labels, graceful_shutdown) =
56
            self.build_internal()?.dissolve();
57
58
        let lease = lease.or(endpoint.drt().primary_lease());
        let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
Ryan Olson's avatar
Ryan Olson committed
59

60
61
62
63
        tracing::debug!(
            "Starting endpoint: {}",
            endpoint.etcd_path_with_lease_id(lease_id)
        );
Ryan Olson's avatar
Ryan Olson committed
64

65
66
67
68
69
        let service_name = endpoint.component.service_name();

        // acquire the registry lock
        let registry = endpoint.drt().component_registry.inner.lock().await;

70
71
72
        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
            .as_ref()
            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
73
        // Add metrics to the handler. The endpoint provides additional information to the handler.
74
        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
75

76
77
        // get the group
        let group = registry
Ryan Olson's avatar
Ryan Olson committed
78
            .services
79
            .get(&service_name)
Ryan Olson's avatar
Ryan Olson committed
80
            .map(|service| service.group(endpoint.component.service_name()))
Ryan Olson's avatar
Ryan Olson committed
81
82
            .ok_or(error!("Service not found"))?;

83
84
85
86
87
88
89
90
91
92
93
94
95
96
        // get the stats handler map
        let handler_map = registry
            .stats_handlers
            .get(&service_name)
            .cloned()
            .expect("no stats handler registry; this is unexpected");

        drop(registry);

        // insert the stats handler
        if let Some(stats_handler) = stats_handler {
            handler_map
                .lock()
                .unwrap()
97
                .insert(endpoint.subject_to(lease_id), stats_handler);
98
        }
Ryan Olson's avatar
Ryan Olson committed
99
100
101

        // creates an endpoint for the service
        let service_endpoint = group
102
            .endpoint(&endpoint.name_with_id(lease_id))
Ryan Olson's avatar
Ryan Olson committed
103
104
105
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;

106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
        // Create a token that responds to both runtime shutdown and lease expiration
        let runtime_shutdown_token = endpoint.drt().child_token();

        // Extract all values needed from endpoint before any spawns
        let namespace_name = endpoint.component.namespace.name.clone();
        let component_name = endpoint.component.name.clone();
        let endpoint_name = endpoint.name.clone();
        let system_health = endpoint.drt().system_health.clone();
        let subject = endpoint.subject_to(lease_id);
        let etcd_path = endpoint.etcd_path_with_lease_id(lease_id);
        let etcd_client = endpoint.component.drt.etcd_client.clone();

        let cancel_token = if let Some(lease) = lease.as_ref() {
            // Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs
            let combined_token = CancellationToken::new();
            let combined_for_select = combined_token.clone();
            let lease_token = lease.child_token();
            // Use secondary runtime for this lightweight monitoring task
            endpoint.drt().runtime().secondary().spawn(async move {
                tokio::select! {
                    _ = lease_token.cancelled() => {
                        tracing::trace!("Lease cancelled, triggering endpoint shutdown");
                    }
                    _ = runtime_shutdown_token.cancelled() => {
                        tracing::trace!("Runtime shutdown triggered, cancelling endpoint");
                    }
                }
                combined_for_select.cancel();
            });
            combined_token
        } else {
            // No lease, just use runtime shutdown token
            runtime_shutdown_token
        };

        // Register with graceful shutdown tracker if needed
        if graceful_shutdown {
            tracing::debug!(
                "Registering endpoint '{}' with graceful shutdown tracker",
                endpoint.name
            );
            let tracker = endpoint.drt().graceful_shutdown_tracker();
            tracker.register_endpoint();
        } else {
            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
        }
Ryan Olson's avatar
Ryan Olson committed
152
153
154
155

        let push_endpoint = PushEndpoint::builder()
            .service_handler(handler)
            .cancellation_token(cancel_token.clone())
156
            .graceful_shutdown(graceful_shutdown)
Ryan Olson's avatar
Ryan Olson committed
157
158
159
160
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

        // launch in primary runtime
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
        let tracker_clone = if graceful_shutdown {
            Some(endpoint.drt().graceful_shutdown_tracker())
        } else {
            None
        };

        // Create clones for the async closure
        let namespace_name_for_task = namespace_name.clone();
        let component_name_for_task = component_name.clone();
        let endpoint_name_for_task = endpoint_name.clone();

        let task = tokio::spawn(async move {
            let result = push_endpoint
                .start(
                    service_endpoint,
                    namespace_name_for_task,
                    component_name_for_task,
                    endpoint_name_for_task,
                    lease_id,
                    system_health,
                )
                .await;

            // Unregister from graceful shutdown tracker
            if let Some(tracker) = tracker_clone {
                tracing::debug!("Unregistering endpoint from graceful shutdown tracker");
                tracker.unregister_endpoint();
            }

            result
        });
Ryan Olson's avatar
Ryan Olson committed
192
193
194
195

        // make the components service endpoint discovery in etcd

        // client.register_service()
196
        let info = Instance {
197
198
199
            component: component_name,
            endpoint: endpoint_name,
            namespace: namespace_name,
200
            instance_id: lease_id,
201
            transport: TransportType::NatsTcp(subject),
Ryan Olson's avatar
Ryan Olson committed
202
203
204
205
        };

        let info = serde_json::to_vec_pretty(&info)?;

206
        if let Some(etcd_client) = &etcd_client
207
            && let Err(e) = etcd_client
208
                .kv_create(&etcd_path, info, Some(lease_id))
209
                .await
210
211
212
213
        {
            tracing::error!("Failed to register discoverable service: {:?}", e);
            cancel_token.cancel();
            return Err(error!("Failed to register discoverable service"));
Ryan Olson's avatar
Ryan Olson committed
214
215
216
217
218
219
        }
        task.await??;

        Ok(())
    }
}