service.rs 2.29 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
5
6
use super::*;
use crate::component::Component;
use crate::config::RequestPlaneMode;
7
8
9
use async_nats::service::Service as NatsService;
use async_nats::service::ServiceExt as _;
use derive_builder::Builder;
Ryan Olson's avatar
Ryan Olson committed
10
use derive_getters::Dissolve;
11
use parking_lot::Mutex;
12
use std::collections::HashMap;
13
use std::sync::Arc;
Ryan Olson's avatar
Ryan Olson committed
14

15
pub use super::endpoint::EndpointStats;
16
17

type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;
Ryan Olson's avatar
Ryan Olson committed
18
pub type StatsHandler =
19
    Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
20
pub type EndpointStatsHandler =
21
    Box<dyn FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
22

Neelay Shah's avatar
Neelay Shah committed
23
pub const PROJECT_NAME: &str = "Dynamo";
24
const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
25

26
pub async fn build_nats_service(
27
28
29
30
31
32
    nats_client: &crate::transports::nats::Client,
    component: &Component,
    description: Option<String>,
) -> anyhow::Result<(NatsService, StatsHandlerRegistry)> {
    let service_name = component.service_name();
    tracing::trace!("component: {component}; creating, service_name: {service_name}");
Ryan Olson's avatar
Ryan Olson committed
33

34
35
36
37
    let description = description.unwrap_or(format!(
        "{PROJECT_NAME} component {} in namespace {}",
        component.name, component.namespace
    ));
38

39
40
41
42
43
44
45
    let stats_handler_registry: StatsHandlerRegistry = Arc::new(Mutex::new(HashMap::new()));
    let stats_handler_registry_clone = stats_handler_registry.clone();

    let nats_service_builder = nats_client.client().service_builder();

    let nats_service_builder =
        nats_service_builder
46
47
            .description(description)
            .stats_handler(move |name, stats| {
48
                tracing::trace!("stats_handler: {name}, {stats:?}");
49
                let mut guard = stats_handler_registry.lock();
50
51
52
53
                match guard.get_mut(&name) {
                    Some(handler) => handler(stats),
                    None => serde_json::Value::Null,
                }
54
            });
55
56
57
58
    let nats_service = nats_service_builder
        .start(service_name, SERVICE_VERSION.to_string())
        .await
        .map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?;
59

60
    Ok((nats_service, stats_handler_registry_clone))
Ryan Olson's avatar
Ryan Olson committed
61
}