endpoint.rs 7.92 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4

use derive_getters::Dissolve;
5
use tokio_util::sync::CancellationToken;
Ryan Olson's avatar
Ryan Olson committed
6

7
8
use crate::storage::key_value_store;

Ryan Olson's avatar
Ryan Olson committed
9
10
use super::*;

11
12
pub use async_nats::service::endpoint::Stats as EndpointStats;

Ryan Olson's avatar
Ryan Olson committed
13
14
15
16
17
18
19
20
21
22
#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    #[builder(private)]
    endpoint: Endpoint,

    /// Endpoint handler
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,
23
24
25
26
27

    /// Stats handler
    #[educe(Debug(ignore))]
    #[builder(default, private)]
    _stats_handler: Option<EndpointStatsHandler>,
28

29
30
31
32
    /// Additional labels for metrics
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

33
34
35
    /// Whether to wait for inflight requests to complete during shutdown
    #[builder(default = "true")]
    graceful_shutdown: bool,
36
37
38
39
40
41
42

    /// Health check payload for this endpoint
    /// This payload will be sent to the endpoint during health checks
    /// to verify it's responding properly
    #[educe(Debug(ignore))]
    #[builder(default, setter(into, strip_option))]
    health_check_payload: Option<serde_json::Value>,
Ryan Olson's avatar
Ryan Olson committed
43
44
45
46
47
48
49
}

impl EndpointConfigBuilder {
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
        Self::default().endpoint(endpoint)
    }

50
51
    pub fn stats_handler<F>(self, handler: F) -> Self
    where
52
        F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
53
54
55
56
    {
        self._stats_handler(Some(Box::new(handler)))
    }

Ryan Olson's avatar
Ryan Olson committed
57
    pub async fn start(self) -> Result<()> {
58
59
60
61
62
63
64
65
        let (
            endpoint,
            handler,
            stats_handler,
            metrics_labels,
            graceful_shutdown,
            health_check_payload,
        ) = self.build_internal()?.dissolve();
66
        let connection_id = endpoint.drt().connection_id();
Ryan Olson's avatar
Ryan Olson committed
67

68
69
        tracing::debug!(
            "Starting endpoint: {}",
70
            endpoint.etcd_path_with_lease_id(connection_id)
71
        );
Ryan Olson's avatar
Ryan Olson committed
72

73
74
75
76
77
        let service_name = endpoint.component.service_name();

        // acquire the registry lock
        let registry = endpoint.drt().component_registry.inner.lock().await;

78
79
80
        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
            .as_ref()
            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
81
        // Add metrics to the handler. The endpoint provides additional information to the handler.
82
        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
83

84
85
        // get the group
        let group = registry
Ryan Olson's avatar
Ryan Olson committed
86
            .services
87
            .get(&service_name)
Ryan Olson's avatar
Ryan Olson committed
88
            .map(|service| service.group(endpoint.component.service_name()))
Ryan Olson's avatar
Ryan Olson committed
89
90
            .ok_or(error!("Service not found"))?;

91
92
93
94
95
96
97
98
99
100
101
102
103
        // get the stats handler map
        let handler_map = registry
            .stats_handlers
            .get(&service_name)
            .cloned()
            .expect("no stats handler registry; this is unexpected");

        drop(registry);

        // insert the stats handler
        if let Some(stats_handler) = stats_handler {
            handler_map
                .lock()
104
                .insert(endpoint.subject_to(connection_id), stats_handler);
105
        }
Ryan Olson's avatar
Ryan Olson committed
106
107
108

        // creates an endpoint for the service
        let service_endpoint = group
109
            .endpoint(&endpoint.name_with_id(connection_id))
Ryan Olson's avatar
Ryan Olson committed
110
111
112
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;

113
114
115
        // This creates a child token of the runtime's endpoint_shutdown_token. That token is
        // cancelled first as part of graceful shutdown. See Runtime::shutdown.
        let endpoint_shutdown_token = endpoint.drt().child_token();
116
117
118
119
120
121

        // Extract all values needed from endpoint before any spawns
        let namespace_name = endpoint.component.namespace.name.clone();
        let component_name = endpoint.component.name.clone();
        let endpoint_name = endpoint.name.clone();
        let system_health = endpoint.drt().system_health.clone();
122
        let subject = endpoint.subject_to(connection_id);
123

124
125
126
127
128
129
        // Register health check target in SystemHealth if provided
        if let Some(health_check_payload) = &health_check_payload {
            let instance = Instance {
                component: component_name.clone(),
                endpoint: endpoint_name.clone(),
                namespace: namespace_name.clone(),
130
                instance_id: connection_id,
131
132
                transport: TransportType::NatsTcp(subject.clone()),
            };
133
            tracing::debug!(endpoint_name = %endpoint_name, "Registering endpoint health check target");
134
            let guard = system_health.lock();
135
136
137
138
139
140
            guard.register_health_check_target(
                &endpoint_name,
                instance,
                health_check_payload.clone(),
            );
            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint_name) {
141
142
143
144
                handler.set_endpoint_health_check_notifier(notifier)?;
            }
        }

