Unverified Commit 3c44b88e authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat(perf): add frontend pipeline and tokio runtime perf metric definitions (#6731)

parent c619ce36
...@@ -391,6 +391,7 @@ FROM wheel_builder_base AS runtime_wheel_builder ...@@ -391,6 +391,7 @@ FROM wheel_builder_base AS runtime_wheel_builder
{% if target not in ("dev", "local-dev") %} {% if target not in ("dev", "local-dev") %}
# Copy source code (order matters for layer caching) # Copy source code (order matters for layer caching)
COPY .cargo/ /opt/dynamo/.cargo/
COPY pyproject.toml README.md LICENSE Cargo.toml Cargo.lock rust-toolchain.toml hatch_build.py /opt/dynamo/ COPY pyproject.toml README.md LICENSE Cargo.toml Cargo.lock rust-toolchain.toml hatch_build.py /opt/dynamo/
COPY lib/ /opt/dynamo/lib/ COPY lib/ /opt/dynamo/lib/
COPY components/ /opt/dynamo/components/ COPY components/ /opt/dynamo/components/
...@@ -531,6 +532,7 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ ...@@ -531,6 +532,7 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \
{% if target not in ("dev", "local-dev") %} {% if target not in ("dev", "local-dev") %}
# Copy source code (order matters for layer caching) # Copy source code (order matters for layer caching)
COPY .cargo/ /opt/dynamo/.cargo/
COPY pyproject.toml README.md LICENSE Cargo.toml Cargo.lock rust-toolchain.toml hatch_build.py /opt/dynamo/ COPY pyproject.toml README.md LICENSE Cargo.toml Cargo.lock rust-toolchain.toml hatch_build.py /opt/dynamo/
COPY lib/ /opt/dynamo/lib/ COPY lib/ /opt/dynamo/lib/
COPY components/ /opt/dynamo/components/ COPY components/ /opt/dynamo/components/
......
...@@ -53,6 +53,7 @@ WORKDIR /dynamo ...@@ -53,6 +53,7 @@ WORKDIR /dynamo
# Copy Cargo workspace manifests, lockfile, and README (some crates inherit # Copy Cargo workspace manifests, lockfile, and README (some crates inherit
# readme.workspace = true, so cargo needs README.md at the workspace root) # readme.workspace = true, so cargo needs README.md at the workspace root)
COPY --from=dynamo .cargo/ .cargo/
COPY --from=dynamo Cargo.toml Cargo.lock README.md ./ COPY --from=dynamo Cargo.toml Cargo.lock README.md ./
# Copy all workspace crates (libdynamo_llm depends transitively on many) # Copy all workspace crates (libdynamo_llm depends transitively on many)
......
...@@ -42,6 +42,23 @@ class distributed_runtime: ...@@ -42,6 +42,23 @@ class distributed_runtime:
UPTIME_SECONDS = "uptime_seconds" UPTIME_SECONDS = "uptime_seconds"
class frontend_perf:
"""Frontend pipeline stage and event-loop metrics"""
# Values are metric-name suffixes: the Rust metric definitions prepend the
# frontend prefix ("dynamo_frontend") and an underscore to build the full name.
# Per-stage latency histogram (label: stage = preprocess|route|transport_roundtrip|postprocess)
STAGE_DURATION_SECONDS = "stage_duration_seconds"
# Tokenization time in preprocessor
TOKENIZE_SECONDS = "tokenize_seconds"
# Template application time in preprocessor
TEMPLATE_SECONDS = "template_seconds"
# Per-token detokenization cost (microseconds)
DETOKENIZE_PER_TOKEN_US = "detokenize_per_token_us"
# Event loop delay canary (sleep 10ms, measure drift)
EVENT_LOOP_DELAY_SECONDS = "event_loop_delay_seconds"
# Count of event loop stalls (delay > 5ms)
EVENT_LOOP_STALL_TOTAL = "event_loop_stall_total"
class frontend_service: class frontend_service:
"""Frontend service metrics (LLM HTTP service)""" """Frontend service metrics (LLM HTTP service)"""
...@@ -206,6 +223,8 @@ class name_prefix: ...@@ -206,6 +223,8 @@ class name_prefix:
FRONTEND = "dynamo_frontend" FRONTEND = "dynamo_frontend"
# Prefix for KV router metrics (used with router_id label) # Prefix for KV router metrics (used with router_id label)
ROUTER = "dynamo_router" ROUTER = "dynamo_router"
# Prefix for tokio runtime metrics
TOKIO = "dynamo_tokio"
class router: class router:
...@@ -282,6 +301,23 @@ class task_tracker: ...@@ -282,6 +301,23 @@ class task_tracker:
TASKS_REJECTED_TOTAL = "tasks_rejected_total" TASKS_REJECTED_TOTAL = "tasks_rejected_total"
class tokio_perf:
"""Tokio runtime metrics"""
# Values are metric-name suffixes: the Rust metric definitions prepend the
# tokio prefix ("dynamo_tokio") and an underscore to build the full name.
# Per-worker mean task poll time (nanoseconds)
WORKER_MEAN_POLL_TIME_NS = "worker_mean_poll_time_ns"
# Number of tasks in the runtime global queue
GLOBAL_QUEUE_DEPTH = "global_queue_depth"
# Number of times tasks were forced to yield after exhausting their budget
BUDGET_FORCED_YIELD_TOTAL = "budget_forced_yield_total"
# Per-worker busy indicator, stored as an integer in the range 0-1000
WORKER_BUSY_RATIO = "worker_busy_ratio"
# Total number of times a worker has parked
WORKER_PARK_COUNT_TOTAL = "worker_park_count_total"
# Number of tasks in a worker's local queue
WORKER_LOCAL_QUEUE_DEPTH = "worker_local_queue_depth"
# Total number of tasks stolen by a worker
WORKER_STEAL_COUNT_TOTAL = "worker_steal_count_total"
# Total number of times a worker's local queue overflowed
WORKER_OVERFLOW_COUNT_TOTAL = "worker_overflow_count_total"
# Number of blocking threads
BLOCKING_THREADS = "blocking_threads"
# Number of idle blocking threads
BLOCKING_IDLE_THREADS = "blocking_idle_threads"
# Number of tasks in the blocking thread pool queue
BLOCKING_QUEUE_DEPTH = "blocking_queue_depth"
# Number of alive tasks in the runtime
ALIVE_TASKS = "alive_tasks"
class work_handler: class work_handler:
"""Work handler Prometheus metric names""" """Work handler Prometheus metric names"""
...@@ -298,5 +334,9 @@ class work_handler: ...@@ -298,5 +334,9 @@ class work_handler:
REQUEST_DURATION_SECONDS = "request_duration_seconds" REQUEST_DURATION_SECONDS = "request_duration_seconds"
# Total number of errors in work handler processing # Total number of errors in work handler processing
ERRORS_TOTAL = "errors_total" ERRORS_TOTAL = "errors_total"
# Network transit: frontend send to backend receive (wall-clock, cross-process)
NETWORK_TRANSIT_SECONDS = "network_transit_seconds"
# Backend processing: handle_payload entry to first response sent
TIME_TO_FIRST_RESPONSE_SECONDS = "time_to_first_response_seconds"
# Label name for error type classification # Label name for error type classification
ERROR_TYPE_LABEL = "error_type" ERROR_TYPE_LABEL = "error_type"
...@@ -366,14 +366,22 @@ impl RuntimeConfig { ...@@ -366,14 +366,22 @@ impl RuntimeConfig {
/// Create a new default runtime configuration /// Create a new default runtime configuration
pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> { pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new_multi_thread() let mut builder = tokio::runtime::Builder::new_multi_thread();
builder
.worker_threads( .worker_threads(
self.num_worker_threads self.num_worker_threads
.unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()), .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
) )
.max_blocking_threads(self.max_blocking_threads) .max_blocking_threads(self.max_blocking_threads)
.enable_all() .enable_all();
.build() if env_is_truthy(environment_names::runtime::DYN_ENABLE_POLL_HISTOGRAM) {
tracing::info!(
"Tokio poll-time histogram enabled (DYN_ENABLE_POLL_HISTOGRAM); \
expect ~2× Instant::now() overhead per task poll"
);
builder.enable_metrics_poll_time_histogram();
}
builder.build()
} }
} }
......
...@@ -66,6 +66,10 @@ pub mod runtime { ...@@ -66,6 +66,10 @@ pub mod runtime {
/// Maximum number of blocking threads for Tokio runtime /// Maximum number of blocking threads for Tokio runtime
pub const DYN_RUNTIME_MAX_BLOCKING_THREADS: &str = "DYN_RUNTIME_MAX_BLOCKING_THREADS"; pub const DYN_RUNTIME_MAX_BLOCKING_THREADS: &str = "DYN_RUNTIME_MAX_BLOCKING_THREADS";
/// Enable Tokio task poll-time histogram (calls enable_metrics_poll_time_histogram on builder).
/// Set to "1", "true", or "yes" to enable. Adds ~2× overhead of Instant::now() per task poll.
pub const DYN_ENABLE_POLL_HISTOGRAM: &str = "DYN_ENABLE_POLL_HISTOGRAM";
/// System status server configuration /// System status server configuration
pub mod system { pub mod system {
/// Enable system status server for health and metrics endpoints /// Enable system status server for health and metrics endpoints
......
...@@ -6,7 +6,9 @@ ...@@ -6,7 +6,9 @@
//! This module provides a trait-based interface for creating and managing Prometheus metrics //! This module provides a trait-based interface for creating and managing Prometheus metrics
//! with automatic label injection and hierarchical naming support. //! with automatic label injection and hierarchical naming support.
pub mod frontend_perf;
pub mod prometheus_names; pub mod prometheus_names;
pub mod tokio_perf;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashSet; use std::collections::HashSet;
......
// SPDX-FileCopyrightText: Copyright (c) 2026-2027 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Frontend pipeline stage and finer-grained perf metrics.
//! Used by both runtime (route, transport_roundtrip) and llm (preprocess, postprocess, tokenize, template, detokenize).
use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Histogram, HistogramOpts, HistogramVec, Registry};
use super::prometheus_names::{frontend_perf, name_prefix};
use crate::MetricsRegistry;
/// Builds a full frontend metric name: `name_prefix::FRONTEND`, an underscore,
/// then `suffix`.
fn frontend_metric_name(suffix: &str) -> String {
    [name_prefix::FRONTEND, suffix].join("_")
}
/// Latency histogram (seconds) for each pipeline stage, distinguished by the `stage`
/// label: preprocess, route, transport_roundtrip, postprocess.
pub static STAGE_DURATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
    let buckets = vec![
        0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0,
    ];
    let opts = HistogramOpts::new(
        frontend_metric_name(frontend_perf::STAGE_DURATION_SECONDS),
        "Pipeline stage duration (seconds)",
    )
    .buckets(buckets);
    HistogramVec::new(opts, &["stage"]).expect("stage_duration_seconds histogram vec")
});

/// Histogram (seconds) of time spent tokenizing in the preprocessor (gather_tokens).
pub static TOKENIZE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let buckets = vec![0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0];
    let opts = HistogramOpts::new(
        frontend_metric_name(frontend_perf::TOKENIZE_SECONDS),
        "Tokenization time in preprocessor (seconds)",
    )
    .buckets(buckets);
    Histogram::with_opts(opts).expect("tokenize_seconds histogram")
});

