"tests/models/language/pooling/test_jina.py" did not exist on "b724afe343b788d61ca0425892f2631669f97f45"
http_metrics.rs 3.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use dynamo_llm::http::service::metrics::{self, Endpoint};
use dynamo_llm::http::service::service_v2::HttpService;
use dynamo_runtime::CancellationToken;
use serial_test::serial;
use std::{env, time::Duration};

10
11
12
13
#[path = "common/ports.rs"]
mod ports;
use ports::get_random_port;

14
15
16
17
#[tokio::test]
#[serial]
async fn metrics_prefix_default_then_env_override() {
    // Case 1: default prefix
18
    unsafe { env::remove_var(metrics::METRICS_PREFIX_ENV) };
19
20
    let p1 = get_random_port().await;
    let svc1 = HttpService::builder().port(p1).build().unwrap();
21
    let token1 = CancellationToken::new();
22
23
    let h1 = svc1.spawn(token1.clone()).await;
    wait_for_metrics_ready(p1).await;
24
25
26
27
28
29
30
31
32
33

    // Populate labeled metrics
    let s1 = svc1.state_clone();
    {
        let _g = s1.metrics_clone().create_inflight_guard(
            "test-model",
            Endpoint::ChatCompletions,
            false,
        );
    }
34
    let body1 = reqwest::get(format!("http://localhost:{}/metrics", p1))
35
36
37
38
39
40
41
        .await
        .unwrap()
        .text()
        .await
        .unwrap();
    assert!(body1.contains("dynamo_frontend_requests_total"));
    token1.cancel();
42
    let _ = h1.await; // ensure port is released
43
44

    // Case 2: env override to prefix
45
    unsafe { env::set_var(metrics::METRICS_PREFIX_ENV, "custom_prefix") };
46
47
    let p2 = get_random_port().await;
    let svc2 = HttpService::builder().port(p2).build().unwrap();
48
    let token2 = CancellationToken::new();
49
50
    let h2 = svc2.spawn(token2.clone()).await;
    wait_for_metrics_ready(p2).await;
51
52
53
54
55
56
57
58
59

    // Populate labeled metrics
    let s2 = svc2.state_clone();
    {
        let _g =
            s2.metrics_clone()
                .create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
    }
    // Single fetch and assert
60
    let body2 = reqwest::get(format!("http://localhost:{}/metrics", p2))
61
62
63
64
65
66
67
68
        .await
        .unwrap()
        .text()
        .await
        .unwrap();
    assert!(body2.contains("custom_prefix_requests_total"));
    assert!(!body2.contains("dynamo_frontend_requests_total"));
    token2.cancel();
69
    let _ = h2.await;
70
71

    // Case 3: invalid env prefix is sanitized
72
    unsafe { env::set_var(metrics::METRICS_PREFIX_ENV, "nv-llm/http service") };
73
74
    let p3 = get_random_port().await;
    let svc3 = HttpService::builder().port(p3).build().unwrap();
75
    let token3 = CancellationToken::new();
76
77
    let h3 = svc3.spawn(token3.clone()).await;
    wait_for_metrics_ready(p3).await;
78
79
80
81
82
83
84

    let s3 = svc3.state_clone();
    {
        let _g =
            s3.metrics_clone()
                .create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
    }
85
    let body3 = reqwest::get(format!("http://localhost:{}/metrics", p3))
86
87
88
89
90
91
92
93
        .await
        .unwrap()
        .text()
        .await
        .unwrap();
    assert!(body3.contains("nv_llm_http_service_requests_total"));
    assert!(!body3.contains("dynamo_frontend_requests_total"));
    token3.cancel();
94
    let _ = h3.await;
95
96

    // Cleanup env to avoid leaking state
97
    unsafe { env::remove_var(metrics::METRICS_PREFIX_ENV) };
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
}

// Poll /metrics until ready or timeout
/// Polls `GET http://localhost:{port}/metrics` until it returns a 2xx status.
///
/// Retries every 50 ms. Each attempt is bounded by the overall deadline, so a
/// connection that is accepted but never answered cannot stall the test past it.
///
/// # Panics
///
/// Panics if no successful response arrives within 5 seconds.
async fn wait_for_metrics_ready(port: u16) {
    const READY_TIMEOUT: Duration = Duration::from_secs(5);
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let url = format!("http://localhost:{}/metrics", port);
    let deadline = tokio::time::Instant::now() + READY_TIMEOUT;
    loop {
        if tokio::time::Instant::now() >= deadline {
            panic!("Timed out waiting for metrics endpoint at {}", url);
        }
        // reqwest's default client has no request timeout, so an unanswered
        // request would otherwise block forever and bypass the deadline check.
        match tokio::time::timeout_at(deadline, reqwest::get(&url)).await {
            Ok(Ok(resp)) if resp.status().is_success() => return,
            _ => tokio::time::sleep(POLL_INTERVAL).await,
        }
    }
}