frontend_perf.rs 7.76 KB
Newer Older
1
2
3
4
5
6
7
// SPDX-FileCopyrightText: Copyright (c) 2026-2027 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Frontend pipeline stage and finer-grained perf metrics.
//! Used by both runtime (route, transport_roundtrip) and llm (preprocess, postprocess, tokenize, template, detokenize).

use once_cell::sync::{Lazy, OnceCell};
8
use prometheus::{Counter, Histogram, HistogramOpts, HistogramVec, IntGaugeVec, Opts, Registry};
9
10
11
12

use super::prometheus_names::{frontend_perf, name_prefix};
use crate::MetricsRegistry;

13
14
pub use super::prometheus_names::frontend_perf::{STAGE_DISPATCH, STAGE_PREPROCESS, STAGE_ROUTE};

15
16
17
18
/// Build a full metric name by joining the frontend name prefix and `suffix` with `_`.
fn frontend_metric_name(suffix: &str) -> String {
    format!("{prefix}_{suffix}", prefix = name_prefix::FRONTEND)
}

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/// Gauge of requests currently inside each pipeline stage (preprocess, route, dispatch).
///
/// Labels:
/// * `stage` — pipeline stage.
/// * `phase` — prefill/decode/aggregated, or empty for preprocess.
pub static STAGE_REQUESTS: Lazy<IntGaugeVec> = Lazy::new(|| {
    let opts = Opts::new(
        frontend_metric_name(frontend_perf::STAGE_REQUESTS),
        "Number of requests currently in the given pipeline stage",
    );
    let label_names = ["stage", "phase"];

    IntGaugeVec::new(opts, &label_names)
        .expect("failed to create dynamo_frontend_stage_requests gauge")
});

/// RAII guard that increments a per-stage gauge on creation and decrements on drop.
///
/// Used to track how many requests are in each frontend pipeline stage at any given time.
/// Create with [`StageGuard::new`] at stage entry; the gauge decrements automatically when
/// the guard is dropped (end of scope, explicit drop, or stream completion).
///
/// The guard must be bound to a named variable (or moved into whatever owns the stage's
/// lifetime). A discarded guard is dropped immediately, so the gauge would go up and
/// straight back down without ever reflecting the in-flight request.
#[must_use = "the stage gauge is decremented as soon as the guard is dropped; keep the guard alive for the duration of the stage"]
pub struct StageGuard {
    /// Gauge child for one (`stage`, `phase`) label pair; decremented in `Drop`.
    gauge: prometheus::IntGauge,
}

impl StageGuard {
    /// Enter a pipeline stage: bump the matching [`STAGE_REQUESTS`] child gauge by one and
    /// hand back a guard whose drop undoes that increment.
    ///
    /// * `stage` — pipeline stage name; use `frontend_perf::STAGE_{PREPROCESS,ROUTE,DISPATCH}`
    ///   constants from [`crate::metrics::prometheus_names`].
    /// * `phase` — request phase; use the `RequestPhase::to_string` output
    ///   (`"prefill"|"decode"|"aggregated"`), or `""` for stages without a phase.
    pub fn new(stage: &str, phase: &str) -> Self {
        let guard = Self {
            gauge: STAGE_REQUESTS.with_label_values(&[stage, phase]),
        };
        guard.gauge.inc();
        guard
    }
}

impl Drop for StageGuard {
    /// Leave the stage: decrement the gauge this guard holds.
    fn drop(&mut self) {
        let Self { gauge } = self;
        gauge.dec();
    }
}

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/// Latency histogram per pipeline stage (preprocess, route, transport_roundtrip, postprocess).
///
/// Labelled by `stage`; bucket upper bounds run from 100 µs to 5 s.
pub static STAGE_DURATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
    let opts = HistogramOpts::new(
        frontend_metric_name(frontend_perf::STAGE_DURATION_SECONDS),
        "Pipeline stage duration (seconds)",
    );
    let bucket_bounds = vec![
        0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0,
    ];

    HistogramVec::new(opts.buckets(bucket_bounds), &["stage"])
        .expect("stage_duration_seconds histogram vec")
});

/// Histogram of tokenization time in the preprocessor (gather_tokens), in seconds.
///
/// Bucket upper bounds run from 100 µs to 1 s.
pub static TOKENIZE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let opts = HistogramOpts::new(
        frontend_metric_name(frontend_perf::TOKENIZE_SECONDS),
        "Tokenization time in preprocessor (seconds)",
    );
    let bucket_bounds = vec![0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0];

    Histogram::with_opts(opts.buckets(bucket_bounds)).expect("tokenize_seconds histogram")
});

/// Histogram of template application time in the preprocessor (apply_template), in seconds.
///
/// Bucket upper bounds run from 10 µs to 50 ms.
pub static TEMPLATE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    let opts = HistogramOpts::new(
        frontend_metric_name(frontend_perf::TEMPLATE_SECONDS),
        "Template application time in preprocessor (seconds)",
    );
    let bucket_bounds = vec![0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05];

    Histogram::with_opts(opts.buckets(bucket_bounds)).expect("template_seconds histogram")
});

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/// Running total of detokenization time over all tokens, in microseconds.
/// Use `rate(total) / rate(count)` in Prometheus to derive per-token average.
pub static DETOKENIZE_TOTAL_US: Lazy<Counter> = Lazy::new(|| {
    let opts = Opts::new(
        frontend_metric_name(frontend_perf::DETOKENIZE_TOTAL_US),
        "Cumulative detokenization time (microseconds)",
    );

    Counter::with_opts(opts).expect("detokenize_total_us counter")
});