/// Histogram (seconds) of time spent applying the template in the preprocessor
/// (apply_template).
pub static TEMPLATE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let buckets = vec![0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05];
    let opts = HistogramOpts::new(
        frontend_metric_name(frontend_perf::TEMPLATE_SECONDS),
        "Template application time in preprocessor (seconds)",
    )
    .buckets(buckets);
    Histogram::with_opts(opts).expect("template_seconds histogram")
});

/// Histogram of the detokenization cost per token, in microseconds.
pub static DETOKENIZE_PER_TOKEN_US: Lazy<Histogram> = Lazy::new(|| {
    let buckets = vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0];
    let opts = HistogramOpts::new(
        frontend_metric_name(frontend_perf::DETOKENIZE_PER_TOKEN_US),
        "Detokenization cost per token (microseconds)",
    )
    .buckets(buckets);
    Histogram::with_opts(opts).expect("detokenize_per_token_us histogram")
});
/// Guards idempotency for the `MetricsRegistry` registration path.
/// Initialised by the first call to `ensure_frontend_perf_metrics_registered`; because it is
/// process-wide, later calls do nothing regardless of which registry they pass.
static REGISTERED: OnceCell<()> = OnceCell::new();
/// Guards idempotency for the raw `prometheus::Registry` registration path.
/// Kept separate from `REGISTERED` so that calling `ensure_frontend_perf_metrics_registered`
/// first does not silently prevent the metrics from being registered in the prometheus registry.
/// Set by `ensure_frontend_perf_metrics_registered_prometheus` once its registrations complete.
static PROMETHEUS_REGISTERED: OnceCell<()> = OnceCell::new();
/// Register frontend perf metrics with the given registry. Idempotent.
pub fn ensure_frontend_perf_metrics_registered(registry: &MetricsRegistry) {
let _ = REGISTERED.get_or_init(|| {
registry
.add_metric(Box::new(STAGE_DURATION_SECONDS.clone()))
.ok();
registry.add_metric(Box::new(TOKENIZE_SECONDS.clone())).ok();
registry.add_metric(Box::new(TEMPLATE_SECONDS.clone())).ok();
registry
.add_metric(Box::new(DETOKENIZE_PER_TOKEN_US.clone()))
.ok();
});
}
/// Register frontend perf metrics with a raw Prometheus registry (e.g. for LLM HTTP service /metrics).
/// Idempotent. Call this when the service exposes /metrics from its own registry.
pub fn ensure_frontend_perf_metrics_registered_prometheus(
registry: &Registry,
) -> Result<(), prometheus::Error> {
if PROMETHEUS_REGISTERED.get().is_some() {
return Ok(());
}
registry.register(Box::new(STAGE_DURATION_SECONDS.clone()))?;
registry.register(Box::new(TOKENIZE_SECONDS.clone()))?;
registry.register(Box::new(TEMPLATE_SECONDS.clone()))?;
registry.register(Box::new(DETOKENIZE_PER_TOKEN_US.clone()))?;
let _ = PROMETHEUS_REGISTERED.set(());
Ok(())
}
...@@ -71,6 +71,9 @@ pub mod name_prefix { ...@@ -71,6 +71,9 @@ pub mod name_prefix {
/// Prefix for KV router metrics (used with router_id label) /// Prefix for KV router metrics (used with router_id label)
pub const ROUTER: &str = "dynamo_router"; pub const ROUTER: &str = "dynamo_router";
/// Prefix for tokio runtime metrics
pub const TOKIO: &str = "dynamo_tokio";
} }
/// Automatically inserted Prometheus label names used across the metrics system /// Automatically inserted Prometheus label names used across the metrics system
...@@ -320,6 +323,12 @@ pub mod work_handler { ...@@ -320,6 +323,12 @@ pub mod work_handler {
/// Total number of errors in work handler processing /// Total number of errors in work handler processing
pub const ERRORS_TOTAL: &str = "errors_total"; pub const ERRORS_TOTAL: &str = "errors_total";
/// Network transit: frontend send to backend receive (wall-clock, cross-process)
pub const NETWORK_TRANSIT_SECONDS: &str = "network_transit_seconds";
/// Backend processing: handle_payload entry to first response sent
pub const TIME_TO_FIRST_RESPONSE_SECONDS: &str = "time_to_first_response_seconds";
/// Label name for error type classification /// Label name for error type classification
pub const ERROR_TYPE_LABEL: &str = "error_type"; pub const ERROR_TYPE_LABEL: &str = "error_type";
...@@ -478,6 +487,38 @@ pub mod router { ...@@ -478,6 +487,38 @@ pub mod router {
pub const OUTPUT_SEQUENCE_TOKENS: &str = "router_output_sequence_tokens"; pub const OUTPUT_SEQUENCE_TOKENS: &str = "router_output_sequence_tokens";
} }
/// Frontend pipeline stage and event-loop metrics.
///
/// Each constant is a metric-name suffix; the metric definitions join it to
/// `name_prefix::FRONTEND` with an underscore to form the full metric name.
pub mod frontend_perf {
    /// Per-stage latency histogram (label: stage = preprocess|route|transport_roundtrip|postprocess)
    pub const STAGE_DURATION_SECONDS: &str = "stage_duration_seconds";
    /// Tokenization time in preprocessor
    pub const TOKENIZE_SECONDS: &str = "tokenize_seconds";
    /// Template application time in preprocessor
    pub const TEMPLATE_SECONDS: &str = "template_seconds";
    /// Per-token detokenization cost (microseconds)
    pub const DETOKENIZE_PER_TOKEN_US: &str = "detokenize_per_token_us";
    /// Event loop delay canary (sleep 10ms, measure drift)
    pub const EVENT_LOOP_DELAY_SECONDS: &str = "event_loop_delay_seconds";
    /// Count of event loop stalls (delay > 5ms)
    pub const EVENT_LOOP_STALL_TOTAL: &str = "event_loop_stall_total";
}
/// Tokio runtime metrics.
///
/// Each constant is a metric-name suffix; the metric definitions join it to
/// `name_prefix::TOKIO` with an underscore to form the full metric name.
pub mod tokio_perf {
    /// Per-worker mean task poll time (nanoseconds)
    pub const WORKER_MEAN_POLL_TIME_NS: &str = "worker_mean_poll_time_ns";
    /// Number of tasks in the runtime global queue
    pub const GLOBAL_QUEUE_DEPTH: &str = "global_queue_depth";
    /// Number of times tasks were forced to yield after exhausting their budget
    pub const BUDGET_FORCED_YIELD_TOTAL: &str = "budget_forced_yield_total";
    /// Per-worker busy indicator, stored as an integer in the range 0-1000
    pub const WORKER_BUSY_RATIO: &str = "worker_busy_ratio";
    /// Total number of times a worker has parked
    pub const WORKER_PARK_COUNT_TOTAL: &str = "worker_park_count_total";
    /// Number of tasks in a worker's local queue
    pub const WORKER_LOCAL_QUEUE_DEPTH: &str = "worker_local_queue_depth";
    /// Total number of tasks stolen by a worker
    pub const WORKER_STEAL_COUNT_TOTAL: &str = "worker_steal_count_total";
    /// Total number of times a worker's local queue overflowed
    pub const WORKER_OVERFLOW_COUNT_TOTAL: &str = "worker_overflow_count_total";
    /// Number of blocking threads
    pub const BLOCKING_THREADS: &str = "blocking_threads";
    /// Number of idle blocking threads
    pub const BLOCKING_IDLE_THREADS: &str = "blocking_idle_threads";
    /// Number of tasks in the blocking thread pool queue
    pub const BLOCKING_QUEUE_DEPTH: &str = "blocking_queue_depth";
    /// Number of alive tasks in the runtime
    pub const ALIVE_TASKS: &str = "alive_tasks";
}
// KvRouter (including KvIndexer) Prometheus metric names // KvRouter (including KvIndexer) Prometheus metric names
pub mod kvrouter { pub mod kvrouter {
/// Number of KV cache events applied to the index (including status) /// Number of KV cache events applied to the index (including status)
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Tokio runtime metrics and event-loop canary
use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounterVec, IntGaugeVec, Opts};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::runtime::Handle;
use super::prometheus_names::{frontend_perf, name_prefix, tokio_perf as names};
use crate::MetricsRegistry;
/// Builds a full tokio metric name: `name_prefix::TOKIO`, an underscore, then `suffix`.
fn tokio_metric_name(suffix: &str) -> String {
    [name_prefix::TOKIO, suffix].join("_")
}
// --- Tokio runtime gauges/counters (updated every 1s by collector) ---

/// Gauge: tasks currently waiting in the runtime's global queue.
pub static TOKIO_GLOBAL_QUEUE_DEPTH: Lazy<Gauge> = Lazy::new(|| {
    let name = tokio_metric_name(names::GLOBAL_QUEUE_DEPTH);
    Gauge::new(name, "Number of tasks in the runtime global queue")
        .expect("tokio global_queue_depth gauge")
});

/// Counter: how often tasks were forced to yield after exhausting their budget.
pub static TOKIO_BUDGET_FORCED_YIELD_TOTAL: Lazy<Counter> = Lazy::new(|| {
    let name = tokio_metric_name(names::BUDGET_FORCED_YIELD_TOTAL);
    let help = "Number of times tasks were forced to yield after exhausting budget";
    Counter::new(name, help).expect("tokio budget_forced_yield_total counter")
});

/// Gauge: number of blocking threads.
pub static TOKIO_BLOCKING_THREADS: Lazy<Gauge> = Lazy::new(|| {
    let name = tokio_metric_name(names::BLOCKING_THREADS);
    Gauge::new(name, "Number of blocking threads").expect("tokio blocking_threads gauge")
});

/// Gauge: number of blocking threads that are idle.
pub static TOKIO_BLOCKING_IDLE_THREADS: Lazy<Gauge> = Lazy::new(|| {
    let name = tokio_metric_name(names::BLOCKING_IDLE_THREADS);
    Gauge::new(name, "Number of idle blocking threads")
        .expect("tokio blocking_idle_threads gauge")
});

/// Gauge: tasks waiting in the blocking thread pool queue.
pub static TOKIO_BLOCKING_QUEUE_DEPTH: Lazy<Gauge> = Lazy::new(|| {
    let name = tokio_metric_name(names::BLOCKING_QUEUE_DEPTH);
    Gauge::new(name, "Number of tasks in the blocking thread pool queue")
        .expect("tokio blocking_queue_depth gauge")
});

/// Gauge: tasks currently alive in the runtime.
pub static TOKIO_ALIVE_TASKS: Lazy<Gauge> = Lazy::new(|| {
    let name = tokio_metric_name(names::ALIVE_TASKS);
    Gauge::new(name, "Number of alive tasks in the runtime").expect("tokio alive_tasks gauge")
});
// Per-worker metrics (GaugeVec/IntCounterVec with label "worker")
pub static TOKIO_WORKER_MEAN_POLL_TIME_NS: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
tokio_metric_name(names::WORKER_MEAN_POLL_TIME_NS),
"Worker mean task poll time (nanoseconds)",
),
&["worker"],
)
.expect("tokio worker_mean_poll_time_ns gauge vec")
});
pub static TOKIO_WORKER_BUSY_RATIO_VEC: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
tokio_metric_name(names::WORKER_BUSY_RATIO),
"Worker busy ratio (0-1) as integer mill ratio; >950 = saturated",
),
&["worker"],
)
.expect("tokio worker_busy_ratio vec")
});
pub static TOKIO_WORKER_PARK_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
tokio_metric_name(names::WORKER_PARK_COUNT_TOTAL),
"Total number of times worker has parked",
),
&["worker"],
)
.expect("tokio worker_park_count_total")
});
pub static TOKIO_WORKER_LOCAL_QUEUE_DEPTH: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
tokio_metric_name(names::WORKER_LOCAL_QUEUE_DEPTH),
"Number of tasks in worker local queue",
),
&["worker"],
)
.expect("tokio worker_local_queue_depth")
});
pub static TOKIO_WORKER_STEAL_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
tokio_metric_name(names::WORKER_STEAL_COUNT_TOTAL),
"Total number of tasks stolen by worker",
),
&["worker"],
)
.expect("tokio worker_steal_count_total")
});
pub static TOKIO_WORKER_OVERFLOW_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
tokio_metric_name(names::WORKER_OVERFLOW_COUNT_TOTAL),
"Total number of times worker local queue overflowed",
),
&["worker"],
)
.expect("tokio worker_overflow_count_total")
});
// --- Event loop canary ---

