Unverified Commit cec19d4d authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat(perf): add request-plane, transport, and work-handler metric definitions (#6735)

parent a3f805c7
......@@ -76,6 +76,9 @@ impl PrometheusParser {
macro_prefix = Some(prefix);
}
}
// TODO: Handle nested `pub mod` (e.g. `transport::tcp`, `transport::nats`)
// by recursing into sub-modules and emitting nested Python classes.
// Currently these are silently skipped, producing empty Python classes.
_ => {}
}
}
......
......@@ -249,6 +249,25 @@ class name_prefix:
TOKIO = "dynamo_tokio"
# Prefix for standalone KV indexer metrics
KVINDEXER = "dynamo_kvindexer"
# Prefix for request-plane metrics at AddressedPushRouter
REQUEST_PLANE = "dynamo_request_plane"
# Prefix for transport-layer metrics (TCP / NATS)
TRANSPORT = "dynamo_transport"
# Prefix for work-handler transport breakdown metrics (backend side)
WORK_HANDLER = "dynamo_work_handler"
# Metric-name suffixes for the request plane. The shared prefix is
# name_prefix.REQUEST_PLANE ("dynamo_request_plane"); the Rust side joins
# prefix and suffix with a single underscore.
class request_plane:
"""Request plane metrics at AddressedPushRouter"""
# Time from generate() entry to send_request() (serialization + encoding)
QUEUE_SECONDS = "queue_seconds"
# Time for send_request() to complete (frontend view: network + queue + ack)
SEND_SECONDS = "send_seconds"
# Time from send_request() to first response item (transport roundtrip TTFT)
ROUNDTRIP_TTFT_SECONDS = "roundtrip_ttft_seconds"
# Currently in-flight requests (gauge)
INFLIGHT_REQUESTS = "inflight_requests"
class router:
......@@ -323,6 +342,26 @@ class tokio_perf:
ALIVE_TASKS = "alive_tasks"
# Metric-name suffixes for the transport layer. The shared prefix is
# name_prefix.TRANSPORT ("dynamo_transport").
class transport:
"""Transport-specific metrics (TCP / NATS)"""
# NOTE: Nested classes added manually because the codegen does not yet
# handle Rust submodules (see TODO in prometheus_parser.rs).
# Re-running gen-python-prometheus-names will overwrite this file and
# lose these classes until the codegen is updated.
# Suffixes for TCP transport metrics.
class tcp:
# NOTE(review): presumably the number of active pooled connections - confirm
POOL_ACTIVE = "tcp_pool_active"
# NOTE(review): presumably the number of idle pooled connections - confirm
POOL_IDLE = "tcp_pool_idle"
# Total bytes sent by the TCP request client
BYTES_SENT_TOTAL = "tcp_bytes_sent_total"
# Total bytes received by the TCP request client
BYTES_RECEIVED_TOTAL = "tcp_bytes_received_total"
# Total TCP request errors (send failure or timeout)
ERRORS_TOTAL = "tcp_errors_total"
# NOTE(review): presumably the depth of the TCP server's request queue - confirm
SERVER_QUEUE_DEPTH = "tcp_server_queue_depth"
# Suffixes for NATS transport metrics.
class nats:
# Total NATS request errors; the Rust counter carries an "error_type" label
ERRORS_TOTAL = "nats_errors_total"
class trtllm_additional:
"""Additional TRT-LLM worker metrics beyond what the engine natively provides."""
......
......@@ -8,7 +8,10 @@
pub mod frontend_perf;
pub mod prometheus_names;
pub mod request_plane;
pub mod tokio_perf;
pub mod transport_metrics;
pub mod work_handler_perf;
use parking_lot::Mutex;
use std::collections::HashSet;
......@@ -891,6 +894,13 @@ impl MetricsRegistry {
.map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
}
/// Add a Prometheus metric collector, logging a warning on failure instead of returning an error.
pub fn add_metric_or_warn(&self, collector: Box<dyn prometheus::core::Collector>, name: &str) {
if let Err(e) = self.add_metric(collector) {
tracing::warn!(error = %e, metric = name, "Failed to register metric");
}
}
/// Get a read guard to the Prometheus registry for scraping
pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
self.prometheus_registry.read().unwrap()
......
......@@ -77,6 +77,15 @@ pub mod name_prefix {
/// Prefix for standalone KV indexer metrics
pub const KVINDEXER: &str = "dynamo_kvindexer";
/// Prefix for request-plane metrics at AddressedPushRouter
pub const REQUEST_PLANE: &str = "dynamo_request_plane";
/// Prefix for transport-layer metrics (TCP / NATS)
pub const TRANSPORT: &str = "dynamo_transport";
/// Prefix for work-handler transport breakdown metrics (backend side)
pub const WORK_HANDLER: &str = "dynamo_work_handler";
}
/// Automatically inserted Prometheus label names used across the metrics system
......@@ -540,6 +549,33 @@ pub mod kvindexer {
pub const WORKERS: &str = "workers";
}
/// Request plane metrics at AddressedPushRouter
///
/// Each constant is a metric-name suffix. The request-plane metrics module prepends
/// `name_prefix::REQUEST_PLANE` and an underscore to form the exported name.
pub mod request_plane {
/// Time from generate() entry to send_request() (serialization + encoding)
pub const QUEUE_SECONDS: &str = "queue_seconds";
/// Time for send_request() to complete (frontend view: network + queue + ack)
pub const SEND_SECONDS: &str = "send_seconds";
/// Time from send_request() to first response item (transport roundtrip TTFT)
pub const ROUNDTRIP_TTFT_SECONDS: &str = "roundtrip_ttft_seconds";
/// Currently in-flight requests (gauge)
pub const INFLIGHT_REQUESTS: &str = "inflight_requests";
}
/// Transport-specific metrics (TCP / NATS)
///
/// Each constant is a metric-name suffix. The transport metrics module prepends
/// `name_prefix::TRANSPORT` and an underscore to form the exported name.
///
/// The Python name generator does not yet recurse into nested `pub mod`s, so the matching
/// Python classes for `tcp` and `nats` are maintained by hand.
pub mod transport {
/// Suffixes for TCP transport metrics.
pub mod tcp {
/// NOTE(review): presumably the number of active pooled connections — confirm.
pub const POOL_ACTIVE: &str = "tcp_pool_active";
/// NOTE(review): presumably the number of idle pooled connections — confirm.
pub const POOL_IDLE: &str = "tcp_pool_idle";
/// Total bytes sent by the TCP request client
pub const BYTES_SENT_TOTAL: &str = "tcp_bytes_sent_total";
/// Total bytes received by the TCP request client
pub const BYTES_RECEIVED_TOTAL: &str = "tcp_bytes_received_total";
/// Total TCP request errors (send failure or timeout)
pub const ERRORS_TOTAL: &str = "tcp_errors_total";
/// NOTE(review): presumably the depth of the TCP server's request queue — confirm.
pub const SERVER_QUEUE_DEPTH: &str = "tcp_server_queue_depth";
}
/// Suffixes for NATS transport metrics.
pub mod nats {
/// Total NATS request errors; the counter carries an `error_type` label
pub const ERRORS_TOTAL: &str = "nats_errors_total";
}
}
// KvRouter (including KvIndexer) Prometheus metric names
pub mod kvrouter {
/// Number of KV cache events applied to the index (including status)
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Request-plane metrics for AddressedPushRouter.
//! Used to pinpoint serialization vs transport roundtrip latency.
use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Gauge, Histogram, HistogramOpts};
use super::prometheus_names::{name_prefix, request_plane};
use crate::MetricsRegistry;
/// Builds the full request-plane metric name: `name_prefix::REQUEST_PLANE`, an underscore,
/// then `suffix`.
fn request_plane_metric_name(suffix: &str) -> String {
    [name_prefix::REQUEST_PLANE, suffix].join("_")
}
/// Histogram of the time between `generate()` entry and `send_request()`
/// (serialization + encoding + control message), in seconds.
///
/// Bucket upper bounds run from 100 microseconds to 1 second. Construction happens lazily on
/// first access and panics if the histogram options are rejected.
pub static REQUEST_PLANE_QUEUE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let name = request_plane_metric_name(request_plane::QUEUE_SECONDS);
    let help = "Time from generate() entry to send_request() (seconds)";
    let bounds = vec![0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0];
    let opts = HistogramOpts::new(name, help).buckets(bounds);
    Histogram::with_opts(opts).expect("request_plane_queue_seconds histogram")
});
/// Histogram of how long `send_request()` takes to complete, in seconds
/// (frontend view: network + queue + ack).
///
/// Bucket upper bounds run from 100 microseconds to 1 second. Construction happens lazily on
/// first access and panics if the histogram options are rejected.
pub static REQUEST_PLANE_SEND_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let name = request_plane_metric_name(request_plane::SEND_SECONDS);
    let help = "Time for send_request() to complete (seconds)";
    let bounds = vec![0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0];
    let opts = HistogramOpts::new(name, help).buckets(bounds);
    Histogram::with_opts(opts).expect("request_plane_send_seconds histogram")
});
/// Histogram of the time between `send_request()` and the first response item
/// (transport roundtrip TTFT), in seconds.
///
/// Bucket upper bounds run from 1 millisecond to 5 seconds. Construction happens lazily on
/// first access and panics if the histogram options are rejected.
pub static REQUEST_PLANE_ROUNDTRIP_TTFT_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let name = request_plane_metric_name(request_plane::ROUNDTRIP_TTFT_SECONDS);
    let help = "Time from send_request() to first response item (seconds)";
    let bounds = vec![
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
    ];
    let opts = HistogramOpts::new(name, help).buckets(bounds);
    Histogram::with_opts(opts).expect("request_plane_roundtrip_ttft_seconds histogram")
});
/// Gauge of requests currently in flight at AddressedPushRouter
/// (incremented at `generate()` entry, decremented on stream complete).
///
/// Construction happens lazily on first access and panics if the gauge cannot be created.
pub static REQUEST_PLANE_INFLIGHT: Lazy<Gauge> = Lazy::new(|| {
    let name = request_plane_metric_name(request_plane::INFLIGHT_REQUESTS);
    let help = "Currently in-flight requests at AddressedPushRouter";
    Gauge::new(name, help).expect("request_plane_inflight gauge")
});
/// Guards idempotency for the `MetricsRegistry` registration path.
/// Initialised by the first call to `ensure_request_plane_metrics_registered`; later calls
/// find it set and do nothing.
static METRICS_REGISTERED: OnceCell<()> = OnceCell::new();
/// Guards idempotency for the raw `prometheus::Registry` registration path.
/// Kept separate from `METRICS_REGISTERED` so that calling `ensure_request_plane_metrics_registered`
/// first does not silently prevent the metrics from being registered in the prometheus registry.
///
/// Stores the outcome of the first registration attempt, with any error kept as its string
/// form. Because the cell is written only once, a failed first attempt is never retried and
/// the same outcome is returned to every later caller.
static PROMETHEUS_REGISTERED: OnceCell<Result<(), String>> = OnceCell::new();
/// Registers the four request-plane collectors with `registry`, at most once per process.
///
/// Only the first call does any work; later calls return immediately, whichever registry they
/// pass. Individual registration failures are logged as warnings by
/// `MetricsRegistry::add_metric_or_warn` and do not stop the remaining registrations.
pub fn ensure_request_plane_metrics_registered(registry: &MetricsRegistry) {
    // Collectors are cloned and registered one at a time, in this fixed order.
    let register_all = || {
        let queue = Box::new(REQUEST_PLANE_QUEUE_SECONDS.clone());
        registry.add_metric_or_warn(queue, "request_plane_queue_seconds");

        let send = Box::new(REQUEST_PLANE_SEND_SECONDS.clone());
        registry.add_metric_or_warn(send, "request_plane_send_seconds");

        let roundtrip = Box::new(REQUEST_PLANE_ROUNDTRIP_TTFT_SECONDS.clone());
        registry.add_metric_or_warn(roundtrip, "request_plane_roundtrip_ttft_seconds");

        let inflight = Box::new(REQUEST_PLANE_INFLIGHT.clone());
        registry.add_metric_or_warn(inflight, "request_plane_inflight");
    };
    let _ = METRICS_REGISTERED.get_or_init(register_all);
}
/// Register request-plane metrics with a raw Prometheus registry (e.g. for LLM HTTP service /metrics).
/// Idempotent; only the first call registers. Call this when the service exposes /metrics from its own registry.
pub fn ensure_request_plane_metrics_registered_prometheus(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
PROMETHEUS_REGISTERED
.get_or_init(|| {
(|| -> Result<(), prometheus::Error> {
registry.register(Box::new(REQUEST_PLANE_QUEUE_SECONDS.clone()))?;
registry.register(Box::new(REQUEST_PLANE_SEND_SECONDS.clone()))?;
registry.register(Box::new(REQUEST_PLANE_ROUNDTRIP_TTFT_SECONDS.clone()))?;
registry.register(Box::new(REQUEST_PLANE_INFLIGHT.clone()))?;
Ok(())
})()
.map_err(|e| e.to_string())
})
.as_ref()
.map(|_| ())
.map_err(|e| prometheus::Error::Msg(e.clone()))
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Transport-layer Prometheus metrics (TCP + NATS).
//! Statics are incremented directly in the client send paths.
use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Counter, IntCounterVec, Opts};
use super::prometheus_names::{name_prefix, transport};
/// Builds the full transport metric name: `name_prefix::TRANSPORT`, an underscore,
/// then `suffix`.
fn transport_metric_name(suffix: &str) -> String {
    [name_prefix::TRANSPORT, suffix].join("_")
}
// --- TCP counters ---
/// Counter of total bytes sent by the TCP request client.
///
/// Construction happens lazily on first access and panics if the counter cannot be created.
pub static TCP_BYTES_SENT_TOTAL: Lazy<Counter> = Lazy::new(|| {
    let name = transport_metric_name(transport::tcp::BYTES_SENT_TOTAL);
    let help = "Total bytes sent by TCP request client";
    Counter::new(name, help).expect("tcp_bytes_sent_total counter")
});
/// Counter of total bytes received by the TCP request client.
///
/// Construction happens lazily on first access and panics if the counter cannot be created.
pub static TCP_BYTES_RECEIVED_TOTAL: Lazy<Counter> = Lazy::new(|| {
    let name = transport_metric_name(transport::tcp::BYTES_RECEIVED_TOTAL);
    let help = "Total bytes received by TCP request client";
    Counter::new(name, help).expect("tcp_bytes_received_total counter")
});
/// Counter of total TCP request errors (send failure or timeout).
///
/// Construction happens lazily on first access and panics if the counter cannot be created.
pub static TCP_ERRORS_TOTAL: Lazy<Counter> = Lazy::new(|| {
    let name = transport_metric_name(transport::tcp::ERRORS_TOTAL);
    let help = "Total TCP request errors (send failure or timeout)";
    Counter::new(name, help).expect("tcp_errors_total counter")
});
// --- NATS counters ---
/// Counter of total NATS request errors, partitioned by an `error_type` label.
///
/// `error_type` label values: "request_failed"
///
/// Construction happens lazily on first access and panics if the counter vec cannot be created.
pub static NATS_ERRORS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
    let name = transport_metric_name(transport::nats::ERRORS_TOTAL);
    let opts = Opts::new(name, "Total NATS request errors (label: error_type)");
    let label_names = ["error_type"];
    IntCounterVec::new(opts, &label_names).expect("nats_errors_total counter vec")
});
/// Guards idempotency for the raw `prometheus::Registry` registration path.
///
/// Stores the outcome of the first call to `ensure_transport_metrics_registered_prometheus`,
/// with any error kept as its string form. Because the cell is written only once, a failed
/// first attempt is never retried and the same outcome is returned to every later caller.
static PROMETHEUS_REGISTERED: OnceCell<Result<(), String>> = OnceCell::new();
/// Register transport metrics with a raw Prometheus registry. Idempotent.
pub fn ensure_transport_metrics_registered_prometheus(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
PROMETHEUS_REGISTERED
.get_or_init(|| {
(|| -> Result<(), prometheus::Error> {
registry.register(Box::new(TCP_BYTES_SENT_TOTAL.clone()))?;
registry.register(Box::new(TCP_BYTES_RECEIVED_TOTAL.clone()))?;
registry.register(Box::new(TCP_ERRORS_TOTAL.clone()))?;
registry.register(Box::new(NATS_ERRORS_TOTAL.clone()))?;
Ok(())
})()
.map_err(|e| e.to_string())
})
.as_ref()
.map(|_| ())
.map_err(|e| prometheus::Error::Msg(e.clone()))
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Transport breakdown metrics for work handler (backend side).
//! Captures network transit (T2-T1) and backend processing time (T3-T2).
use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Histogram, HistogramOpts};
use super::prometheus_names::{name_prefix, work_handler};
use crate::MetricsRegistry;
/// Builds the full work-handler metric name: `name_prefix::WORK_HANDLER`, an underscore,
/// then `suffix`.
fn work_handler_metric_name(suffix: &str) -> String {
    [name_prefix::WORK_HANDLER, suffix].join("_")
}
/// Histogram of network transit time, from frontend send to backend receive, in seconds
/// (wall-clock, cross-process).
///
/// Bucket upper bounds run from 100 microseconds to 1 second. Construction happens lazily on
/// first access and panics if the histogram options are rejected.
// NOTE(review): as a cross-process wall-clock difference this presumably depends on the two
// hosts' clocks being in sync — confirm.
pub static WORK_HANDLER_NETWORK_TRANSIT_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let name = work_handler_metric_name(work_handler::NETWORK_TRANSIT_SECONDS);
    let help = "Frontend-to-backend network transit time (cross-process wall-clock, seconds)";
    let bounds = vec![0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0];
    let opts = HistogramOpts::new(name, help).buckets(bounds);
    Histogram::with_opts(opts).expect("work_handler_network_transit_seconds histogram")
});
/// Histogram of backend processing time, from `handle_payload` entry to the first response
/// (prologue) being sent, in seconds.
///
/// Bucket upper bounds run from 1 millisecond to 10 seconds. Construction happens lazily on
/// first access and panics if the histogram options are rejected.
pub static WORK_HANDLER_TIME_TO_FIRST_RESPONSE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let name = work_handler_metric_name(work_handler::TIME_TO_FIRST_RESPONSE_SECONDS);
    let help = "Backend processing time from handle_payload entry to prologue sent (seconds)";
    let bounds = vec![
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    let opts = HistogramOpts::new(name, help).buckets(bounds);
    Histogram::with_opts(opts).expect("work_handler_time_to_first_response_seconds histogram")
});
/// Guards idempotency for the `MetricsRegistry` registration path.
/// Initialised by the first call to `ensure_work_handler_perf_metrics_registered`; later calls
/// find it set and do nothing.
static METRICS_REGISTERED: OnceCell<()> = OnceCell::new();
/// Guards idempotency for the raw `prometheus::Registry` registration path.
/// Kept separate from `METRICS_REGISTERED` so that calling `ensure_work_handler_perf_metrics_registered`
/// first does not silently prevent the metrics from being registered in the prometheus registry.
///
/// Stores the outcome of the first registration attempt, with any error kept as its string
/// form. Because the cell is written only once, a failed first attempt is never retried and
/// the same outcome is returned to every later caller.
static PROMETHEUS_REGISTERED: OnceCell<Result<(), String>> = OnceCell::new();
/// Registers the two work-handler transport-breakdown histograms with `registry`, at most
/// once per process.
///
/// Only the first call does any work; later calls return immediately, whichever registry they
/// pass. Individual registration failures are logged as warnings by
/// `MetricsRegistry::add_metric_or_warn` and do not stop the remaining registration.
pub fn ensure_work_handler_perf_metrics_registered(registry: &MetricsRegistry) {
    // Collectors are cloned and registered one at a time, in this fixed order.
    let register_all = || {
        let transit = Box::new(WORK_HANDLER_NETWORK_TRANSIT_SECONDS.clone());
        registry.add_metric_or_warn(transit, "work_handler_network_transit_seconds");

        let first_response = Box::new(WORK_HANDLER_TIME_TO_FIRST_RESPONSE_SECONDS.clone());
        registry.add_metric_or_warn(
            first_response,
            "work_handler_time_to_first_response_seconds",
        );
    };
    let _ = METRICS_REGISTERED.get_or_init(register_all);
}
/// Register with a raw Prometheus registry. Idempotent.
pub fn ensure_work_handler_perf_metrics_registered_prometheus(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
PROMETHEUS_REGISTERED
.get_or_init(|| {
(|| -> Result<(), prometheus::Error> {
registry.register(Box::new(WORK_HANDLER_NETWORK_TRANSIT_SECONDS.clone()))?;
registry.register(Box::new(
WORK_HANDLER_TIME_TO_FIRST_RESPONSE_SECONDS.clone(),
))?;
Ok(())
})()
.map_err(|e| e.to_string())
})
.as_ref()
.map(|_| ())
.map_err(|e| prometheus::Error::Msg(e.clone()))
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment