Unverified Commit 84c9890b authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

refactor: centralize Prometheus metrics naming and sanitization DIS-554 (#2733)


Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 82bae247
......@@ -2015,6 +2015,7 @@ dependencies = [
"serde_json",
"serial_test",
"strum",
"temp-env",
"tempfile",
"thiserror 2.0.16",
"tmq",
......
......@@ -43,6 +43,7 @@ class PrometheusAPIClient:
Average metric value or 0 if no data/error
"""
try:
# TODO: use prometheus_names.rs
full_metric_name = f"dynamo_frontend_{metric_name}"
query = f"increase({full_metric_name}_sum[{interval}])/increase({full_metric_name}_count[{interval}])"
result = self.prom.custom_query(query=query)
......
......@@ -155,6 +155,7 @@ reqwest = { workspace = true }
rstest = "0.18.2"
rstest_reuse = "0.7.0"
serial_test = "3"
temp-env = { version = "0.3.6", features = ["async_closure"] }
tempfile = "3.17.1"
insta = { version = "1.41", features = [
"glob",
......
......@@ -2,6 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
use dynamo_runtime::metrics::prometheus_names::{
frontend_service, name_prefix, sanitize_frontend_prometheus_prefix,
};
use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
use std::{
sync::Arc,
......@@ -12,49 +15,6 @@ pub use prometheus::Registry;
use super::RouteDoc;
// Default metric prefix
pub const FRONTEND_METRIC_PREFIX: &str = "dynamo_frontend";
// Environment variable that overrides the default metric prefix if provided
pub const METRICS_PREFIX_ENV: &str = "DYN_METRICS_PREFIX";
/// Value for the `status` label in the request counter for successful requests
pub const REQUEST_STATUS_SUCCESS: &str = "success";
/// Value for the `status` label in the request counter if the request failed
pub const REQUEST_STATUS_ERROR: &str = "error";
/// Partial value for the `type` label in the request counter for streaming requests
pub const REQUEST_TYPE_STREAM: &str = "stream";
/// Partial value for the `type` label in the request counter for unary requests
pub const REQUEST_TYPE_UNARY: &str = "unary";
fn sanitize_prometheus_prefix(raw: &str) -> String {
// Prometheus metric name pattern: [a-zA-Z_:][a-zA-Z0-9_:]*
let mut s: String = raw
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '_' || c == ':' {
c
} else {
'_'
}
})
.collect();
if s.is_empty() {
return FRONTEND_METRIC_PREFIX.to_string();
}
let first = s.as_bytes()[0];
let valid_first = first.is_ascii_alphabetic() || first == b'_' || first == b':';
if !valid_first {
s.insert(0, '_');
}
s
}
pub struct Metrics {
request_counter: IntCounterVec,
inflight_gauge: IntGaugeVec,
......@@ -67,8 +27,8 @@ pub struct Metrics {
/// RAII object for inflight gauge and request counters
/// If this object is dropped without calling `mark_ok`, then the request will increment
/// the request counter with the `status` label with [`REQUEST_STATUS_ERROR`]; otherwise, it will increment
/// the counter with `status` label [`REQUEST_STATUS_SUCCESS`]
/// the request counter with the `status` label with [`frontend_service::status::ERROR`]; otherwise, it will increment
/// the counter with `status` label [`frontend_service::status::SUCCESS`]
pub struct InflightGuard {
metrics: Arc<Metrics>,
model: String,
......@@ -130,7 +90,7 @@ impl Default for Metrics {
}
impl Metrics {
/// Create Metrics with the standard prefix defined by [`FRONTEND_METRIC_PREFIX`] or specify custom prefix via the following environment variable:
/// Create Metrics with the standard prefix defined by [`name_prefix::FRONTEND`] or specify custom prefix via the following environment variable:
/// - `DYN_METRICS_PREFIX`: Override the default metrics prefix
///
/// The following metrics will be created with the configured prefix:
......@@ -142,14 +102,14 @@ impl Metrics {
/// - `{prefix}_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
/// - `{prefix}_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
pub fn new() -> Self {
let raw_prefix = std::env::var(METRICS_PREFIX_ENV)
.unwrap_or_else(|_| FRONTEND_METRIC_PREFIX.to_string());
let prefix = sanitize_prometheus_prefix(&raw_prefix);
let raw_prefix = std::env::var(frontend_service::METRICS_PREFIX_ENV)
.unwrap_or_else(|_| name_prefix::FRONTEND.to_string());
let prefix = sanitize_frontend_prometheus_prefix(&raw_prefix);
if prefix != raw_prefix {
tracing::warn!(
raw=%raw_prefix,
sanitized=%prefix,
env=%METRICS_PREFIX_ENV,
env=%frontend_service::METRICS_PREFIX_ENV,
"Sanitized HTTP metrics prefix"
);
}
......@@ -157,7 +117,7 @@ impl Metrics {
let request_counter = IntCounterVec::new(
Opts::new(
frontend_metric_name("requests_total"),
frontend_metric_name(frontend_service::REQUESTS_TOTAL),
"Total number of LLM requests processed",
),
&["model", "endpoint", "request_type", "status"],
......@@ -166,7 +126,7 @@ impl Metrics {
let inflight_gauge = IntGaugeVec::new(
Opts::new(
frontend_metric_name("inflight_requests"),
frontend_metric_name(frontend_service::INFLIGHT_REQUESTS),
"Number of inflight requests",
),
&["model"],
......@@ -177,7 +137,7 @@ impl Metrics {
let request_duration = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name("request_duration_seconds"),
frontend_metric_name(frontend_service::REQUEST_DURATION_SECONDS),
"Duration of LLM requests",
)
.buckets(buckets),
......@@ -187,7 +147,7 @@ impl Metrics {
let input_sequence_length = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name("input_sequence_tokens"),
frontend_metric_name(frontend_service::INPUT_SEQUENCE_TOKENS),
"Input sequence length in tokens",
)
.buckets(vec![
......@@ -200,7 +160,7 @@ impl Metrics {
let output_sequence_length = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name("output_sequence_tokens"),
frontend_metric_name(frontend_service::OUTPUT_SEQUENCE_TOKENS),
"Output sequence length in tokens",
)
.buckets(vec![
......@@ -212,7 +172,7 @@ impl Metrics {
let time_to_first_token = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name("time_to_first_token_seconds"),
frontend_metric_name(frontend_service::TIME_TO_FIRST_TOKEN_SECONDS),
"Time to first token in seconds",
)
.buckets(vec![
......@@ -225,7 +185,7 @@ impl Metrics {
let inter_token_latency = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name("inter_token_latency_seconds"),
frontend_metric_name(frontend_service::INTER_TOKEN_LATENCY_SECONDS),
"Inter-token latency in seconds",
)
.buckets(vec![
......@@ -422,8 +382,8 @@ impl Endpoint {
impl RequestType {
pub fn as_str(&self) -> &'static str {
match self {
RequestType::Unary => REQUEST_TYPE_UNARY,
RequestType::Stream => REQUEST_TYPE_STREAM,
RequestType::Unary => frontend_service::request_type::UNARY,
RequestType::Stream => frontend_service::request_type::STREAM,
}
}
}
......@@ -431,8 +391,8 @@ impl RequestType {
impl Status {
pub fn as_str(&self) -> &'static str {
match self {
Status::Success => REQUEST_STATUS_SUCCESS,
Status::Error => REQUEST_STATUS_ERROR,
Status::Success => frontend_service::status::SUCCESS,
Status::Error => frontend_service::status::ERROR,
}
}
}
......
......@@ -11,7 +11,7 @@ use dynamo_llm::http::{
service::{
Metrics,
error::HttpError,
metrics::{Endpoint, FRONTEND_METRIC_PREFIX, RequestType, Status},
metrics::{Endpoint, RequestType, Status},
service_v2::HttpService,
},
};
......@@ -24,6 +24,7 @@ use dynamo_llm::protocols::{
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
},
};
use dynamo_runtime::metrics::prometheus_names::{frontend_service, name_prefix};
use dynamo_runtime::{
CancellationToken,
engine::AsyncEngineContext,
......@@ -350,7 +351,14 @@ async fn test_http_service() {
let families = registry.gather();
let histogram_metric_family = families
.into_iter()
.find(|m| m.get_name() == format!("{}_request_duration_seconds", FRONTEND_METRIC_PREFIX))
.find(|m| {
m.get_name()
== format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::REQUEST_DURATION_SECONDS
)
})
.expect("Histogram metric not found");
assert_eq!(
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use dynamo_llm::http::service::metrics::{self, Endpoint};
use dynamo_llm::http::service::metrics::Endpoint;
use dynamo_llm::http::service::service_v2::HttpService;
use dynamo_runtime::CancellationToken;
use serial_test::serial;
use std::{env, time::Duration};
use dynamo_runtime::metrics::prometheus_names::frontend_service::METRICS_PREFIX_ENV;
use std::time::Duration;
#[path = "common/ports.rs"]
mod ports;
use ports::get_random_port;
#[tokio::test]
#[serial]
async fn metrics_prefix_default_then_env_override() {
// Case 1: default prefix
unsafe { env::remove_var(metrics::METRICS_PREFIX_ENV) };
let p1 = get_random_port().await;
let svc1 = HttpService::builder().port(p1).build().unwrap();
let token1 = CancellationToken::new();
let h1 = svc1.spawn(token1.clone()).await;
wait_for_metrics_ready(p1).await;
async fn test_metrics_prefix_default() {
// Test default prefix when no env var is set
temp_env::async_with_vars([(METRICS_PREFIX_ENV, None::<&str>)], async {
let port = get_random_port().await;
let service = HttpService::builder().port(port).build().unwrap();
let token = CancellationToken::new();
let handle = service.spawn(token.clone()).await;
wait_for_metrics_ready(port).await;
// Populate labeled metrics
let s1 = svc1.state_clone();
let state = service.state_clone();
{
let _g = s1.metrics_clone().create_inflight_guard(
let _guard = state.metrics_clone().create_inflight_guard(
"test-model",
Endpoint::ChatCompletions,
false,
);
}
let body1 = reqwest::get(format!("http://localhost:{}/metrics", p1))
let body = reqwest::get(format!("http://localhost:{}/metrics", port))
.await
.unwrap()
.text()
.await
.unwrap();
assert!(body1.contains("dynamo_frontend_requests_total"));
token1.cancel();
let _ = h1.await; // ensure port is released
assert!(body.contains("dynamo_frontend_requests_total"));
token.cancel();
let _ = handle.await;
})
.await;
}
// Case 2: env override to prefix
unsafe { env::set_var(metrics::METRICS_PREFIX_ENV, "custom_prefix") };
let p2 = get_random_port().await;
let svc2 = HttpService::builder().port(p2).build().unwrap();
let token2 = CancellationToken::new();
let h2 = svc2.spawn(token2.clone()).await;
wait_for_metrics_ready(p2).await;
#[tokio::test]
async fn test_metrics_prefix_custom() {
// Test custom prefix override via environment variable
temp_env::async_with_vars([(METRICS_PREFIX_ENV, Some("custom_prefix"))], async {
let port = get_random_port().await;
let service = HttpService::builder().port(port).build().unwrap();
let token = CancellationToken::new();
let handle = service.spawn(token.clone()).await;
wait_for_metrics_ready(port).await;
// Populate labeled metrics
let s2 = svc2.state_clone();
let state = service.state_clone();
{
let _g =
s2.metrics_clone()
.create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
let _guard = state.metrics_clone().create_inflight_guard(
"test-model",
Endpoint::ChatCompletions,
true,
);
}
// Single fetch and assert
let body2 = reqwest::get(format!("http://localhost:{}/metrics", p2))
let body = reqwest::get(format!("http://localhost:{}/metrics", port))
.await
.unwrap()
.text()
.await
.unwrap();
assert!(body2.contains("custom_prefix_requests_total"));
assert!(!body2.contains("dynamo_frontend_requests_total"));
token2.cancel();
let _ = h2.await;
assert!(body.contains("custom_prefix_requests_total"));
assert!(!body.contains("dynamo_frontend_requests_total"));
// Case 3: invalid env prefix is sanitized
unsafe { env::set_var(metrics::METRICS_PREFIX_ENV, "nv-llm/http service") };
let p3 = get_random_port().await;
let svc3 = HttpService::builder().port(p3).build().unwrap();
let token3 = CancellationToken::new();
let h3 = svc3.spawn(token3.clone()).await;
wait_for_metrics_ready(p3).await;
token.cancel();
let _ = handle.await;
})
.await;
}
#[tokio::test]
async fn test_metrics_prefix_sanitized() {
// Test that invalid prefix characters are sanitized
temp_env::async_with_vars([(METRICS_PREFIX_ENV, Some("nv-llm/http service"))], async {
let port = get_random_port().await;
let service = HttpService::builder().port(port).build().unwrap();
let token = CancellationToken::new();
let handle = service.spawn(token.clone()).await;
wait_for_metrics_ready(port).await;
let s3 = svc3.state_clone();
let state = service.state_clone();
{
let _g =
s3.metrics_clone()
.create_inflight_guard("test-model", Endpoint::ChatCompletions, true);
let _guard = state.metrics_clone().create_inflight_guard(
"test-model",
Endpoint::ChatCompletions,
true,
);
}
let body3 = reqwest::get(format!("http://localhost:{}/metrics", p3))
let body = reqwest::get(format!("http://localhost:{}/metrics", port))
.await
.unwrap()
.text()
.await
.unwrap();
assert!(body3.contains("nv_llm_http_service_requests_total"));
assert!(!body3.contains("dynamo_frontend_requests_total"));
token3.cancel();
let _ = h3.await;
assert!(body.contains("nv_llm_http_service_requests_total"));
assert!(!body.contains("dynamo_frontend_requests_total"));
// Cleanup env to avoid leaking state
unsafe { env::remove_var(metrics::METRICS_PREFIX_ENV) };
token.cancel();
let _ = handle.await;
})
.await;
}
// Poll /metrics until ready or timeout
......
......@@ -60,6 +60,8 @@ pub use futures::stream;
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;
use crate::metrics::prometheus_names::distributed_runtime;
use component::{Endpoint, InstanceSource};
use config::HealthStatus;
......@@ -157,7 +159,7 @@ impl SystemHealth {
registry: &T,
) -> anyhow::Result<()> {
let gauge = registry.create_gauge(
"uptime_seconds",
distributed_runtime::UPTIME_SECONDS,
"Total uptime of the DistributedRuntime in seconds",
&[],
)?;
......
......@@ -33,8 +33,8 @@ use std::collections::HashMap;
// Import commonly used items to avoid verbose prefixes
use prometheus_names::{
COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_metric_name, labels, name_prefix, nats_client,
nats_service, work_handler,
COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
};
// Pipeline imports for endpoint creation
......@@ -53,35 +53,6 @@ pub const USE_AUTO_LABELS: bool = true;
// Prometheus imports
use prometheus::Encoder;
/// Lints a metric name component by stripping off invalid characters and validating Prometheus naming pattern
/// Prometheus doesn't provide a built-in function to validate metric names, but the specification requires
/// names to follow the pattern [a-zA-Z_:][a-zA-Z0-9_:]*. This function implements that validation.
/// Returns error if sanitized name doesn't follow the required pattern.
fn lint_prometheus_name(name: &str) -> anyhow::Result<String> {
if name.is_empty() {
return Ok("".to_string());
}
static INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());
static PROMETHEUS_NAME_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"^[a-zA-Z_:][a-zA-Z0-9_:]*$").unwrap());
// Remove all invalid characters (everything except alphanumeric, colons, and underscores)
let sanitized = INVALID_CHARS_PATTERN.replace_all(name, "").to_string();
// Check if the sanitized name follows Prometheus naming pattern
if !sanitized.is_empty() && !PROMETHEUS_NAME_PATTERN.is_match(&sanitized) {
return Err(anyhow::anyhow!(
"Sanitized name '{}' does not follow Prometheus naming pattern [a-zA-Z_:][a-zA-Z0-9_:]*",
sanitized
));
}
Ok(sanitized)
}
/// Validate that a label slice has no duplicate keys.
/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
......@@ -237,7 +208,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
// Build hierarchy: parent_hierarchy + [basename]
let hierarchy = [parent_hierarchy.clone(), vec![basename.clone()]].concat();
let metric_name = build_metric_name(metric_name);
let metric_name = build_component_metric_name(metric_name);
// Build updated_labels: auto-labels first, then `labels` + stored labels
let mut updated_labels: Vec<(String, String)> = Vec::new();
......@@ -257,7 +228,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
if hierarchy.len() > 1 {
let namespace = &hierarchy[1];
if !namespace.is_empty() {
let valid_namespace = lint_prometheus_name(namespace)?;
let valid_namespace = sanitize_prometheus_label(namespace)?;
if !valid_namespace.is_empty() {
updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
}
......@@ -266,7 +237,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
if hierarchy.len() > 2 {
let component = &hierarchy[2];
if !component.is_empty() {
let valid_component = lint_prometheus_name(component)?;
let valid_component = sanitize_prometheus_label(component)?;
if !valid_component.is_empty() {
updated_labels.push((labels::COMPONENT.to_string(), valid_component));
}
......@@ -275,7 +246,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
if hierarchy.len() > 3 {
let endpoint = &hierarchy[3];
if !endpoint.is_empty() {
let valid_endpoint = lint_prometheus_name(endpoint)?;
let valid_endpoint = sanitize_prometheus_label(endpoint)?;
if !valid_endpoint.is_empty() {
updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
}
......@@ -595,11 +566,11 @@ mod test_helpers {
pub fn remove_nats_lines(input: &str) -> Vec<String> {
filter_prometheus_lines(input, |line| {
!line.contains(&format!(
"{}{}",
"{}_{}",
name_prefix::COMPONENT,
nats_client::PREFIX
)) && !line.contains(&format!(
"{}{}",
"{}_{}",
name_prefix::COMPONENT,
nats_service::PREFIX
)) && !line.trim().is_empty()
......@@ -610,11 +581,11 @@ mod test_helpers {
pub fn extract_nats_lines(input: &str) -> Vec<String> {
filter_prometheus_lines(input, |line| {
line.contains(&format!(
"{}{}",
"{}_{}",
name_prefix::COMPONENT,
nats_client::PREFIX
)) || line.contains(&format!(
"{}{}",
"{}_{}",
name_prefix::COMPONENT,
nats_service::PREFIX
))
......@@ -625,7 +596,7 @@ mod test_helpers {
/// Returns only the actual metric lines with values.
pub fn extract_metrics(input: &str) -> Vec<String> {
filter_prometheus_lines(input, |line| {
line.starts_with(name_prefix::COMPONENT)
line.starts_with(&format!("{}_", name_prefix::COMPONENT))
&& !line.starts_with("#")
&& !line.trim().is_empty()
})
......@@ -684,82 +655,15 @@ mod test_metricsregistry_units {
use super::*;
#[test]
fn test_build_metric_name_with_prefix() {
// Test that build_metric_name correctly prepends the dynamo_component prefix
let result = build_metric_name("requests");
fn test_build_component_metric_name_with_prefix() {
// Test that build_component_metric_name correctly prepends the dynamo_component prefix
let result = build_component_metric_name("requests");
assert_eq!(result, "dynamo_component_requests");
let result = build_metric_name("counter");
let result = build_component_metric_name("counter");
assert_eq!(result, "dynamo_component_counter");
}
#[test]
fn test_lint_prometheus_name() {
// Test that valid components remain unchanged
assert_eq!(
lint_prometheus_name("testnamespace").unwrap(),
"testnamespace"
);
assert_eq!(
lint_prometheus_name("test_namespace").unwrap(),
"test_namespace"
);
assert_eq!(lint_prometheus_name("test123").unwrap(), "test123");
assert_eq!(
lint_prometheus_name("test:namespace").unwrap(),
"test:namespace"
);
assert_eq!(
lint_prometheus_name("_testnamespace").unwrap(),
"_testnamespace"
);
assert_eq!(
lint_prometheus_name("testnamespace_123").unwrap(),
"testnamespace_123"
);
// Test that invalid characters are stripped
assert_eq!(lint_prometheus_name("").unwrap(), ""); // Empty
assert_eq!(
lint_prometheus_name("test namespace").unwrap(),
"testnamespace"
); // Space removed
assert_eq!(
lint_prometheus_name("test.namespace").unwrap(),
"testnamespace"
); // Dot removed
assert_eq!(
lint_prometheus_name("test@namespace").unwrap(),
"testnamespace"
); // @ removed
assert_eq!(
lint_prometheus_name("test#namespace").unwrap(),
"testnamespace"
); // # removed
assert_eq!(
lint_prometheus_name("test$namespace").unwrap(),
"testnamespace"
); // $ removed
assert_eq!(
lint_prometheus_name("test!@#$%^&*()namespace").unwrap(),
"testnamespace"
); // Multiple special chars removed
assert_eq!(
lint_prometheus_name("testnamespace_123!").unwrap(),
"testnamespace_123"
); // Trailing special char removed
// Test that hyphens are stripped (not allowed in Prometheus names)
assert_eq!(
lint_prometheus_name("test-namespace").unwrap(),
"testnamespace"
); // Hyphen removed
assert_eq!(
lint_prometheus_name("test-namespace-123").unwrap(),
"testnamespace123"
); // Multiple hyphens removed
}
#[test]
fn test_parse_prometheus_metric() {
use super::test_helpers::parse_prometheus_metric;
......@@ -1347,7 +1251,7 @@ mod test_metricsregistry_nats {
let expect_drt_nats_metrics_sorted = {
let mut temp = DRT_NATS_METRICS
.iter()
.map(|metric| build_metric_name(metric))
.map(|metric| build_component_metric_name(metric))
.collect::<Vec<_>>();
temp.sort();
temp
......@@ -1417,7 +1321,7 @@ mod test_metricsregistry_nats {
let expect_component_nats_metrics_sorted = {
let mut temp = COMPONENT_NATS_METRICS
.iter()
.map(|metric| build_metric_name(metric))
.map(|metric| build_component_metric_name(metric))
.collect::<Vec<_>>();
temp.sort();
temp
......@@ -1515,31 +1419,63 @@ mod test_metricsregistry_nats {
let initial_expected_metric_values = [
// DRT NATS metrics (ordered to match DRT_NATS_METRICS)
(build_metric_name(nats_client::CONNECTION_STATE), 1.0, 1.0), // Should be connected
(build_metric_name(nats_client::CONNECTS), 1.0, 1.0), // Should have 1 connection
(
build_metric_name(nats_client::IN_TOTAL_BYTES),
build_component_metric_name(nats_client::CONNECTION_STATE),
1.0,
1.0,
), // Should be connected
(build_component_metric_name(nats_client::CONNECTS), 1.0, 1.0), // Should have 1 connection
(
build_component_metric_name(nats_client::IN_TOTAL_BYTES),
800.0,
4000.0,
), // Wide range around observed value of 1888
(build_metric_name(nats_client::IN_MESSAGES), 0.0, 5.0), // Wide range around 2
(
build_metric_name(nats_client::OUT_OVERHEAD_BYTES),
build_component_metric_name(nats_client::IN_MESSAGES),
0.0,
5.0,
), // Wide range around 2
(
build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1500.0,
5000.0,
), // Wide range around observed value of 2752
(build_metric_name(nats_client::OUT_MESSAGES), 0.0, 5.0), // Wide range around 2
(
build_component_metric_name(nats_client::OUT_MESSAGES),
0.0,
5.0,
), // Wide range around 2
// Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
(build_metric_name(nats_service::AVG_PROCESSING_MS), 0.0, 0.0), // No processing yet
(build_metric_name(nats_service::TOTAL_ERRORS), 0.0, 0.0), // No errors yet
(build_metric_name(nats_service::TOTAL_REQUESTS), 0.0, 0.0), // No requests yet
(
build_metric_name(nats_service::TOTAL_PROCESSING_MS),
build_component_metric_name(nats_service::AVG_PROCESSING_MS),
0.0,
0.0,
), // No processing yet
(build_metric_name(nats_service::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active yet
(build_metric_name(nats_service::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active yet
(
build_component_metric_name(nats_service::TOTAL_ERRORS),
0.0,
0.0,
), // No errors yet
(
build_component_metric_name(nats_service::TOTAL_REQUESTS),
0.0,
0.0,
), // No requests yet
(
build_component_metric_name(nats_service::TOTAL_PROCESSING_MS),
0.0,
0.0,
), // No processing yet
(
build_component_metric_name(nats_service::ACTIVE_SERVICES),
0.0,
2.0,
), // Service may not be fully active yet
(
build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
0.0,
2.0,
), // Endpoint may not be fully active yet
];
for (metric_name, min_value, max_value) in &initial_expected_metric_values {
......@@ -1612,49 +1548,89 @@ mod test_metricsregistry_nats {
let post_expected_metric_values = [
// DRT NATS metrics
(build_metric_name(nats_client::CONNECTION_STATE), 1.0, 1.0), // Connected
(build_metric_name(nats_client::CONNECTS), 1.0, 1.0), // 1 connection
(
build_metric_name(nats_client::IN_TOTAL_BYTES),
build_component_metric_name(nats_client::CONNECTION_STATE),
1.0,
1.0,
), // Connected
(build_component_metric_name(nats_client::CONNECTS), 1.0, 1.0), // 1 connection
(
build_component_metric_name(nats_client::IN_TOTAL_BYTES),
20000.0,
32000.0,
), // Wide range around 26117
(build_metric_name(nats_client::IN_MESSAGES), 8.0, 20.0), // Wide range around 16
(
build_metric_name(nats_client::OUT_OVERHEAD_BYTES),
build_component_metric_name(nats_client::IN_MESSAGES),
8.0,
20.0,
), // Wide range around 16
(
build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
2500.0,
8000.0,
), // Wide range around 5524
(build_metric_name(nats_client::OUT_MESSAGES), 8.0, 20.0), // Wide range around 16
(
build_component_metric_name(nats_client::OUT_MESSAGES),
8.0,
20.0,
), // Wide range around 16
// Component NATS metrics
(build_metric_name(nats_service::AVG_PROCESSING_MS), 0.0, 1.0), // Low processing time
(build_metric_name(nats_service::TOTAL_ERRORS), 0.0, 0.0), // No errors
(build_metric_name(nats_service::TOTAL_REQUESTS), 0.0, 0.0), // No work handler requests
(
build_metric_name(nats_service::TOTAL_PROCESSING_MS),
build_component_metric_name(nats_service::AVG_PROCESSING_MS),
0.0,
1.0,
), // Low processing time
(
build_component_metric_name(nats_service::TOTAL_ERRORS),
0.0,
0.0,
), // No errors
(
build_component_metric_name(nats_service::TOTAL_REQUESTS),
0.0,
0.0,
), // No work handler requests
(
build_component_metric_name(nats_service::TOTAL_PROCESSING_MS),
0.0,
5.0,
), // Low total processing time
(build_metric_name(nats_service::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active
(build_metric_name(nats_service::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active
(
build_component_metric_name(nats_service::ACTIVE_SERVICES),
0.0,
2.0,
), // Service may not be fully active
(
build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
0.0,
2.0,
), // Endpoint may not be fully active
// Work handler metrics
(build_metric_name(work_handler::REQUESTS_TOTAL), 10.0, 10.0), // 10 messages
(
build_metric_name(work_handler::REQUEST_BYTES_TOTAL),
build_component_metric_name(work_handler::REQUESTS_TOTAL),
10.0,
10.0,
), // 10 messages
(
build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
21000.0,
26000.0,
), // ~75-125% of 23520
(
build_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
18000.0,
23000.0,
), // ~75-125% of 20660
(build_metric_name(work_handler::INFLIGHT_REQUESTS), 0.0, 1.0), // 0 or very low
(
build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
0.0,
1.0,
), // 0 or very low
// Histograms have _{count,sum} suffixes
(
format!(
"{}_count",
build_metric_name(work_handler::REQUEST_DURATION_SECONDS)
build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
),
10.0,
10.0,
......@@ -1662,7 +1638,7 @@ mod test_metricsregistry_nats {
(
format!(
"{}_sum",
build_metric_name(work_handler::REQUEST_DURATION_SECONDS)
build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
),
0.0001,
1.0,
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Prometheus metric name constants
//! Prometheus metric name constants and sanitization utilities
//!
//! This module provides centralized Prometheus metric name constants for various components
//! to ensure consistency and avoid duplication across the codebase.
//! This module provides centralized Prometheus metric name constants and sanitization functions
//! for various components to ensure consistency and avoid duplication across the codebase.
//!
//! ## Key Differences: Prometheus Metric Names vs Prometheus Label Names
//!
//! **Metric names**: Allow colons and `__` anywhere. **Label names**: No colons, no `__` prefix.
//! Label names starting with `__` are reserved for Prometheus internal use.
/// Builds a full metric name by prepending the component prefix
pub fn build_metric_name(metric_name: &str) -> String {
format!("{}{}", name_prefix::COMPONENT, metric_name)
}
use once_cell::sync::Lazy;
use regex::Regex;
/// Metric name prefixes used across the metrics system
pub mod name_prefix {
/// Prefix for all Prometheus metric names.
pub const COMPONENT: &str = "dynamo_component_";
pub const COMPONENT: &str = "dynamo_component";
// TODO(keivenc): uncomment below for the frontend
// pub const FRONTEND: &str = "dynamo_frontend_";
/// Prefix for frontend service metrics
pub const FRONTEND: &str = "dynamo_frontend";
}
/// Automatically inserted Prometheus label names used across the metrics system
......@@ -32,6 +35,98 @@ pub mod labels {
pub const ENDPOINT: &str = "dynamo_endpoint";
}
/// Frontend service metrics (LLM HTTP service)
pub mod frontend_service {
// TODO: Move DYN_METRICS_PREFIX and other environment variable names to environment_names.rs
// for centralized environment variable constant management across the codebase
/// Environment variable that overrides the default metric prefix
pub const METRICS_PREFIX_ENV: &str = "DYN_METRICS_PREFIX";
/// Total number of LLM requests processed
pub const REQUESTS_TOTAL: &str = "requests_total";
/// Number of inflight requests
pub const INFLIGHT_REQUESTS: &str = "inflight_requests";
/// Duration of LLM requests
pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
/// Input sequence length in tokens
pub const INPUT_SEQUENCE_TOKENS: &str = "input_sequence_tokens";
/// Output sequence length in tokens
pub const OUTPUT_SEQUENCE_TOKENS: &str = "output_sequence_tokens";
/// Time to first token in seconds
pub const TIME_TO_FIRST_TOKEN_SECONDS: &str = "time_to_first_token_seconds";
/// Inter-token latency in seconds
pub const INTER_TOKEN_LATENCY_SECONDS: &str = "inter_token_latency_seconds";
/// Status label values
pub mod status {
/// Value for successful requests
pub const SUCCESS: &str = "success";
/// Value for failed requests
pub const ERROR: &str = "error";
}
/// Request type label values
pub mod request_type {
/// Value for streaming requests
pub const STREAM: &str = "stream";
/// Value for unary requests
pub const UNARY: &str = "unary";
}
}
/// Work handler Prometheus metric names
pub mod work_handler {
/// Total number of requests processed by work handler
pub const REQUESTS_TOTAL: &str = "requests_total";
/// Total number of bytes received in requests by work handler
pub const REQUEST_BYTES_TOTAL: &str = "request_bytes_total";
/// Total number of bytes sent in responses by work handler
pub const RESPONSE_BYTES_TOTAL: &str = "response_bytes_total";
/// Number of requests currently being processed by work handler
pub const INFLIGHT_REQUESTS: &str = "inflight_requests";
/// Time spent processing requests by work handler (histogram)
pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
/// Total number of errors in work handler processing
pub const ERRORS_TOTAL: &str = "errors_total";
/// Label name for error type classification
pub const ERROR_TYPE_LABEL: &str = "error_type";
/// Error type values for work handler metrics
pub mod error_types {
/// Deserialization error
pub const DESERIALIZATION: &str = "deserialization";
/// Invalid message format error
pub const INVALID_MESSAGE: &str = "invalid_message";
/// Response stream creation error
pub const RESPONSE_STREAM: &str = "response_stream";
/// Generation error
pub const GENERATE: &str = "generate";
/// Response publishing error
pub const PUBLISH_RESPONSE: &str = "publish_response";
/// Final message publishing error
pub const PUBLISH_FINAL: &str = "publish_final";
}
}
/// NATS client metrics. DistributedRuntime contains a NATS client shared by all children)
pub mod nats_client {
/// Macro to generate NATS client metric names with the prefix
......@@ -115,22 +210,31 @@ pub const COMPONENT_NATS_METRICS: &[&str] = &[
nats_service::ACTIVE_ENDPOINTS, // derived from ServiceInfo.endpoints
];
/// Work handler Prometheus metric names
pub mod work_handler {
/// Total number of requests processed by work handler
pub const REQUESTS_TOTAL: &str = "requests_total";
/// Task tracker Prometheus metric name suffixes
pub mod task_tracker {
/// Total number of tasks issued/submitted
pub const TASKS_ISSUED_TOTAL: &str = "tasks_issued_total";
/// Total number of bytes received in requests by work handler
pub const REQUEST_BYTES_TOTAL: &str = "request_bytes_total";
/// Total number of tasks started
pub const TASKS_STARTED_TOTAL: &str = "tasks_started_total";
/// Total number of bytes sent in responses by work handler
pub const RESPONSE_BYTES_TOTAL: &str = "response_bytes_total";
/// Total number of successfully completed tasks
pub const TASKS_SUCCESS_TOTAL: &str = "tasks_success_total";
/// Number of requests currently being processed by work handler
pub const INFLIGHT_REQUESTS: &str = "inflight_requests";
/// Total number of cancelled tasks
pub const TASKS_CANCELLED_TOTAL: &str = "tasks_cancelled_total";
/// Time spent processing requests by work handler (histogram)
pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
/// Total number of failed tasks
pub const TASKS_FAILED_TOTAL: &str = "tasks_failed_total";
/// Total number of rejected tasks
pub const TASKS_REJECTED_TOTAL: &str = "tasks_rejected_total";
}
/// DistributedRuntime core metrics
pub mod distributed_runtime {
/// Total uptime of the DistributedRuntime in seconds
pub const UPTIME_SECONDS: &str = "uptime_seconds";
}
/// KVBM connector
......@@ -141,3 +245,334 @@ pub mod kvbm_connector {
/// KVBM connector worker
pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker";
}
// Shared regex patterns for Prometheus sanitization
static METRIC_INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());
static LABEL_INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_]").unwrap());
static INVALID_FIRST_CHAR_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[^a-zA-Z_]").unwrap());
/// Sanitizes a Prometheus metric name by converting invalid characters to underscores
/// and ensuring the first character is valid. Uses regex for clear validation.
/// Returns an error if the input cannot be sanitized into a valid name.
///
/// **Rules**: Pattern `[a-zA-Z_:][a-zA-Z0-9_:]*`. Allows colons and `__` anywhere.
pub fn sanitize_prometheus_name(raw: &str) -> anyhow::Result<String> {
if raw.is_empty() {
return Err(anyhow::anyhow!(
"Cannot sanitize empty string into valid Prometheus name"
));
}
// Replace all invalid characters with underscores
let mut sanitized = METRIC_INVALID_CHARS_PATTERN
.replace_all(raw, "_")
.to_string();
// Ensure first character is valid (letter, underscore, or colon)
if INVALID_FIRST_CHAR_PATTERN.is_match(&sanitized) {
sanitized = format!("_{}", sanitized);
}
// Check if the result is all underscores (invalid input)
if sanitized.chars().all(|c| c == '_') {
return Err(anyhow::anyhow!(
"Input '{}' contains only invalid characters and cannot be sanitized into a valid Prometheus name",
raw
));
}
Ok(sanitized)
}
/// Sanitizes a Prometheus label name by converting invalid characters to underscores
/// and ensuring the first character is valid. Uses regex for clear validation.
/// Label names have stricter rules than metric names (no colons allowed).
/// Returns an error if the input cannot be sanitized into a valid label name.
///
/// **Rules**: Pattern `[a-zA-Z_][a-zA-Z0-9_]*`. No colons, no `__` prefix (reserved).
pub fn sanitize_prometheus_label(raw: &str) -> anyhow::Result<String> {
if raw.is_empty() {
return Err(anyhow::anyhow!(
"Cannot sanitize empty string into valid Prometheus label"
));
}
// Replace all invalid characters with underscores (no colons allowed in labels)
let mut sanitized = LABEL_INVALID_CHARS_PATTERN
.replace_all(raw, "_")
.to_string();
// Ensure first character is valid (letter or underscore only)
if INVALID_FIRST_CHAR_PATTERN.is_match(&sanitized) {
sanitized = format!("_{}", sanitized);
}
// Prevent __ prefix (reserved for Prometheus internal use) but allow __ elsewhere
if sanitized.starts_with("__") {
sanitized = sanitized
.strip_prefix("__")
.unwrap_or(&sanitized)
.to_string();
if sanitized.is_empty() || !sanitized.chars().next().unwrap().is_ascii_alphabetic() {
sanitized = format!("_{}", sanitized);
}
}
// Check if the result is all underscores (invalid input)
if sanitized.chars().all(|c| c == '_') {
return Err(anyhow::anyhow!(
"Input '{}' contains only invalid characters and cannot be sanitized into a valid Prometheus label",
raw
));
}
Ok(sanitized)
}
/// Sanitizes a Prometheus frontend metric prefix by converting invalid characters to underscores
/// and ensuring the first character is valid. Uses the general prometheus name sanitization
/// but with frontend-specific fallback behavior.
pub fn sanitize_frontend_prometheus_prefix(raw: &str) -> String {
if raw.is_empty() {
return name_prefix::FRONTEND.to_string();
}
// Reuse the general prometheus name sanitization logic, fallback to frontend prefix on error
sanitize_prometheus_name(raw).unwrap_or_else(|_| name_prefix::FRONTEND.to_string())
}
/// Builds a full component metric name by prepending the component prefix
/// Sanitizes the metric name to ensure it's valid for Prometheus
pub fn build_component_metric_name(metric_name: &str) -> String {
let sanitized_name =
sanitize_prometheus_name(metric_name).expect("metric name should be valid or sanitizable");
format!("{}_{}", name_prefix::COMPONENT, sanitized_name)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sanitize_frontend_prometheus_prefix() {
// Test that valid prefixes remain unchanged
assert_eq!(
sanitize_frontend_prometheus_prefix("dynamo_frontend"),
"dynamo_frontend"
);
assert_eq!(
sanitize_frontend_prometheus_prefix("custom_prefix"),
"custom_prefix"
);
assert_eq!(sanitize_frontend_prometheus_prefix("test123"), "test123");
// Test that invalid characters are converted to underscores
assert_eq!(
sanitize_frontend_prometheus_prefix("test prefix"),
"test_prefix"
);
assert_eq!(
sanitize_frontend_prometheus_prefix("test.prefix"),
"test_prefix"
);
assert_eq!(
sanitize_frontend_prometheus_prefix("test@prefix"),
"test_prefix"
);
assert_eq!(
sanitize_frontend_prometheus_prefix("test-prefix"),
"test_prefix"
);
// Test that invalid first characters are fixed
assert_eq!(sanitize_frontend_prometheus_prefix("123test"), "_123test");
assert_eq!(sanitize_frontend_prometheus_prefix("@test"), "_test");
// Test empty string fallback
assert_eq!(
sanitize_frontend_prometheus_prefix(""),
name_prefix::FRONTEND
);
}
#[test]
fn test_sanitize_prometheus_name() {
// Test that valid names remain unchanged
assert_eq!(
sanitize_prometheus_name("valid_name").unwrap(),
"valid_name"
);
assert_eq!(sanitize_prometheus_name("test123").unwrap(), "test123");
assert_eq!(
sanitize_prometheus_name("test_name_123").unwrap(),
"test_name_123"
);
assert_eq!(sanitize_prometheus_name("test:name").unwrap(), "test:name"); // colons allowed
// Test that invalid characters are converted to underscores
assert_eq!(sanitize_prometheus_name("test name").unwrap(), "test_name");
assert_eq!(sanitize_prometheus_name("test.name").unwrap(), "test_name");
assert_eq!(sanitize_prometheus_name("test@name").unwrap(), "test_name");
assert_eq!(sanitize_prometheus_name("test-name").unwrap(), "test_name");
assert_eq!(
sanitize_prometheus_name("test$name#123").unwrap(),
"test_name_123"
);
// Test that double underscores are ALLOWED in metric names (unlike labels)
assert_eq!(
sanitize_prometheus_name("test__name").unwrap(),
"test__name"
);
assert_eq!(
sanitize_prometheus_name("test___name").unwrap(),
"test___name"
);
assert_eq!(sanitize_prometheus_name("__test").unwrap(), "__test"); // Leading double underscore OK
// Test that invalid first characters are fixed
assert_eq!(sanitize_prometheus_name("123test").unwrap(), "_123test");
assert_eq!(sanitize_prometheus_name("@test").unwrap(), "_test"); // @ becomes _, no double underscore
assert_eq!(sanitize_prometheus_name("-test").unwrap(), "_test"); // - becomes _, no double underscore
assert_eq!(sanitize_prometheus_name(".test").unwrap(), "_test"); // . becomes _, no double underscore
// Test empty string returns error
assert!(sanitize_prometheus_name("").is_err());
// Test complex cases
assert_eq!(
sanitize_prometheus_name("123.test-name@domain").unwrap(),
"_123_test_name_domain"
);
// Test that strings with only invalid characters return error
assert!(sanitize_prometheus_name("@#$%").is_err());
assert!(sanitize_prometheus_name("!!!!").is_err());
}
#[test]
fn test_sanitize_prometheus_label() {
// Test that valid labels remain unchanged
assert_eq!(
sanitize_prometheus_label("valid_label").unwrap(),
"valid_label"
);
assert_eq!(sanitize_prometheus_label("test123").unwrap(), "test123");
assert_eq!(
sanitize_prometheus_label("test_label_123").unwrap(),
"test_label_123"
);
// Test that colons are NOT allowed in labels (stricter than names)
assert_eq!(
sanitize_prometheus_label("test:label").unwrap(),
"test_label"
);
// Test that invalid characters are converted to underscores
assert_eq!(
sanitize_prometheus_label("test label").unwrap(),
"test_label"
);
assert_eq!(
sanitize_prometheus_label("test.label").unwrap(),
"test_label"
);
assert_eq!(
sanitize_prometheus_label("test@label").unwrap(),
"test_label"
);
assert_eq!(
sanitize_prometheus_label("test-label").unwrap(),
"test_label"
);
assert_eq!(
sanitize_prometheus_label("test$label#123").unwrap(),
"test_label_123"
);
// Test that double underscores are ALLOWED in middle but NOT at start
assert_eq!(
sanitize_prometheus_label("test__label").unwrap(),
"test__label"
); // OK in middle
assert_eq!(
sanitize_prometheus_label("test___label").unwrap(),
"test___label"
); // OK in middle
assert_eq!(
sanitize_prometheus_label("test____label").unwrap(),
"test____label"
); // OK in middle
assert_eq!(sanitize_prometheus_label("__test").unwrap(), "test"); // Leading __ removed
assert!(sanitize_prometheus_label("____").is_err()); // All underscores should error
// Test that invalid first characters are fixed (no colons allowed)
assert_eq!(sanitize_prometheus_label("123test").unwrap(), "_123test");
assert_eq!(sanitize_prometheus_label("@test").unwrap(), "_test");
assert_eq!(sanitize_prometheus_label(":test").unwrap(), "_test"); // colon not allowed
assert_eq!(sanitize_prometheus_label("-test").unwrap(), "_test");
// Test empty string returns error
assert!(sanitize_prometheus_label("").is_err());
// Test complex cases
assert_eq!(
sanitize_prometheus_label("123:test-label@domain").unwrap(),
"_123_test_label_domain"
);
// Test that strings with only invalid characters return error
assert!(sanitize_prometheus_label("@#$%").is_err()); // @#$% -> ____ -> ___ -> all underscores error
assert!(sanitize_prometheus_label("!!!!").is_err()); // !!!! -> ____ -> ___ -> all underscores error
}
#[test]
fn test_build_component_metric_name() {
// Test that valid names work correctly
assert_eq!(
build_component_metric_name("test_metric"),
"dynamo_component_test_metric"
);
assert_eq!(
build_component_metric_name("requests_total"),
"dynamo_component_requests_total"
);
// Test that invalid characters are sanitized
assert_eq!(
build_component_metric_name("test metric"),
"dynamo_component_test_metric"
);
assert_eq!(
build_component_metric_name("test.metric"),
"dynamo_component_test_metric"
);
assert_eq!(
build_component_metric_name("test@metric"),
"dynamo_component_test_metric"
);
// Test that invalid first characters are fixed
assert_eq!(
build_component_metric_name("123metric"),
"dynamo_component__123metric"
);
}
#[test]
#[should_panic(expected = "metric name should be valid or sanitizable")]
fn test_build_component_metric_name_panics_on_invalid_input() {
// Test that completely invalid input panics with clear message
build_component_metric_name("@#$%");
}
#[test]
#[should_panic(expected = "metric name should be valid or sanitizable")]
fn test_build_component_metric_name_panics_on_empty_input() {
// Test that empty input panics with clear message
build_component_metric_name("");
}
}
......@@ -14,6 +14,7 @@
// limitations under the License.
use super::*;
use crate::metrics::prometheus_names::work_handler;
use crate::protocols::maybe_error::MaybeError;
use prometheus::{Histogram, IntCounter, IntCounterVec, IntGauge};
use serde::{Deserialize, Serialize};
......@@ -59,40 +60,40 @@ impl WorkHandlerMetrics {
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let metrics_labels = metrics_labels.unwrap_or(&[]);
let request_counter = endpoint.create_intcounter(
"requests_total",
work_handler::REQUESTS_TOTAL,
"Total number of requests processed by work handler",
metrics_labels,
)?;
let request_duration = endpoint.create_histogram(
"request_duration_seconds",
work_handler::REQUEST_DURATION_SECONDS,
"Time spent processing requests by work handler",
metrics_labels,
None,
)?;
let inflight_requests = endpoint.create_intgauge(
"inflight_requests",
work_handler::INFLIGHT_REQUESTS,
"Number of requests currently being processed by work handler",
metrics_labels,
)?;
let request_bytes = endpoint.create_intcounter(
"request_bytes_total",
work_handler::REQUEST_BYTES_TOTAL,
"Total number of bytes received in requests by work handler",
metrics_labels,
)?;
let response_bytes = endpoint.create_intcounter(
"response_bytes_total",
work_handler::RESPONSE_BYTES_TOTAL,
"Total number of bytes sent in responses by work handler",
metrics_labels,
)?;
let error_counter = endpoint.create_intcountervec(
"errors_total",
work_handler::ERRORS_TOTAL,
"Total number of errors in work handler processing",
&["error_type"],
&[work_handler::ERROR_TYPE_LABEL],
metrics_labels,
)?;
......@@ -172,7 +173,7 @@ where
let json_str = String::from_utf8_lossy(&header);
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["deserialization"])
.with_label_values(&[work_handler::error_types::DESERIALIZATION])
.inc();
}
return Err(PipelineError::DeserializationError(format!(
......@@ -186,7 +187,7 @@ where
_ => {
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["invalid_message"])
.with_label_values(&[work_handler::error_types::INVALID_MESSAGE])
.inc();
}
return Err(PipelineError::Generic(String::from(
......@@ -211,7 +212,7 @@ where
.map_err(|e| {
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["response_stream"])
.with_label_values(&[work_handler::error_types::RESPONSE_STREAM])
.inc();
}
PipelineError::Generic(format!("Failed to create response stream: {:?}", e,))
......@@ -226,7 +227,9 @@ where
.await
.map_err(|e| {
if let Some(m) = self.metrics() {
m.error_counter.with_label_values(&["generate"]).inc();
m.error_counter
.with_label_values(&[work_handler::error_types::GENERATE])
.inc();
}
PipelineError::GenerateError(e)
});
......@@ -288,7 +291,7 @@ where
send_complete_final = false;
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["publish_response"])
.with_label_values(&[work_handler::error_types::PUBLISH_RESPONSE])
.inc();
}
break;
......@@ -310,7 +313,9 @@ where
context.id()
);
if let Some(m) = self.metrics() {
m.error_counter.with_label_values(&["publish_final"]).inc();
m.error_counter
.with_label_values(&[work_handler::error_types::PUBLISH_FINAL])
.inc();
}
}
}
......
......@@ -16,6 +16,7 @@
use crate::config::HealthStatus;
use crate::logging::make_request_span;
use crate::metrics::MetricsRegistry;
use crate::metrics::prometheus_names::{nats_client, nats_service};
use crate::traits::DistributedRuntimeProvider;
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
use serde_json::json;
......@@ -307,8 +308,6 @@ mod integration_tests {
println!("Full metrics response:\n{}", response);
// Filter out NATS client metrics for comparison
use crate::metrics::prometheus_names::{nats_client, nats_service};
let filtered_response: String = response
.lines()
.filter(|line| {
......
......@@ -384,6 +384,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::metrics::MetricsRegistry;
use crate::metrics::prometheus_names::task_tracker;
use anyhow::Result;
use async_trait::async_trait;
use derive_builder::Builder;
......@@ -1577,37 +1578,37 @@ impl PrometheusTaskMetrics {
component_name: &str,
) -> anyhow::Result<Self> {
let issued_counter = registry.create_intcounter(
&format!("{}_tasks_issued_total", component_name),
&format!("{}_{}", component_name, task_tracker::TASKS_ISSUED_TOTAL),
"Total number of tasks issued/submitted",
&[],
)?;
let started_counter = registry.create_intcounter(
&format!("{}_tasks_started_total", component_name),
&format!("{}_{}", component_name, task_tracker::TASKS_STARTED_TOTAL),
"Total number of tasks started",
&[],
)?;
let success_counter = registry.create_intcounter(
&format!("{}_tasks_success_total", component_name),
&format!("{}_{}", component_name, task_tracker::TASKS_SUCCESS_TOTAL),
"Total number of successfully completed tasks",
&[],
)?;
let cancelled_counter = registry.create_intcounter(
&format!("{}_tasks_cancelled_total", component_name),
&format!("{}_{}", component_name, task_tracker::TASKS_CANCELLED_TOTAL),
"Total number of cancelled tasks",
&[],
)?;
let failed_counter = registry.create_intcounter(
&format!("{}_tasks_failed_total", component_name),
&format!("{}_{}", component_name, task_tracker::TASKS_FAILED_TOTAL),
"Total number of failed tasks",
&[],
)?;
let rejected_counter = registry.create_intcounter(
&format!("{}_tasks_rejected_total", component_name),
&format!("{}_{}", component_name, task_tracker::TASKS_REJECTED_TOTAL),
"Total number of rejected tasks",
&[],
)?;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment