lib.rs 3.18 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use dynamo_runtime::{
5
    DistributedRuntime,
6
    metrics::MetricsHierarchy,
7
    pipeline::{
8
9
        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
        async_trait, network::Ingress,
10
11
    },
    protocols::annotated::Annotated,
12
    stream,
13
14
15
};
use prometheus::IntCounter;
use std::sync::Arc;
16

17
18
19
/// Namespace name that `backend` passes to `DistributedRuntime::namespace`.
pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
/// Component name that `backend` looks up inside the default namespace.
pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
/// Endpoint name that `backend` falls back to when called with `None`.
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
20

21
22
23
/// Custom metrics for system stats with data bytes tracking
#[derive(Clone, Debug)]
pub struct MySystemStatsMetrics {
24
    pub data_bytes_processed: IntCounter,
25
26
27
}

impl MySystemStatsMetrics {
28
    pub fn from_endpoint(endpoint: &dynamo_runtime::component::Endpoint) -> anyhow::Result<Self> {
29
        let data_bytes_processed = endpoint.metrics().create_intcounter(
30
31
32
33
34
35
36
37
38
39
40
41
42
            "my_custom_bytes_processed_total",
            "Example of a custom metric. Total number of data bytes processed by system handler",
            &[],
        )?;

        Ok(Self {
            data_bytes_processed,
        })
    }
}

#[derive(Clone)]
pub struct RequestHandler {
43
    metrics: Option<MySystemStatsMetrics>,
44
45
46
47
48
49
50
51
52
}

impl RequestHandler {
    /// Wraps a handler holding the given optional metrics in an [`Arc`].
    fn shared(metrics: Option<MySystemStatsMetrics>) -> Arc<Self> {
        let handler = Self { metrics };
        Arc::new(handler)
    }

    /// Returns a shared handler that records no metrics.
    pub fn new() -> Arc<Self> {
        Self::shared(None)
    }

    /// Returns a shared handler that reports processed bytes to `metrics`.
    pub fn with_metrics(metrics: MySystemStatsMetrics) -> Arc<Self> {
        Self::shared(Some(metrics))
    }
}

#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
60
61
62
63
    async fn generate(
        &self,
        input: SingleIn<String>,
    ) -> anyhow::Result<ManyOut<Annotated<String>>> {
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
        let (data, ctx) = input.into_parts();

        // Track data bytes processed if metrics are available
        if let Some(metrics) = &self.metrics {
            metrics.data_bytes_processed.inc_by(data.len() as u64);
        }

        let chars = data
            .chars()
            .map(|c| Annotated::from_data(c.to_string()))
            .collect::<Vec<_>>();

        let stream = stream::iter(chars);

        Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
    }
}

82
/// Backend function that sets up the system status server with metrics and ingress handler
83
/// This function can be reused by integration tests to ensure they use the exact same setup
84
pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> anyhow::Result<()> {
85
86
    let endpoint_name = endpoint_name.unwrap_or(DEFAULT_ENDPOINT);

87
    let component = drt
88
        .namespace(DEFAULT_NAMESPACE)?
89
90
        .component(DEFAULT_COMPONENT)?;
    let endpoint = component.endpoint(endpoint_name);
91
92
93
94
95
96
97
98

    // Create custom metrics for system stats
    let system_metrics =
        MySystemStatsMetrics::from_endpoint(&endpoint).expect("Failed to create system metrics");

    // Use the factory pattern - single line factory call with metrics
    let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?;

99
    endpoint.endpoint_builder().handler(ingress).start().await?;
100
101

    Ok(())
102
}