tokio_perf.rs 12.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Tokio runtime metrics and event-loop canary

use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounterVec, IntGaugeVec, Opts};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::runtime::Handle;
11
use tokio_util::sync::CancellationToken;
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249

use super::prometheus_names::{frontend_perf, name_prefix, tokio_perf as names};
use crate::MetricsRegistry;

fn tokio_metric_name(suffix: &str) -> String {
    format!("{}_{}", name_prefix::TOKIO, suffix)
}

// --- Tokio runtime gauges/counters (updated every 1s by collector) ---

pub static TOKIO_GLOBAL_QUEUE_DEPTH: Lazy<Gauge> = Lazy::new(|| {
    Gauge::new(
        tokio_metric_name(names::GLOBAL_QUEUE_DEPTH),
        "Number of tasks in the runtime global queue",
    )
    .expect("tokio global_queue_depth gauge")
});

pub static TOKIO_BUDGET_FORCED_YIELD_TOTAL: Lazy<Counter> = Lazy::new(|| {
    Counter::new(
        tokio_metric_name(names::BUDGET_FORCED_YIELD_TOTAL),
        "Number of times tasks were forced to yield after exhausting budget",
    )
    .expect("tokio budget_forced_yield_total counter")
});

pub static TOKIO_BLOCKING_THREADS: Lazy<Gauge> = Lazy::new(|| {
    Gauge::new(
        tokio_metric_name(names::BLOCKING_THREADS),
        "Number of blocking threads",
    )
    .expect("tokio blocking_threads gauge")
});

pub static TOKIO_BLOCKING_IDLE_THREADS: Lazy<Gauge> = Lazy::new(|| {
    Gauge::new(
        tokio_metric_name(names::BLOCKING_IDLE_THREADS),
        "Number of idle blocking threads",
    )
    .expect("tokio blocking_idle_threads gauge")
});

pub static TOKIO_BLOCKING_QUEUE_DEPTH: Lazy<Gauge> = Lazy::new(|| {
    Gauge::new(
        tokio_metric_name(names::BLOCKING_QUEUE_DEPTH),
        "Number of tasks in the blocking thread pool queue",
    )
    .expect("tokio blocking_queue_depth gauge")
});

pub static TOKIO_ALIVE_TASKS: Lazy<Gauge> = Lazy::new(|| {
    Gauge::new(
        tokio_metric_name(names::ALIVE_TASKS),
        "Number of alive tasks in the runtime",
    )
    .expect("tokio alive_tasks gauge")
});

// Per-worker metrics (GaugeVec/IntCounterVec with label "worker")
pub static TOKIO_WORKER_MEAN_POLL_TIME_NS: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new(
            tokio_metric_name(names::WORKER_MEAN_POLL_TIME_NS),
            "Worker mean task poll time (nanoseconds)",
        ),
        &["worker"],
    )
    .expect("tokio worker_mean_poll_time_ns gauge vec")
});

pub static TOKIO_WORKER_BUSY_RATIO_VEC: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new(
            tokio_metric_name(names::WORKER_BUSY_RATIO),
            "Worker busy ratio (0-1) as integer mill ratio; >950 = saturated",
        ),
        &["worker"],
    )
    .expect("tokio worker_busy_ratio vec")
});

pub static TOKIO_WORKER_PARK_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new(
            tokio_metric_name(names::WORKER_PARK_COUNT_TOTAL),
            "Total number of times worker has parked",
        ),
        &["worker"],
    )
    .expect("tokio worker_park_count_total")
});

pub static TOKIO_WORKER_LOCAL_QUEUE_DEPTH: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new(
            tokio_metric_name(names::WORKER_LOCAL_QUEUE_DEPTH),
            "Number of tasks in worker local queue",
        ),
        &["worker"],
    )
    .expect("tokio worker_local_queue_depth")
});

pub static TOKIO_WORKER_STEAL_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new(
            tokio_metric_name(names::WORKER_STEAL_COUNT_TOTAL),
            "Total number of tasks stolen by worker",
        ),
        &["worker"],
    )
    .expect("tokio worker_steal_count_total")
});

pub static TOKIO_WORKER_OVERFLOW_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new(
            tokio_metric_name(names::WORKER_OVERFLOW_COUNT_TOTAL),
            "Total number of times worker local queue overflowed",
        ),
        &["worker"],
    )
    .expect("tokio worker_overflow_count_total")
});

