service.rs 4.5 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
5
6
use async_nats::service::Service as NatsService;
use async_nats::service::ServiceExt as _;
use derive_builder::Builder;
Ryan Olson's avatar
Ryan Olson committed
7
use derive_getters::Dissolve;
8
use std::collections::HashMap;
9
use std::sync::{Arc, Mutex};
Ryan Olson's avatar
Ryan Olson committed
10

11
use crate::component::Component;
Ryan Olson's avatar
Ryan Olson committed
12

13
pub use super::endpoint::EndpointStats;
14
15
16
17

use educe::Educe;

/// Shared, mutex-guarded map from a name to the [`EndpointStatsHandler`] registered under it.
///
/// The NATS stats callback looks handlers up here by the name it is invoked with
/// (presumably the endpoint name — confirm against the registration site).
type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;
Ryan Olson's avatar
Ryan Olson committed
18
pub type StatsHandler =
19
    Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
20
pub type EndpointStatsHandler =
21
    Box<dyn FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
22

Neelay Shah's avatar
Neelay Shah committed
23
/// Project name; used as the prefix of the default NATS service description.
pub const PROJECT_NAME: &str = "Dynamo";
24
/// Version string passed when starting the NATS service; taken from this crate's
/// `CARGO_PKG_VERSION` at compile time.
const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
25

Ryan Olson's avatar
Ryan Olson committed
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/// Configuration used to create the service backing a [`Component`].
///
/// Instances are assembled through the derived `ServiceConfigBuilder`, which uses
/// owned-pattern setters and a private build function named `build_internal`.
/// `Dissolve` allows the config to be split back into its fields.
#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct ServiceConfig {
    /// Component the service is created for; its builder setter is private.
    #[builder(private)]
    component: Component,

    /// Optional human-readable service description; `None` when not set on the builder.
    #[builder(default)]
    description: Option<String>,
}

impl ServiceConfigBuilder {
    /// Create the [`Component`]'s service and store it in the registry.
40
    pub async fn create(self) -> anyhow::Result<Component> {
41
        let (component, description) = self.build_internal()?.dissolve();
Ryan Olson's avatar
Ryan Olson committed
42

Ryan Olson's avatar
Ryan Olson committed
43
        let service_name = component.service_name();
44

45
46
47
48
49
50
51
52
53
54
55
56
        // Pre-check to save cost of creating the service, but don't hold the lock
        if component
            .drt
            .component_registry
            .inner
            .lock()
            .await
            .services
            .contains_key(&service_name)
        {
            anyhow::bail!("Service {service_name} already exists");
        }
57

58
59
60
61
62
        let Some(nats_client) = component.drt.nats_client() else {
            anyhow::bail!("Cannot create NATS service without NATS.");
        };
        let (nats_service, stats_reg) =
            build_nats_service(nats_client, &component, description).await?;
Ryan Olson's avatar
Ryan Olson committed
63

64
        let mut guard = component.drt.component_registry.inner.lock().await;
65
66
67
68
69
70
71
72
73
74
75
76
        if !guard.services.contains_key(&service_name) {
            // Normal case
            guard.services.insert(service_name.clone(), nats_service);
            guard.stats_handlers.insert(service_name, stats_reg);
            drop(guard);
        } else {
            drop(guard);
            let _ = nats_service.stop().await;
            return Err(anyhow::anyhow!(
                "Service create race for {service_name}, now already exists"
            ));
        }
77

78
79
80
81
82
83
84
        // Register metrics callback. CRITICAL: Never fail service creation for metrics issues.
        if let Err(err) = component.start_scraping_nats_service_component_metrics() {
            tracing::debug!(
                "Metrics registration failed for '{}': {}",
                component.service_name(),
                err
            );
Ryan Olson's avatar
Ryan Olson committed
85
        }
86
87
88
89
90
91
92
93
94
95
96
        Ok(component)
    }
}

async fn build_nats_service(
    nats_client: &crate::transports::nats::Client,
    component: &Component,
    description: Option<String>,
) -> anyhow::Result<(NatsService, StatsHandlerRegistry)> {
    let service_name = component.service_name();
    tracing::trace!("component: {component}; creating, service_name: {service_name}");
Ryan Olson's avatar
Ryan Olson committed
97

98
99
100
101
    let description = description.unwrap_or(format!(
        "{PROJECT_NAME} component {} in namespace {}",
        component.name, component.namespace
    ));
102

103
104
105
106
107
108
109
    let stats_handler_registry: StatsHandlerRegistry = Arc::new(Mutex::new(HashMap::new()));
    let stats_handler_registry_clone = stats_handler_registry.clone();

    let nats_service_builder = nats_client.client().service_builder();

    let nats_service_builder =
        nats_service_builder
110
111
            .description(description)
            .stats_handler(move |name, stats| {
112
                tracing::trace!("stats_handler: {name}, {stats:?}");
113
114
115
116
117
                let mut guard = stats_handler_registry.lock().unwrap();
                match guard.get_mut(&name) {
                    Some(handler) => handler(stats),
                    None => serde_json::Value::Null,
                }
118
            });
119
120
121
122
    let nats_service = nats_service_builder
        .start(service_name, SERVICE_VERSION.to_string())
        .await
        .map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?;
123

124
    Ok((nats_service, stats_handler_registry_clone))
Ryan Olson's avatar
Ryan Olson committed
125
126
127
128
129
130
131
}

impl ServiceConfigBuilder {
    /// Starts a builder from its default state with `component` already set.
    pub(crate) fn from_component(component: Component) -> Self {
        let builder = Self::default();
        builder.component(component)
    }
}