145
146
147
148
149
150
151
152
153
154
155
        // Register with graceful shutdown tracker if needed
        if graceful_shutdown {
            tracing::debug!(
                "Registering endpoint '{}' with graceful shutdown tracker",
                endpoint.name
            );
            let tracker = endpoint.drt().graceful_shutdown_tracker();
            tracker.register_endpoint();
        } else {
            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
        }
Ryan Olson's avatar
Ryan Olson committed
156
157
158

        let push_endpoint = PushEndpoint::builder()
            .service_handler(handler)
159
            .cancellation_token(endpoint_shutdown_token.clone())
160
            .graceful_shutdown(graceful_shutdown)
Ryan Olson's avatar
Ryan Olson committed
161
162
163
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
        let tracker_clone = if graceful_shutdown {
            Some(endpoint.drt().graceful_shutdown_tracker())
        } else {
            None
        };

        // Create clones for the async closure
        let namespace_name_for_task = namespace_name.clone();
        let component_name_for_task = component_name.clone();
        let endpoint_name_for_task = endpoint_name.clone();

        let task = tokio::spawn(async move {
            let result = push_endpoint
                .start(
                    service_endpoint,
                    namespace_name_for_task,
                    component_name_for_task,
                    endpoint_name_for_task,
182
                    connection_id,
183
184
185
186
187
188
189
190
191
192
193
194
                    system_health,
                )
                .await;

            // Unregister from graceful shutdown tracker
            if let Some(tracker) = tracker_clone {
                tracing::debug!("Unregistering endpoint from graceful shutdown tracker");
                tracker.unregister_endpoint();
            }

            result
        });
Ryan Olson's avatar
Ryan Olson committed
195

196
197
198
199
200
201
202
        // Register this endpoint instance in the discovery plane
        // The discovery interface abstracts storage backend (etcd, k8s, etc) and provides
        // consistent registration/discovery across the system.
        let discovery = endpoint.drt().discovery();

        let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
            namespace: namespace_name.clone(),
203
204
            component: component_name.clone(),
            endpoint: endpoint_name.clone(),
205
            transport: TransportType::NatsTcp(subject.clone()),
Ryan Olson's avatar
Ryan Olson committed
206
207
        };

208
        if let Err(e) = discovery.register(discovery_spec).await {
209
210
211
            tracing::error!(
                component_name,
                endpoint_name,
212
                error = %e,
213
214
                "Unable to register service for discovery"
            );
215
            endpoint_shutdown_token.cancel();
216
217
218
            return Err(error!(
                "Unable to register service for discovery. Check discovery service status"
            ));
Ryan Olson's avatar
Ryan Olson committed
219
        }
220

Ryan Olson's avatar
Ryan Olson committed
221
222
223
224
225
        task.await??;

        Ok(())
    }
}