// --- Event loop canary ---
pub static EVENT_LOOP_DELAY_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    Histogram::with_opts(
        HistogramOpts::new(
            format!(
                "{}_{}",
                name_prefix::FRONTEND,
                frontend_perf::EVENT_LOOP_DELAY_SECONDS
            ),
            "Event loop delay canary: drift from 10ms sleep (seconds)",
        )
        .buckets(vec![
            0.0, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
        ]),
    )
    .expect("event_loop_delay_seconds histogram")
});

pub static EVENT_LOOP_STALL_TOTAL: Lazy<Counter> = Lazy::new(|| {
    Counter::new(
        format!(
            "{}_{}",
            name_prefix::FRONTEND,
            frontend_perf::EVENT_LOOP_STALL_TOTAL
        ),
        "Number of event loop stalls (delay > 5ms)",
    )
    .expect("event_loop_stall_total counter")
});

/// Guards idempotency for the `MetricsRegistry` registration path.
static REGISTERED: OnceCell<()> = OnceCell::new();

/// Guards idempotency for the raw `prometheus::Registry` registration path.
/// Kept separate from `REGISTERED` so that calling `ensure_tokio_perf_metrics_registered`
/// first does not silently prevent the metrics from being registered in the prometheus registry.
static PROMETHEUS_REGISTERED: OnceCell<()> = OnceCell::new();

/// Register tokio perf and canary metrics with the given registry. Idempotent.
pub fn ensure_tokio_perf_metrics_registered(registry: &MetricsRegistry) {
    let _ = REGISTERED.get_or_init(|| {
        registry
            .add_metric(Box::new(TOKIO_GLOBAL_QUEUE_DEPTH.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_BUDGET_FORCED_YIELD_TOTAL.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_BLOCKING_THREADS.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_BLOCKING_IDLE_THREADS.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_BLOCKING_QUEUE_DEPTH.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_ALIVE_TASKS.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_WORKER_MEAN_POLL_TIME_NS.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_WORKER_BUSY_RATIO_VEC.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_WORKER_PARK_COUNT_TOTAL.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_WORKER_LOCAL_QUEUE_DEPTH.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_WORKER_STEAL_COUNT_TOTAL.clone()))
            .ok();
        registry
            .add_metric(Box::new(TOKIO_WORKER_OVERFLOW_COUNT_TOTAL.clone()))
            .ok();
        registry
            .add_metric(Box::new(EVENT_LOOP_DELAY_SECONDS.clone()))
            .ok();
        registry
            .add_metric(Box::new(EVENT_LOOP_STALL_TOTAL.clone()))
            .ok();
    });
}

/// Register tokio perf and canary metrics with a raw Prometheus registry.
pub fn ensure_tokio_perf_metrics_registered_prometheus(
    registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
    if PROMETHEUS_REGISTERED.get().is_some() {
        return Ok(());
    }
    registry.register(Box::new(TOKIO_GLOBAL_QUEUE_DEPTH.clone()))?;
    registry.register(Box::new(TOKIO_BUDGET_FORCED_YIELD_TOTAL.clone()))?;
    registry.register(Box::new(TOKIO_BLOCKING_THREADS.clone()))?;
    registry.register(Box::new(TOKIO_BLOCKING_IDLE_THREADS.clone()))?;
    registry.register(Box::new(TOKIO_BLOCKING_QUEUE_DEPTH.clone()))?;
    registry.register(Box::new(TOKIO_ALIVE_TASKS.clone()))?;
    registry.register(Box::new(TOKIO_WORKER_MEAN_POLL_TIME_NS.clone()))?;
    registry.register(Box::new(TOKIO_WORKER_BUSY_RATIO_VEC.clone()))?;
    registry.register(Box::new(TOKIO_WORKER_PARK_COUNT_TOTAL.clone()))?;
    registry.register(Box::new(TOKIO_WORKER_LOCAL_QUEUE_DEPTH.clone()))?;
    registry.register(Box::new(TOKIO_WORKER_STEAL_COUNT_TOTAL.clone()))?;
    registry.register(Box::new(TOKIO_WORKER_OVERFLOW_COUNT_TOTAL.clone()))?;
    registry.register(Box::new(EVENT_LOOP_DELAY_SECONDS.clone()))?;
    registry.register(Box::new(EVENT_LOOP_STALL_TOTAL.clone()))?;
    let _ = PROMETHEUS_REGISTERED.set(());
    Ok(())
}

/// Run the tokio metrics collector (1s interval) and event-loop canary.
/// Spawn this on the runtime you want to monitor (e.g. primary handle).
250
251
/// The loop exits cleanly when `cancel` is triggered.
pub async fn tokio_metrics_and_canary_loop(cancel: CancellationToken) {
252
253
254
255
256
257
258
    let canary_interval = Duration::from_millis(10);
    let stall_threshold = Duration::from_millis(5);
    let collect_interval = Duration::from_secs(1);
    let mut next_collect = Instant::now() + collect_interval;
    let mut prev_counters = PrevWorkerCounters::new();
    loop {
        let start = Instant::now();
259
260
261
262
263
264
265
        tokio::select! {
            _ = tokio::time::sleep(canary_interval) => {}
            _ = cancel.cancelled() => {
                tracing::debug!("tokio metrics and canary loop shutting down");
                return;
            }
        }
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
        let delay = start.elapsed().saturating_sub(canary_interval);
        EVENT_LOOP_DELAY_SECONDS.observe(delay.as_secs_f64());
        if delay > stall_threshold {
            EVENT_LOOP_STALL_TOTAL.inc();
        }
        if Instant::now() >= next_collect {
            next_collect = Instant::now() + collect_interval;
            sample_tokio_metrics(&mut prev_counters);
        }
    }
}

static PREV_BUDGET_FORCED_YIELD: AtomicU64 = AtomicU64::new(0);

/// Per-worker previous samples for the monotonic _TOTAL counters.
/// Owned by the single `tokio_metrics_and_canary_loop` task — no locks needed.
struct PrevWorkerCounters {
    park: Vec<u64>,
    steal: Vec<u64>,
    overflow: Vec<u64>,
}

impl PrevWorkerCounters {
    fn new() -> Self {
        Self {
            park: Vec::new(),
            steal: Vec::new(),
            overflow: Vec::new(),
        }
    }

    fn ensure_capacity(&mut self, num_workers: usize) {
        if self.park.len() < num_workers {
            self.park.resize(num_workers, 0);
            self.steal.resize(num_workers, 0);
            self.overflow.resize(num_workers, 0);
        }
    }
}

fn sample_tokio_metrics(prev: &mut PrevWorkerCounters) {
    let metrics = Handle::current().metrics();

    TOKIO_GLOBAL_QUEUE_DEPTH.set(metrics.global_queue_depth() as f64);
    let budget = metrics.budget_forced_yield_count();
    let prev_budget = PREV_BUDGET_FORCED_YIELD.swap(budget, Ordering::Relaxed);
    TOKIO_BUDGET_FORCED_YIELD_TOTAL.inc_by((budget.saturating_sub(prev_budget)) as f64);
    TOKIO_BLOCKING_THREADS.set(metrics.num_blocking_threads() as f64);
    TOKIO_BLOCKING_IDLE_THREADS.set(metrics.num_idle_blocking_threads() as f64);
    TOKIO_BLOCKING_QUEUE_DEPTH.set(metrics.blocking_queue_depth() as f64);
    TOKIO_ALIVE_TASKS.set(metrics.num_alive_tasks() as f64);

    let num_workers = metrics.num_workers();
    prev.ensure_capacity(num_workers);

    for w in 0..num_workers {
        let worker_label = w.to_string();
        let mean_poll = metrics.worker_mean_poll_time(w);

        TOKIO_WORKER_MEAN_POLL_TIME_NS
            .with_label_values(&[&worker_label])
            .set(mean_poll.as_nanos() as i64);

        TOKIO_WORKER_LOCAL_QUEUE_DEPTH
            .with_label_values(&[&worker_label])
            .set(metrics.worker_local_queue_depth(w) as i64);

        // Monotonically increasing totals: track deltas so we use inc_by on a Counter.
        let park = metrics.worker_park_count(w);
        TOKIO_WORKER_PARK_COUNT_TOTAL
            .with_label_values(&[&worker_label])
            .inc_by(park.saturating_sub(prev.park[w]));
        prev.park[w] = park;

        let steal = metrics.worker_steal_count(w);
        TOKIO_WORKER_STEAL_COUNT_TOTAL
            .with_label_values(&[&worker_label])
            .inc_by(steal.saturating_sub(prev.steal[w]));
        prev.steal[w] = steal;

        let overflow = metrics.worker_overflow_count(w);
        TOKIO_WORKER_OVERFLOW_COUNT_TOTAL
            .with_label_values(&[&worker_label])
            .inc_by(overflow.saturating_sub(prev.overflow[w]));
        prev.overflow[w] = overflow;

        // Busy ratio: total_busy_duration over 1s interval -> ratio. We don't have delta here;
        // use mean_poll_time as proxy: if high, worker is busy. Store as 0-1000 (per mille).
        let busy_proxy = (mean_poll.as_secs_f64() / 0.001).min(1.0); // 1ms = saturated
        TOKIO_WORKER_BUSY_RATIO_VEC
            .with_label_values(&[&worker_label])
            .set((busy_proxy * 1000.0) as i64);
    }
}