lib.rs 3.69 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use dynamo_runtime::{
5
    DistributedRuntime, Result,
6
    metrics::MetricsHierarchy,
7
    pipeline::{
8
9
        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
        async_trait, network::Ingress,
10
11
    },
    protocols::annotated::Annotated,
12
    stream,
13
14
15
};
use prometheus::IntCounter;
use std::sync::Arc;
16

17
18
19
/// Namespace that `backend` resolves on the distributed runtime.
pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
/// Component that `backend` resolves inside [`DEFAULT_NAMESPACE`].
pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
/// Endpoint name `backend` falls back to when the caller passes `None`.
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
20

21
22
/// Stats structure returned by the endpoint's stats handler
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
23
pub struct MyStats {
24
25
26
27
28
29
30
    // Example value for demonstration purposes
    pub val: i32,
}

/// Custom metrics for system stats with data bytes tracking
#[derive(Clone, Debug)]
pub struct MySystemStatsMetrics {
31
    pub data_bytes_processed: IntCounter,
32
33
34
}

impl MySystemStatsMetrics {
35
    pub fn from_endpoint(endpoint: &dynamo_runtime::component::Endpoint) -> anyhow::Result<Self> {
36
        let data_bytes_processed = endpoint.metrics().create_intcounter(
37
38
39
40
41
42
43
44
45
46
47
48
49
            "my_custom_bytes_processed_total",
            "Example of a custom metric. Total number of data bytes processed by system handler",
            &[],
        )?;

        Ok(Self {
            data_bytes_processed,
        })
    }
}

#[derive(Clone)]
pub struct RequestHandler {
50
    metrics: Option<MySystemStatsMetrics>,
51
52
53
54
55
56
57
58
59
}

impl RequestHandler {
    /// Creates a shared handler that does not record any metrics.
    pub fn new() -> Arc<Self> {
        let handler = Self { metrics: None };
        Arc::new(handler)
    }

    /// Creates a shared handler that updates `metrics` for every request it serves.
    pub fn with_metrics(metrics: MySystemStatsMetrics) -> Arc<Self> {
        let handler = Self {
            metrics: Some(metrics),
        };
        Arc::new(handler)
    }
}

#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
    /// Splits the request string into characters and streams each one back as its own
    /// annotated response.
    ///
    /// When metrics are configured, the byte length of the input is added to the
    /// `data_bytes_processed` counter before the responses are built.
    async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
        let (payload, request_ctx) = input.into_parts();

        // Account for the processed bytes only when a metrics set was supplied.
        if let Some(m) = self.metrics.as_ref() {
            m.data_bytes_processed.inc_by(payload.len() as u64);
        }

        // Materialize the responses up front so the stream owns its items and does not
        // borrow from `payload`.
        let mut responses = Vec::new();
        for ch in payload.chars() {
            responses.push(Annotated::from_data(ch.to_string()));
        }

        let response_stream = Box::pin(stream::iter(responses));
        Ok(ResponseStream::new(response_stream, request_ctx.context()))
    }
}

86
/// Backend function that sets up the system status server with metrics and ingress handler
87
88
89
90
/// This function can be reused by integration tests to ensure they use the exact same setup
pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Result<()> {
    let endpoint_name = endpoint_name.unwrap_or(DEFAULT_ENDPOINT);

91
    let mut component = drt
92
        .namespace(DEFAULT_NAMESPACE)?
93
94
95
        .component(DEFAULT_COMPONENT)?;
    component.add_stats_service().await?;
    let endpoint = component.endpoint(endpoint_name);
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

    // Create custom metrics for system stats
    let system_metrics =
        MySystemStatsMetrics::from_endpoint(&endpoint).expect("Failed to create system metrics");

    // Use the factory pattern - single line factory call with metrics
    let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?;

    endpoint
        .endpoint_builder()
        .stats_handler(|_stats| {
            println!("Stats handler called with stats: {:?}", _stats);
            // TODO(keivenc): return a real stats object
            let stats = MyStats { val: 10 };
            serde_json::to_value(stats).unwrap()
        })
        .handler(ingress)
        .start()
        .await?;

    Ok(())
117
}