service.rs 3.06 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/*
 * Copyright 2024-2025 NVIDIA CORPORATION & AFFILIATES
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

use derive_getters::Dissolve;

use super::*;

use async_nats::service::{endpoint, Service};

pub type StatsHandler =
    Box<dyn FnMut(String, endpoint::Stats) -> serde_json::Value + Send + Sync + 'static>;

#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct ServiceConfig {
    #[builder(private)]
    component: Component,

    /// Description
    #[builder(default)]
    description: Option<String>,

    // todo - make optional - if None, then skip making the endpoint
    // and skip making the service-endpoint discoverable.
    /// Endpoint handler
    #[educe(Debug(ignore))]
    #[builder(default)]
    stats_handler: Option<StatsHandler>,
}

impl ServiceConfigBuilder {
    /// Create the [`Component`]'s service and store it in the registry.
    pub async fn create(self) -> Result<Component> {
        let version = "0.0.1".to_string();

        let (component, description, stat_handler) = self.build_internal()?.dissolve();

        let service_name = component.slug();
        let description = description.unwrap_or(format!(
            "Triton Component {} in {}",
            component.name, component.namespace
        ));

        let mut guard = component.drt.component_registry.services.lock().await;

        if guard.contains_key(&component.etcd_path()) {
            return Err(anyhow::anyhow!("Service already exists"));
        }

        // create service on the secondary runtime
        let secondary = component.drt.runtime.secondary.clone();
        let builder = component.drt.nats_client.client().service_builder();
        let service = secondary
            .spawn(async move {
                // unwrap the stats handler
                let builder = match stat_handler {
                    Some(handler) => builder.stats_handler(handler),
                    None => builder,
                };

                log::debug!("Starting service: {}", service_name);

                builder
                    .description(description)
                    .start(service_name.to_string(), version)
                    .await
            })
            .await?
            .map_err(|e| anyhow::anyhow!("Failed to start service: {e}"))?;

        guard.insert(component.etcd_path(), service);
        drop(guard);

        Ok(component)
    }
}

impl ServiceConfigBuilder {
    pub(crate) fn from_component(component: Component) -> Self {
        Self::default().component(component)
    }
}