/// Histogram (seconds) of how far the canary's 10ms sleep overshot its deadline.
/// Named with the frontend prefix rather than the tokio prefix.
pub static EVENT_LOOP_DELAY_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let name = format!(
        "{}_{}",
        name_prefix::FRONTEND,
        frontend_perf::EVENT_LOOP_DELAY_SECONDS
    );
    let buckets = vec![0.0, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0];
    let opts = HistogramOpts::new(
        name,
        "Event loop delay canary: drift from 10ms sleep (seconds)",
    )
    .buckets(buckets);
    Histogram::with_opts(opts).expect("event_loop_delay_seconds histogram")
});

/// Counter of canary iterations whose delay exceeded the stall threshold (5ms).
/// Named with the frontend prefix rather than the tokio prefix.
pub static EVENT_LOOP_STALL_TOTAL: Lazy<Counter> = Lazy::new(|| {
    let name = format!(
        "{}_{}",
        name_prefix::FRONTEND,
        frontend_perf::EVENT_LOOP_STALL_TOTAL
    );
    Counter::new(name, "Number of event loop stalls (delay > 5ms)")
        .expect("event_loop_stall_total counter")
});
/// Guards idempotency for the `MetricsRegistry` registration path.
/// Initialised by the first call to `ensure_tokio_perf_metrics_registered`; because it is
/// process-wide, later calls do nothing regardless of which registry they pass.
static REGISTERED: OnceCell<()> = OnceCell::new();
/// Guards idempotency for the raw `prometheus::Registry` registration path.
/// Kept separate from `REGISTERED` so that calling `ensure_tokio_perf_metrics_registered`
/// first does not silently prevent the metrics from being registered in the prometheus registry.
/// Set by `ensure_tokio_perf_metrics_registered_prometheus` once its registrations complete.
static PROMETHEUS_REGISTERED: OnceCell<()> = OnceCell::new();
/// Register tokio perf and canary metrics with the given registry. Idempotent.
pub fn ensure_tokio_perf_metrics_registered(registry: &MetricsRegistry) {
let _ = REGISTERED.get_or_init(|| {
registry
.add_metric(Box::new(TOKIO_GLOBAL_QUEUE_DEPTH.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BUDGET_FORCED_YIELD_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BLOCKING_THREADS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BLOCKING_IDLE_THREADS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BLOCKING_QUEUE_DEPTH.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_ALIVE_TASKS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_MEAN_POLL_TIME_NS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_BUSY_RATIO_VEC.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_PARK_COUNT_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_LOCAL_QUEUE_DEPTH.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_STEAL_COUNT_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_OVERFLOW_COUNT_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(EVENT_LOOP_DELAY_SECONDS.clone()))
.ok();
registry
.add_metric(Box::new(EVENT_LOOP_STALL_TOTAL.clone()))
.ok();
});
}
/// Register tokio perf and canary metrics with a raw Prometheus registry.
pub fn ensure_tokio_perf_metrics_registered_prometheus(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
if PROMETHEUS_REGISTERED.get().is_some() {
return Ok(());
}
registry.register(Box::new(TOKIO_GLOBAL_QUEUE_DEPTH.clone()))?;
registry.register(Box::new(TOKIO_BUDGET_FORCED_YIELD_TOTAL.clone()))?;
registry.register(Box::new(TOKIO_BLOCKING_THREADS.clone()))?;
registry.register(Box::new(TOKIO_BLOCKING_IDLE_THREADS.clone()))?;
registry.register(Box::new(TOKIO_BLOCKING_QUEUE_DEPTH.clone()))?;
registry.register(Box::new(TOKIO_ALIVE_TASKS.clone()))?;
registry.register(Box::new(TOKIO_WORKER_MEAN_POLL_TIME_NS.clone()))?;
registry.register(Box::new(TOKIO_WORKER_BUSY_RATIO_VEC.clone()))?;
registry.register(Box::new(TOKIO_WORKER_PARK_COUNT_TOTAL.clone()))?;
registry.register(Box::new(TOKIO_WORKER_LOCAL_QUEUE_DEPTH.clone()))?;
registry.register(Box::new(TOKIO_WORKER_STEAL_COUNT_TOTAL.clone()))?;
registry.register(Box::new(TOKIO_WORKER_OVERFLOW_COUNT_TOTAL.clone()))?;
registry.register(Box::new(EVENT_LOOP_DELAY_SECONDS.clone()))?;
registry.register(Box::new(EVENT_LOOP_STALL_TOTAL.clone()))?;
let _ = PROMETHEUS_REGISTERED.set(());
Ok(())
}
/// Run the tokio metrics collector (1s interval) and event-loop canary.
/// Spawn this on the runtime you want to monitor (e.g. primary handle).
///
/// Each iteration sleeps for 10ms and records how much longer than that the sleep took;
/// an overshoot above 5ms also counts as a stall. Once at least a second has passed since
/// the previous collection, the runtime metrics are sampled. The loop never returns.
pub async fn tokio_metrics_and_canary_loop() {
    const CANARY_SLEEP: Duration = Duration::from_millis(10);
    const STALL_THRESHOLD: Duration = Duration::from_millis(5);
    const COLLECT_EVERY: Duration = Duration::from_secs(1);

    let mut last_collect = Instant::now();
    let mut worker_history = PrevWorkerCounters::new();

    loop {
        // Canary: anything beyond the requested sleep is scheduling delay.
        let before_sleep = Instant::now();
        tokio::time::sleep(CANARY_SLEEP).await;
        let drift = before_sleep.elapsed().saturating_sub(CANARY_SLEEP);

        EVENT_LOOP_DELAY_SECONDS.observe(drift.as_secs_f64());
        if drift > STALL_THRESHOLD {
            EVENT_LOOP_STALL_TOTAL.inc();
        }

        // Collector: piggybacks on the canary tick instead of running its own timer.
        if last_collect.elapsed() >= COLLECT_EVERY {
            last_collect = Instant::now();
            sample_tokio_metrics(&mut worker_history);
        }
    }
}
/// Last value read from the runtime's cumulative `budget_forced_yield_count`.
/// `sample_tokio_metrics` swaps in each new reading and adds the difference from the
/// previous one to `TOKIO_BUDGET_FORCED_YIELD_TOTAL`.
static PREV_BUDGET_FORCED_YIELD: AtomicU64 = AtomicU64::new(0);
/// Per-worker previous samples for the monotonic _TOTAL counters.
/// Owned by the single `tokio_metrics_and_canary_loop` task — no locks needed.
struct PrevWorkerCounters {
park: Vec<u64>,
steal: Vec<u64>,
overflow: Vec<u64>,
}
impl PrevWorkerCounters {
fn new() -> Self {
Self {
park: Vec::new(),
steal: Vec::new(),
overflow: Vec::new(),
}
}
fn ensure_capacity(&mut self, num_workers: usize) {
if self.park.len() < num_workers {
self.park.resize(num_workers, 0);
self.steal.resize(num_workers, 0);
self.overflow.resize(num_workers, 0);
}
}
}
fn sample_tokio_metrics(prev: &mut PrevWorkerCounters) {
let metrics = Handle::current().metrics();
TOKIO_GLOBAL_QUEUE_DEPTH.set(metrics.global_queue_depth() as f64);
let budget = metrics.budget_forced_yield_count();
let prev_budget = PREV_BUDGET_FORCED_YIELD.swap(budget, Ordering::Relaxed);
TOKIO_BUDGET_FORCED_YIELD_TOTAL.inc_by((budget.saturating_sub(prev_budget)) as f64);
TOKIO_BLOCKING_THREADS.set(metrics.num_blocking_threads() as f64);
TOKIO_BLOCKING_IDLE_THREADS.set(metrics.num_idle_blocking_threads() as f64);
TOKIO_BLOCKING_QUEUE_DEPTH.set(metrics.blocking_queue_depth() as f64);
TOKIO_ALIVE_TASKS.set(metrics.num_alive_tasks() as f64);
let num_workers = metrics.num_workers();
prev.ensure_capacity(num_workers);
for w in 0..num_workers {
let worker_label = w.to_string();
let mean_poll = metrics.worker_mean_poll_time(w);
TOKIO_WORKER_MEAN_POLL_TIME_NS
.with_label_values(&[&worker_label])
.set(mean_poll.as_nanos() as i64);
TOKIO_WORKER_LOCAL_QUEUE_DEPTH
.with_label_values(&[&worker_label])
.set(metrics.worker_local_queue_depth(w) as i64);
// Monotonically increasing totals: track deltas so we use inc_by on a Counter.
let park = metrics.worker_park_count(w);
TOKIO_WORKER_PARK_COUNT_TOTAL
.with_label_values(&[&worker_label])
.inc_by(park.saturating_sub(prev.park[w]));
prev.park[w] = park;
let steal = metrics.worker_steal_count(w);
TOKIO_WORKER_STEAL_COUNT_TOTAL
.with_label_values(&[&worker_label])
.inc_by(steal.saturating_sub(prev.steal[w]));
prev.steal[w] = steal;
let overflow = metrics.worker_overflow_count(w);
TOKIO_WORKER_OVERFLOW_COUNT_TOTAL
.with_label_values(&[&worker_label])
.inc_by(overflow.saturating_sub(prev.overflow[w]));
prev.overflow[w] = overflow;
// Busy ratio: total_busy_duration over 1s interval -> ratio. We don't have delta here;
// use mean_poll_time as proxy: if high, worker is busy. Store as 0-1000 (per mille).
let busy_proxy = (mean_poll.as_secs_f64() / 0.001).min(1.0); // 1ms = saturated
TOKIO_WORKER_BUSY_RATIO_VEC
.with_label_values(&[&worker_label])
.set((busy_proxy * 1000.0) as i64);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment