Unverified Commit 3411bda8 authored by ryan-lempka's avatar ryan-lempka Committed by GitHub
Browse files

feat: enable custom metrics prefix (#2432)

parent 086ea4f0
...@@ -1902,6 +1902,7 @@ dependencies = [ ...@@ -1902,6 +1902,7 @@ dependencies = [
"sentencepiece", "sentencepiece",
"serde", "serde",
"serde_json", "serde_json",
"serial_test",
"strum", "strum",
"tempfile", "tempfile",
"thiserror 2.0.12", "thiserror 2.0.12",
...@@ -6255,6 +6256,15 @@ dependencies = [ ...@@ -6255,6 +6256,15 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "scc"
version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22b2d775fb28f245817589471dd49c5edf64237f4a19d10ce9a92ff4651a27f4"
dependencies = [
"sdd",
]
[[package]] [[package]]
name = "schannel" name = "schannel"
version = "0.1.27" version = "0.1.27"
...@@ -6309,6 +6319,12 @@ dependencies = [ ...@@ -6309,6 +6319,12 @@ dependencies = [
"tendril", "tendril",
] ]
[[package]]
name = "sdd"
version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
[[package]] [[package]]
name = "secrecy" name = "secrecy"
version = "0.10.3" version = "0.10.3"
...@@ -6554,6 +6570,31 @@ dependencies = [ ...@@ -6554,6 +6570,31 @@ dependencies = [
"unsafe-libyaml", "unsafe-libyaml",
] ]
[[package]]
name = "serial_test"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9"
dependencies = [
"futures",
"log",
"once_cell",
"parking_lot",
"scc",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]] [[package]]
name = "servo_arc" name = "servo_arc"
version = "0.4.0" version = "0.4.0"
......
...@@ -133,6 +133,12 @@ def parse_args(): ...@@ -133,6 +133,12 @@ def parse_args():
type=validate_model_path, type=validate_model_path,
help="Path to model directory on disk (e.g., /tmp/model_cache/lama3.2_1B/)", help="Path to model directory on disk (e.g., /tmp/model_cache/lama3.2_1B/)",
) )
parser.add_argument(
"--metrics-prefix",
type=str,
default=None,
help="Prefix for Dynamo frontend metrics. If unset, uses DYN_METRICS_PREFIX env var or 'dynamo_frontend'.",
)
flags = parser.parse_args() flags = parser.parse_args()
...@@ -146,6 +152,12 @@ async def async_main(): ...@@ -146,6 +152,12 @@ async def async_main():
flags = parse_args() flags = parse_args()
is_static = bool(flags.static_endpoint) # true if the string has a value is_static = bool(flags.static_endpoint) # true if the string has a value
# Configure Dynamo frontend HTTP service metrics prefix
if flags.metrics_prefix is not None:
prefix = flags.metrics_prefix.strip()
if prefix:
os.environ["DYN_METRICS_PREFIX"] = flags.metrics_prefix
runtime = DistributedRuntime(asyncio.get_running_loop(), is_static) runtime = DistributedRuntime(asyncio.get_running_loop(), is_static)
if flags.router_mode == "kv": if flags.router_mode == "kv":
......
...@@ -143,6 +143,7 @@ proptest = "1.5.0" ...@@ -143,6 +143,7 @@ proptest = "1.5.0"
reqwest = { workspace = true } reqwest = { workspace = true }
rstest = "0.18.2" rstest = "0.18.2"
rstest_reuse = "0.7.0" rstest_reuse = "0.7.0"
serial_test = "3"
tempfile = "3.17.1" tempfile = "3.17.1"
insta = { version = "1.41", features = [ insta = { version = "1.41", features = [
"glob", "glob",
......
...@@ -12,9 +12,12 @@ pub use prometheus::Registry; ...@@ -12,9 +12,12 @@ pub use prometheus::Registry;
use super::RouteDoc; use super::RouteDoc;
/// Metric prefix for all HTTP service metrics // Default metric prefix
pub const FRONTEND_METRIC_PREFIX: &str = "dynamo_frontend"; pub const FRONTEND_METRIC_PREFIX: &str = "dynamo_frontend";
// Environment variable that overrides the default metric prefix if provided
pub const METRICS_PREFIX_ENV: &str = "DYN_METRICS_PREFIX";
/// Value for the `status` label in the request counter for successful requests /// Value for the `status` label in the request counter for successful requests
pub const REQUEST_STATUS_SUCCESS: &str = "success"; pub const REQUEST_STATUS_SUCCESS: &str = "success";
...@@ -27,9 +30,29 @@ pub const REQUEST_TYPE_STREAM: &str = "stream"; ...@@ -27,9 +30,29 @@ pub const REQUEST_TYPE_STREAM: &str = "stream";
/// Partial value for the `type` label in the request counter for unary requests /// Partial value for the `type` label in the request counter for unary requests
pub const REQUEST_TYPE_UNARY: &str = "unary"; pub const REQUEST_TYPE_UNARY: &str = "unary";
/// Helper function to construct metric names with the standard prefix fn sanitize_prometheus_prefix(raw: &str) -> String {
fn frontend_metric_name(suffix: &str) -> String { // Prometheus metric name pattern: [a-zA-Z_:][a-zA-Z0-9_:]*
format!("{}_{}", FRONTEND_METRIC_PREFIX, suffix) let mut s: String = raw
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '_' || c == ':' {
c
} else {
'_'
}
})
.collect();
if s.is_empty() {
return FRONTEND_METRIC_PREFIX.to_string();
}
let first = s.as_bytes()[0];
let valid_first = first.is_ascii_alphabetic() || first == b'_' || first == b':';
if !valid_first {
s.insert(0, '_');
}
s
} }
pub struct Metrics { pub struct Metrics {
...@@ -107,16 +130,31 @@ impl Default for Metrics { ...@@ -107,16 +130,31 @@ impl Default for Metrics {
} }
impl Metrics { impl Metrics {
/// Create Metrics with the standard prefix defined by [`FRONTEND_METRIC_PREFIX`] /// Create Metrics with the standard prefix defined by [`FRONTEND_METRIC_PREFIX`] or specify custom prefix via the following environment variable:
/// The following metrics will be created: /// - `DYN_METRICS_PREFIX`: Override the default metrics prefix
/// - `dynamo_frontend_requests_total` - IntCounterVec for the total number of requests processed ///
/// - `dynamo_frontend_inflight_requests` - IntGaugeVec for the number of inflight requests /// The following metrics will be created with the configured prefix:
/// - `dynamo_frontend_request_duration_seconds` - HistogramVec for the duration of requests /// - `{prefix}_requests_total` - IntCounterVec for the total number of requests processed
/// - `dynamo_frontend_input_sequence_tokens` - HistogramVec for input sequence length in tokens /// - `{prefix}_inflight_requests` - IntGaugeVec for the number of inflight requests
/// - `dynamo_frontend_output_sequence_tokens` - HistogramVec for output sequence length in tokens /// - `{prefix}_request_duration_seconds` - HistogramVec for the duration of requests
/// - `dynamo_frontend_time_to_first_token_seconds` - HistogramVec for time to first token in seconds /// - `{prefix}_input_sequence_tokens` - HistogramVec for input sequence length in tokens
/// - `dynamo_frontend_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds /// - `{prefix}_output_sequence_tokens` - HistogramVec for output sequence length in tokens
/// - `{prefix}_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
/// - `{prefix}_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
pub fn new() -> Self { pub fn new() -> Self {
let raw_prefix = std::env::var(METRICS_PREFIX_ENV)
.unwrap_or_else(|_| FRONTEND_METRIC_PREFIX.to_string());
let prefix = sanitize_prometheus_prefix(&raw_prefix);
if prefix != raw_prefix {
tracing::warn!(
raw=%raw_prefix,
sanitized=%prefix,
env=%METRICS_PREFIX_ENV,
"Sanitized HTTP metrics prefix"
);
}
let frontend_metric_name = |suffix: &str| format!("{}_{}", &prefix, suffix);
let request_counter = IntCounterVec::new( let request_counter = IntCounterVec::new(
Opts::new( Opts::new(
frontend_metric_name("requests_total"), frontend_metric_name("requests_total"),
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use dynamo_llm::http::service::metrics::{self, Endpoint};
use dynamo_llm::http::service::service_v2::HttpService;
use dynamo_runtime::CancellationToken;
use serial_test::serial;
use std::{env, time::Duration};
#[tokio::test]
#[serial]
async fn metrics_prefix_default_then_env_override() {
// Case 1: default prefix
env::remove_var(metrics::METRICS_PREFIX_ENV);
let svc1 = HttpService::builder().port(9101).build().unwrap();
let token1 = CancellationToken::new();
let _h1 = svc1.spawn(token1.clone()).await;
wait_for_metrics_ready(9101).await;
// Populate labeled metrics
let s1 = svc1.state_clone();
{
let _g = s1.metrics_clone().create_inflight_guard(
"test-model",
Endpoint::ChatCompletions,
false,
);
}
let body1 = reqwest::get("http://localhost:9101/metrics")
.await
.unwrap()
.text()
.await
.unwrap();
assert!(body1.contains("dynamo_frontend_requests_total"));
token1.cancel();
// Case 2: env override to prefix
env::set_var(metrics::METRICS_PREFIX_ENV, "custom_prefix");
let svc2 = HttpService::builder().port(9102).build().unwrap();
let token2 = CancellationToken::new();
let _h2 = svc2.spawn(token2.clone()).await;
wait_for_metrics_ready(9102).await;
// Populate labeled metrics
let s2 = svc2.state_clone();
{
let _g =
s2.metrics_clone()
.create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
}
// Single fetch and assert
let body2 = reqwest::get("http://localhost:9102/metrics")
.await
.unwrap()
.text()
.await
.unwrap();
assert!(body2.contains("custom_prefix_requests_total"));
assert!(!body2.contains("dynamo_frontend_requests_total"));
token2.cancel();
// Case 3: invalid env prefix is sanitized
env::set_var(metrics::METRICS_PREFIX_ENV, "nv-llm/http service");
let svc3 = HttpService::builder().port(9103).build().unwrap();
let token3 = CancellationToken::new();
let _h3 = svc3.spawn(token3.clone()).await;
wait_for_metrics_ready(9103).await;
let s3 = svc3.state_clone();
{
let _g =
s3.metrics_clone()
.create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
}
let body3 = reqwest::get("http://localhost:9103/metrics")
.await
.unwrap()
.text()
.await
.unwrap();
assert!(body3.contains("nv_llm_http_service_requests_total"));
assert!(!body3.contains("dynamo_frontend_requests_total"));
token3.cancel();
// Cleanup env to avoid leaking state
env::remove_var(metrics::METRICS_PREFIX_ENV);
}
// Poll /metrics until ready or timeout
async fn wait_for_metrics_ready(port: u16) {
let url = format!("http://localhost:{}/metrics", port);
let start = tokio::time::Instant::now();
let timeout = Duration::from_secs(5);
loop {
if start.elapsed() > timeout {
panic!("Timed out waiting for metrics endpoint at {}", url);
}
match reqwest::get(&url).await {
Ok(resp) if resp.status().is_success() => break,
_ => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment