// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use derive_getters::Dissolve;
use tokio_util::sync::CancellationToken;

use super::*;

pub use async_nats::service::endpoint::Stats as EndpointStats;

#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    #[builder(private)]
    endpoint: Endpoint,

    /// Endpoint handler
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,
21
22
23
24
25

    /// Stats handler
    #[educe(Debug(ignore))]
    #[builder(default, private)]
    _stats_handler: Option<EndpointStatsHandler>,
26

27
28
29
30
    /// Additional labels for metrics
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

31
32
33
    /// Whether to wait for inflight requests to complete during shutdown
    #[builder(default = "true")]
    graceful_shutdown: bool,
34
35
36
37
38
39
40

    /// Health check payload for this endpoint
    /// This payload will be sent to the endpoint during health checks
    /// to verify it's responding properly
    #[educe(Debug(ignore))]
    #[builder(default, setter(into, strip_option))]
    health_check_payload: Option<serde_json::Value>,
Ryan Olson's avatar
Ryan Olson committed
41
42
43
44
45
46
47
}

impl EndpointConfigBuilder {
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
        Self::default().endpoint(endpoint)
    }

48
49
    pub fn stats_handler<F>(self, handler: F) -> Self
    where
50
        F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
51
52
53
54
    {
        self._stats_handler(Some(Box::new(handler)))
    }

Ryan Olson's avatar
Ryan Olson committed
55
    pub async fn start(self) -> Result<()> {
56
57
58
59
60
61
62
63
        let (
            endpoint,
            handler,
            stats_handler,
            metrics_labels,
            graceful_shutdown,
            health_check_payload,
        ) = self.build_internal()?.dissolve();
64
        let connection_id = endpoint.drt().connection_id();
Ryan Olson's avatar
Ryan Olson committed
65

66
67
        tracing::debug!(
            "Starting endpoint: {}",
68
            endpoint.etcd_path_with_lease_id(connection_id)
69
        );
Ryan Olson's avatar
Ryan Olson committed
70

71
72
73
74
75
        let service_name = endpoint.component.service_name();

        // acquire the registry lock
        let registry = endpoint.drt().component_registry.inner.lock().await;

76
77
78
        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
            .as_ref()
            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
79
        // Add metrics to the handler. The endpoint provides additional information to the handler.
80
        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
81

82
83
        // get the group
        let group = registry
Ryan Olson's avatar
Ryan Olson committed
84
            .services
85
            .get(&service_name)
Ryan Olson's avatar
Ryan Olson committed
86
            .map(|service| service.group(endpoint.component.service_name()))
Ryan Olson's avatar
Ryan Olson committed
87
88
            .ok_or(error!("Service not found"))?;

89
90
91
92
93
94
95
96
97
98
99
100
101
        // get the stats handler map
        let handler_map = registry
            .stats_handlers
            .get(&service_name)
            .cloned()
            .expect("no stats handler registry; this is unexpected");

        drop(registry);

        // insert the stats handler
        if let Some(stats_handler) = stats_handler {
            handler_map
                .lock()
102
                .insert(endpoint.subject_to(connection_id), stats_handler);
103
        }
Ryan Olson's avatar
Ryan Olson committed
104
105
106

        // creates an endpoint for the service
        let service_endpoint = group
107
            .endpoint(&endpoint.name_with_id(connection_id))
Ryan Olson's avatar
Ryan Olson committed
108
109
110
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;

111
112
113
        // This creates a child token of the runtime's endpoint_shutdown_token. That token is
        // cancelled first as part of graceful shutdown. See Runtime::shutdown.
        let endpoint_shutdown_token = endpoint.drt().child_token();
114
115
116
117
118
119

        // Extract all values needed from endpoint before any spawns
        let namespace_name = endpoint.component.namespace.name.clone();
        let component_name = endpoint.component.name.clone();
        let endpoint_name = endpoint.name.clone();
        let system_health = endpoint.drt().system_health.clone();
120
121
        let subject = endpoint.subject_to(connection_id);
        let etcd_path = endpoint.etcd_path_with_lease_id(connection_id);
122
123
        let etcd_client = endpoint.component.drt.etcd_client.clone();

124
125
126
127
128
129
        // Register health check target in SystemHealth if provided
        if let Some(health_check_payload) = &health_check_payload {
            let instance = Instance {
                component: component_name.clone(),
                endpoint: endpoint_name.clone(),
                namespace: namespace_name.clone(),
130
                instance_id: connection_id,
131
132
                transport: TransportType::NatsTcp(subject.clone()),
            };
133
            tracing::debug!(endpoint_name = %endpoint_name, "Registering endpoint health check target");
134
            let guard = system_health.lock();
135
136
137
138
139
140
            guard.register_health_check_target(
                &endpoint_name,
                instance,
                health_check_payload.clone(),
            );
            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint_name) {
141
142
143
144
                handler.set_endpoint_health_check_notifier(notifier)?;
            }
        }