/// Total number of tokens detokenized.
pub static DETOKENIZE_TOKEN_COUNT: Lazy<Counter> = Lazy::new(|| {
    Counter::with_opts(Opts::new(
        frontend_metric_name(frontend_perf::DETOKENIZE_TOKEN_COUNT),
        "Total tokens detokenized",
    ))
    .expect("detokenize_token_count counter")
121
122
123
124
125
126
127
128
129
130
131
132
133
});

/// Guards idempotency for the `MetricsRegistry` registration path.
///
/// The cell is process-wide: `ensure_frontend_perf_metrics_registered` runs its registration
/// closure at most once, so only the first `MetricsRegistry` passed to it receives the metrics.
static REGISTERED: OnceCell<()> = OnceCell::new();

/// Guards idempotency for the raw `prometheus::Registry` registration path.
/// Kept separate from `REGISTERED` so that calling `ensure_frontend_perf_metrics_registered`
/// first does not silently prevent the metrics from being registered in the prometheus registry.
///
/// Filled only after a raw-registry registration has completed successfully; like `REGISTERED`
/// it is process-wide rather than per-registry.
static PROMETHEUS_REGISTERED: OnceCell<()> = OnceCell::new();

/// Register frontend perf metrics with the given registry. Idempotent.
pub fn ensure_frontend_perf_metrics_registered(registry: &MetricsRegistry) {
    let _ = REGISTERED.get_or_init(|| {
134
        registry.add_metric(Box::new(STAGE_REQUESTS.clone())).ok();
135
136
137
138
139
140
        registry
            .add_metric(Box::new(STAGE_DURATION_SECONDS.clone()))
            .ok();
        registry.add_metric(Box::new(TOKENIZE_SECONDS.clone())).ok();
        registry.add_metric(Box::new(TEMPLATE_SECONDS.clone())).ok();
        registry
141
142
143
144
            .add_metric(Box::new(DETOKENIZE_TOTAL_US.clone()))
            .ok();
        registry
            .add_metric(Box::new(DETOKENIZE_TOKEN_COUNT.clone()))
145
146
147
148
149
150
151
152
153
154
155
156
            .ok();
    });
}

/// Register frontend perf metrics with a raw Prometheus registry (e.g. for LLM HTTP service /metrics).
/// Idempotent. Call this when the service exposes /metrics from its own registry.
pub fn ensure_frontend_perf_metrics_registered_prometheus(
    registry: &Registry,
) -> Result<(), prometheus::Error> {
    if PROMETHEUS_REGISTERED.get().is_some() {
        return Ok(());
    }
157
    registry.register(Box::new(STAGE_REQUESTS.clone()))?;
158
159
160
    registry.register(Box::new(STAGE_DURATION_SECONDS.clone()))?;
    registry.register(Box::new(TOKENIZE_SECONDS.clone()))?;
    registry.register(Box::new(TEMPLATE_SECONDS.clone()))?;
161
162
    registry.register(Box::new(DETOKENIZE_TOTAL_US.clone()))?;
    registry.register(Box::new(DETOKENIZE_TOKEN_COUNT.clone()))?;
163
164
165
    let _ = PROMETHEUS_REGISTERED.set(());
    Ok(())
}
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210

#[cfg(test)]
mod tests {
    use super::*;

    /// Two guards on the same label pair each add one to the gauge and give it back on drop.
    #[test]
    fn test_stage_guard_inc_dec() {
        let observed = STAGE_REQUESTS.with_label_values(&["test_stage", "test_phase"]);
        assert_eq!(observed.get(), 0);

        let outer = StageGuard::new("test_stage", "test_phase");
        assert_eq!(observed.get(), 1);

        let inner = StageGuard::new("test_stage", "test_phase");
        assert_eq!(observed.get(), 2);

        // Release in reverse order of creation, checking the gauge after each release.
        drop(inner);
        assert_eq!(observed.get(), 1);

        drop(outer);
        assert_eq!(observed.get(), 0);
    }

    /// Guards with different label pairs move independent gauge children.
    #[test]
    fn test_stage_guard_different_labels() {
        let preprocess_gauge = STAGE_REQUESTS.with_label_values(&["preprocess_t", ""]);
        let prefill_gauge = STAGE_REQUESTS.with_label_values(&["route_t", "prefill"]);
        let decode_gauge = STAGE_REQUESTS.with_label_values(&["route_t", "decode"]);

        let _preprocess_guard = StageGuard::new("preprocess_t", "");
        let prefill_guard = StageGuard::new("route_t", "prefill");
        let _decode_guard = StageGuard::new("route_t", "decode");

        assert_eq!(preprocess_gauge.get(), 1);
        assert_eq!(prefill_gauge.get(), 1);
        assert_eq!(decode_gauge.get(), 1);

        // Only the prefill child should change when its guard goes away.
        drop(prefill_guard);
        assert_eq!(preprocess_gauge.get(), 1);
        assert_eq!(prefill_gauge.get(), 0);
        assert_eq!(decode_gauge.get(), 1);
    }
}