// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use derive_getters::Dissolve;

use super::*;

#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    #[builder(private)]
    endpoint: Endpoint,

27
    // todo: move lease to component/service
Ryan Olson's avatar
Ryan Olson committed
28
29
30
31
32
33
34
35
    /// Lease
    #[educe(Debug(ignore))]
    #[builder(default)]
    lease: Option<Lease>,

    /// Endpoint handler
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,
36
37
38
39
40

    /// Stats handler
    #[educe(Debug(ignore))]
    #[builder(default, private)]
    _stats_handler: Option<EndpointStatsHandler>,
Ryan Olson's avatar
Ryan Olson committed
41
42
43
44
45
46
47
}

impl EndpointConfigBuilder {
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
        Self::default().endpoint(endpoint)
    }

48
49
50
51
52
53
54
    pub fn stats_handler<F>(self, handler: F) -> Self
    where
        F: FnMut(async_nats::service::endpoint::Stats) -> serde_json::Value + Send + Sync + 'static,
    {
        self._stats_handler(Some(Box::new(handler)))
    }

Ryan Olson's avatar
Ryan Olson committed
55
    pub async fn start(self) -> Result<()> {
56
        let (endpoint, lease, handler, stats_handler) = self.build_internal()?.dissolve();
57
        let lease = lease.unwrap_or(endpoint.drt().primary_lease());
Ryan Olson's avatar
Ryan Olson committed
58

59
        tracing::debug!(
Ryan Olson's avatar
Ryan Olson committed
60
61
62
63
            "Starting endpoint: {}",
            endpoint.etcd_path_with_id(lease.id())
        );

64
65
66
67
68
69
70
        let service_name = endpoint.component.service_name();

        // acquire the registry lock
        let registry = endpoint.drt().component_registry.inner.lock().await;

        // get the group
        let group = registry
Ryan Olson's avatar
Ryan Olson committed
71
            .services
72
            .get(&service_name)
Ryan Olson's avatar
Ryan Olson committed
73
            .map(|service| service.group(endpoint.component.service_name()))
Ryan Olson's avatar
Ryan Olson committed
74
75
            .ok_or(error!("Service not found"))?;

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
        // get the stats handler map
        let handler_map = registry
            .stats_handlers
            .get(&service_name)
            .cloned()
            .expect("no stats handler registry; this is unexpected");

        drop(registry);

        // insert the stats handler
        if let Some(stats_handler) = stats_handler {
            handler_map
                .lock()
                .unwrap()
                .insert(endpoint.subject_to(lease.id()), stats_handler);
        }
Ryan Olson's avatar
Ryan Olson committed
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

        // creates an endpoint for the service
        let service_endpoint = group
            .endpoint(&endpoint.name_with_id(lease.id()))
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;

        let cancel_token = lease.child_token();

        let push_endpoint = PushEndpoint::builder()
            .service_handler(handler)
            .cancellation_token(cancel_token.clone())
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

        // launch in primary runtime
        let task = tokio::spawn(push_endpoint.start(service_endpoint));

        // make the components service endpoint discovery in etcd

        // client.register_service()
        let info = ComponentEndpointInfo {
            component: endpoint.component.name.clone(),
            endpoint: endpoint.name.clone(),
            namespace: endpoint.component.namespace.clone(),
            lease_id: lease.id(),
Ryan Olson's avatar
Ryan Olson committed
118
            transport: TransportType::NatsTcp(endpoint.subject_to(lease.id())),
Ryan Olson's avatar
Ryan Olson committed
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
        };

        let info = serde_json::to_vec_pretty(&info)?;

        if let Err(e) = endpoint
            .component
            .drt
            .etcd_client
            .kv_create(
                endpoint.etcd_path_with_id(lease.id()),
                info,
                Some(lease.id()),
            )
            .await
        {
134
            tracing::error!("Failed to register discoverable service: {:?}", e);
Ryan Olson's avatar
Ryan Olson committed
135
136
137
138
139
140
141
142
143
            cancel_token.cancel();
            return Err(error!("Failed to register discoverable service"));
        }

        task.await??;

        Ok(())
    }
}