145
146
147
148
149
150
151
152
153
154
155
        // Register with graceful shutdown tracker if needed
        if graceful_shutdown {
            tracing::debug!(
                "Registering endpoint '{}' with graceful shutdown tracker",
                endpoint.name
            );
            let tracker = endpoint.drt().graceful_shutdown_tracker();
            tracker.register_endpoint();
        } else {
            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
        }
Ryan Olson's avatar
Ryan Olson committed
156
157
158

        let push_endpoint = PushEndpoint::builder()
            .service_handler(handler)
159
            .cancellation_token(endpoint_shutdown_token.clone())
160
            .graceful_shutdown(graceful_shutdown)
Ryan Olson's avatar
Ryan Olson committed
161
162
163
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
        let tracker_clone = if graceful_shutdown {
            Some(endpoint.drt().graceful_shutdown_tracker())
        } else {
            None
        };

        // Create clones for the async closure
        let namespace_name_for_task = namespace_name.clone();
        let component_name_for_task = component_name.clone();
        let endpoint_name_for_task = endpoint_name.clone();

        let task = tokio::spawn(async move {
            let result = push_endpoint
                .start(
                    service_endpoint,
                    namespace_name_for_task,
                    component_name_for_task,
                    endpoint_name_for_task,
182
                    connection_id,
183
184
185
186
187
188
189
190
191
192
193
194
                    system_health,
                )
                .await;

            // Unregister from graceful shutdown tracker
            if let Some(tracker) = tracker_clone {
                tracing::debug!("Unregistering endpoint from graceful shutdown tracker");
                tracker.unregister_endpoint();
            }

            result
        });
Ryan Olson's avatar
Ryan Olson committed
195
196
197
198

        // make the components service endpoint discovery in etcd

        // client.register_service()
199
        let info = Instance {
200
201
202
            component: component_name.clone(),
            endpoint: endpoint_name.clone(),
            namespace: namespace_name.clone(),
203
            instance_id: connection_id,
204
            transport: TransportType::NatsTcp(subject),
Ryan Olson's avatar
Ryan Olson committed
205
206
207
208
        };

        let info = serde_json::to_vec_pretty(&info)?;

209
        if let Some(etcd_client) = &etcd_client
210
            && let Err(e) = etcd_client
211
                .kv_create(&etcd_path, info, Some(connection_id))
212
                .await
213
        {
214
215
216
217
218
219
            tracing::error!(
                component_name,
                endpoint_name,
                error = %e,
                "Unable to register service for discovery"
            );
220
            endpoint_shutdown_token.cancel();
221
222
223
            return Err(error!(
                "Unable to register service for discovery. Check discovery service status"
            ));
Ryan Olson's avatar
Ryan Olson committed
224
225
226
227
228
229
        }
        task.await??;

        Ok(())
    }
}