// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Per-request tracker for capturing request lifecycle metrics.
//!
//! This module provides [`RequestTracker`] for tracking timing and routing information
//! that can be returned to clients via the `nvext` response field.

9
10
use std::sync::Arc;
use std::sync::OnceLock;
11
use std::sync::atomic::{AtomicU64, Ordering};
12
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14
15
16

use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
17
use utoipa::ToSchema;
18

19
20
21
22
use crate::http::service::metrics::{
    WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE, WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE,
    WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE,
};
23
24
use crate::protocols::openai::nvext::WorkerIdInfo;

25
26
27
28
29
/// Worker type label value for prefill workers in per-worker Prometheus metrics.
///
/// Worker type labels are stored in `RequestTracker` at routing time to avoid costly
/// MDC lookups when updating per-worker metrics (TTFT, ITL).
pub const WORKER_TYPE_PREFILL: &str = "prefill";

/// Worker type label value for decode workers in per-worker Prometheus metrics.
///
/// Also the fallback label when no decode worker type has been recorded.
pub const WORKER_TYPE_DECODE: &str = "decode";

/// DP-rank label value used when no concrete DP rank has been recorded for a worker.
const UNSET_DP_RANK_LABEL: &str = "none";
31

32
/// Phase of the request in disaggregated serving.
///
/// Used to determine which worker ID field to record when routing:
/// `RequestTracker::record_worker` writes only the prefill fields in [`RequestPhase::Prefill`],
/// only the decode fields in [`RequestPhase::Decode`], and both in
/// [`RequestPhase::Aggregated`] (the default).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RequestPhase {
    /// Prefill-only phase (disaggregated serving)
    Prefill,
    /// Decode phase (disaggregated serving)
    Decode,
    /// Aggregated mode - same worker handles both prefill and decode
    #[default]
    Aggregated,
}

impl std::fmt::Display for RequestPhase {
    /// Writes the phase's lowercase name: `prefill`, `decode`, or `aggregated`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            RequestPhase::Prefill => "prefill",
            RequestPhase::Decode => "decode",
            RequestPhase::Aggregated => "aggregated",
        };
        f.write_str(name)
    }
}

/// Per-request tracker for timing and routing metrics.
///
/// Captures information throughout the request lifecycle:
59
/// - `request_received`: When the request was received
60
/// - `prefill_start_time`: When prefill started (for disaggregated serving)
61
62
/// - `first_token_time`: When the first token was generated
/// - `request_finish_time`: When the last token was generated (updated incrementally)
63
/// - KV cache hit rate information
64
/// - Worker IDs and types for per-worker Prometheus metrics
65
///
66
67
68
69
70
71
72
/// ## Concurrency primitives
///
/// **`OnceLock` (first-write-wins):** Used for values that must capture the earliest
/// observation and ignore later writes. In disaggregated serving, both prefill and decode
/// phases may call `record_first_token`; `OnceLock` ensures the prefill phase's TTFT is
/// preserved. Also used for one-shot metadata: `prefill_start_time`, KV hit info,
/// ISL/cached tokens, worker types, and tokenizer latency.
73
///
74
75
76
77
78
79
/// **`Mutex` (last-write-wins):** Used for values where later phases should overwrite
/// earlier ones. `request_finish_time` is updated incrementally at each output block
/// boundary so that `avg_itl_ms()` stays current during streaming, and the decode
/// phase's final finish naturally overwrites the prefill phase's earlier finish.
/// `phase` also uses a Mutex since it transitions across phases.
///
80
81
/// **`AtomicU64`:** Used for frequently updated counters (`osl_tokens`) and
/// accumulated detokenize timing, where lock-free updates are beneficial.
82
83
#[derive(Debug)]
pub struct RequestTracker {
84
85
86
87
88
89
    /// When the request was received (monotonic clock for duration calculations)
    request_received: Instant,

    /// When the request was received (wall clock time as epoch milliseconds)
    request_received_epoch_ms: u64,

90
91
92
    /// When prefill started (for disaggregated serving) - set once via OnceLock
    prefill_start_time: OnceLock<Instant>,

93
94
95
    /// When the first token was generated (set once via OnceLock).
    /// In disaggregated serving, the prefill phase records this first and the
    /// decode phase's attempt is silently ignored, preserving the real TTFT.
96
97
    first_token_time: OnceLock<Instant>,

98
99
100
    /// When the request finished. Mutex allows the last router phase to
    /// record the final finish time.
    request_finish_time: Mutex<Option<Instant>>,
101
102
103
104
105
106
107

    /// KV cache overlap blocks (prefix cache hits) - set once via OnceLock
    kv_overlap_blocks: OnceLock<u32>,

    /// Input sequence length in blocks (for hit rate calculation) - set once via OnceLock
    isl_blocks: OnceLock<usize>,

108
109
110
111
112
113
114
115
116
    /// Input sequence length in tokens - set once via OnceLock
    isl_tokens: OnceLock<usize>,

    /// Number of cached tokens (overlap_blocks * block_size) - set once via OnceLock
    cached_tokens: OnceLock<usize>,

    /// Output sequence length in tokens - updated atomically as tokens stream back
    osl_tokens: AtomicU64,

117
118
    /// Prefill worker ID (for disaggregated serving) - set once when known.
    prefill_worker_id: OnceLock<u64>,
119

120
121
    /// Prefill DP rank - set once when known.
    prefill_dp_rank: OnceLock<u32>,
122

123
124
    /// Decode worker ID - set once when known.
    decode_worker_id: OnceLock<u64>,
125

126
127
    /// Decode DP rank - set once when known.
    decode_dp_rank: OnceLock<u32>,
128
129
130
131
132
133
134

    /// Worker type for the prefill worker ("prefill" or "decode").
    /// Stored at routing time to avoid MDC lookup when updating Prometheus metrics.
    /// In aggregated mode, this will be "decode" since the same worker handles both.
    /// This is necessary because TTFT metrics need to know the worker type label,
    /// and looking up MDC by worker_id would require iterating all cards (O(n)).
    prefill_worker_type: OnceLock<&'static str>,
135

136
137
138
    /// Worker type for the decode worker (always "decode").
    /// Stored for symmetry with prefill_worker_type, though decode is always "decode".
    decode_worker_type: OnceLock<&'static str>,
139
140
141

    /// Request phase (Prefill/Decode/Aggregated)
    phase: Mutex<RequestPhase>,
142
143
144
145

    /// Semaphore for coordinating phase transitions.
    /// Acquiring a permit blocks subsequent set_phase calls until the permit is dropped.
    /// This prevents race conditions in the bootstrap optimization path where prefill
146
    /// runs in background and needs to complete worker recording before phase changes.
147
    phase_semaphore: Arc<Semaphore>,
148
149

    /// How long it took to tokenize the input
150
151
152
153
154
155
156
    tokenize_latency: OnceLock<Duration>,

    /// Accumulated time spent detokenizing output tokens for this request (nanoseconds)
    detokenize_total_ns: AtomicU64,

    /// Number of detokenize samples accumulated for this request
    detokenize_count: AtomicU64,
157
158
159

    /// Router scheduler queue depth at routing time (how many requests were pending)
    router_queue_depth: OnceLock<usize>,
160
161
}

162
163
impl RequestTracker {
    /// Create a new request tracker, capturing the current time as request received.
164
165
166
167
168
169
170
    pub fn new() -> Self {
        let now = Instant::now();
        let epoch_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

171
        RequestTracker {
172
173
            request_received: now,
            request_received_epoch_ms: epoch_ms,
174
            prefill_start_time: OnceLock::new(),
175
            first_token_time: OnceLock::new(),
176
            request_finish_time: Mutex::new(None),
177
178
            kv_overlap_blocks: OnceLock::new(),
            isl_blocks: OnceLock::new(),
179
180
181
            isl_tokens: OnceLock::new(),
            cached_tokens: OnceLock::new(),
            osl_tokens: AtomicU64::new(0),
182
183
184
185
            prefill_worker_id: OnceLock::new(),
            prefill_dp_rank: OnceLock::new(),
            decode_worker_id: OnceLock::new(),
            decode_dp_rank: OnceLock::new(),
186
187
            prefill_worker_type: OnceLock::new(),
            decode_worker_type: OnceLock::new(),
188
            phase: Mutex::new(RequestPhase::Aggregated),
189
            phase_semaphore: Arc::new(Semaphore::new(1)),
190
191
192
            tokenize_latency: OnceLock::new(),
            detokenize_total_ns: AtomicU64::new(0),
            detokenize_count: AtomicU64::new(0),
193
            router_queue_depth: OnceLock::new(),
194
195
196
        }
    }

197
198
199
200
201
    /// Record when prefill started. Returns true if this was the first call.
    pub fn record_prefill_start(&self) -> bool {
        self.prefill_start_time.set(Instant::now()).is_ok()
    }

202
203
    pub fn record_first_token(&self) {
        let _ = self.first_token_time.set(Instant::now());
204
205
    }

206
207
    pub fn record_finish(&self) {
        *self.request_finish_time.lock() = Some(Instant::now());
208
209
    }

210
211
212
213
214
215
216
    /// Record KV cache hit information. Returns true if this was the first call.
    pub fn record_kv_hit(&self, overlap_blocks: u32, isl_blocks: usize) -> bool {
        let overlap_set = self.kv_overlap_blocks.set(overlap_blocks).is_ok();
        let isl_set = self.isl_blocks.set(isl_blocks).is_ok();
        overlap_set && isl_set
    }

217
218
    /// Record input sequence length in tokens and cached token count when known.
    pub fn record_isl(&self, isl_tokens: usize, cached_tokens: Option<usize>) {
219
        let _ = self.isl_tokens.set(isl_tokens);
220
221
222
        if let Some(cached_tokens) = cached_tokens {
            let _ = self.cached_tokens.set(cached_tokens);
        }
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
    }

    pub fn isl_tokens(&self) -> Option<usize> {
        self.isl_tokens.get().copied()
    }

    pub fn cached_tokens(&self) -> Option<usize> {
        self.cached_tokens.get().copied()
    }

    /// Record current output sequence length in tokens. Updated at each output block boundary.
    pub fn record_osl(&self, osl: usize) {
        self.osl_tokens.store(osl as u64, Ordering::Relaxed);
    }

    pub fn osl_tokens(&self) -> u64 {
        self.osl_tokens.load(Ordering::Relaxed)
    }

242
243
244
245
246
247
248
249
250
251
252
253
254
255
    /// Time from request received to prefill start (queue/wait time) in milliseconds.
    pub fn prefill_wait_time_ms(&self) -> Option<f64> {
        self.prefill_start_time
            .get()
            .map(|t| t.duration_since(self.request_received).as_secs_f64() * 1000.0)
    }

    /// Time from prefill start to first token (prefill execution time) in milliseconds.
    pub fn prefill_time_ms(&self) -> Option<f64> {
        let prefill_start = self.prefill_start_time.get()?;
        let first_token = self.first_token_time.get()?;
        Some(first_token.duration_since(*prefill_start).as_secs_f64() * 1000.0)
    }

256
    pub fn ttft_ms(&self) -> Option<f64> {
257
258
259
260
261
262
263
        let first_token = self.first_token_time.get()?;
        Some(
            first_token
                .duration_since(self.request_received)
                .as_secs_f64()
                * 1000.0,
        )
264
265
266
    }

    pub fn total_time_ms(&self) -> Option<f64> {
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
        let finish = (*self.request_finish_time.lock())?;
        Some(finish.duration_since(self.request_received).as_secs_f64() * 1000.0)
    }

    /// Average inter-token latency in milliseconds.
    /// Computed as (finish_time - first_token_time) / (osl - 1).
    /// Returns None if fewer than 2 output tokens or times not recorded.
    pub fn avg_itl_ms(&self) -> Option<f64> {
        let first_token = *self.first_token_time.get()?;
        let finish = (*self.request_finish_time.lock())?;
        let osl = self.osl_tokens.load(Ordering::Relaxed);
        if osl < 2 {
            return None;
        }
        let decode_duration = finish.duration_since(first_token).as_secs_f64() * 1000.0;
        Some(decode_duration / (osl - 1) as f64)
283
284
285
286
287
288
    }

    pub fn request_received_epoch_ms(&self) -> u64 {
        self.request_received_epoch_ms
    }

289
290
291
292
293
294
295
296
297
298
    /// KV cache hit rate as a ratio (0.0 to 1.0).
    pub fn kv_hit_rate(&self) -> Option<f64> {
        let overlap = *self.kv_overlap_blocks.get()?;
        let isl = *self.isl_blocks.get()?;
        if isl == 0 {
            return None;
        }
        Some(overlap as f64 / isl as f64)
    }

299
300
301
    /// Set the request phase and return a permit that blocks subsequent phase changes.
    ///
    /// The returned permit must be dropped to allow the next `set_phase` call to proceed.
302
303
    /// In the bootstrap optimization path, the permit is held and passed to the spawned
    /// prefill task, ensuring routing completes before the phase changes.
304
305
306
307
308
309
310
311
312
    pub async fn set_phase(&self, phase: RequestPhase) -> OwnedSemaphorePermit {
        let permit = self
            .phase_semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("phase semaphore should never be closed");
        *self.phase.lock() = phase;
        permit
313
314
315
316
    }

    /// Get the current request phase.
    pub fn phase(&self) -> RequestPhase {
317
        *self.phase.lock()
318
319
    }

320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
    fn record_once_u64(slot: &OnceLock<u64>, value: u64, field_name: &'static str) {
        if let Some(existing) = slot.get() {
            if *existing != value {
                tracing::error!(
                    field = field_name,
                    existing = *existing,
                    new = value,
                    "Conflicting request tracker write"
                );
            }
            return;
        }
        let _ = slot.set(value);
    }

    fn record_once_u32(slot: &OnceLock<u32>, value: u32, field_name: &'static str) {
        if let Some(existing) = slot.get() {
            if *existing != value {
                tracing::error!(
                    field = field_name,
                    existing = *existing,
                    new = value,
                    "Conflicting request tracker write"
                );
344
            }
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
            return;
        }
        let _ = slot.set(value);
    }

    fn record_once_worker_type(
        slot: &OnceLock<&'static str>,
        value: &'static str,
        field_name: &'static str,
    ) {
        if let Some(existing) = slot.get() {
            if *existing != value {
                tracing::error!(
                    field = field_name,
                    existing = *existing,
                    new = value,
                    "Conflicting request tracker write"
                );
363
            }
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
            return;
        }
        let _ = slot.set(value);
    }

    fn record_prefill_worker(
        &self,
        instance_id: u64,
        dp_rank: Option<u32>,
        worker_type: &'static str,
    ) {
        Self::record_once_u64(&self.prefill_worker_id, instance_id, "prefill_worker_id");
        if let Some(rank) = dp_rank {
            Self::record_once_u32(&self.prefill_dp_rank, rank, "prefill_dp_rank");
        }
        Self::record_once_worker_type(
            &self.prefill_worker_type,
            worker_type,
            "prefill_worker_type",
        );
    }

    fn record_decode_worker(
        &self,
        instance_id: u64,
        dp_rank: Option<u32>,
        worker_type: &'static str,
    ) {
        Self::record_once_u64(&self.decode_worker_id, instance_id, "decode_worker_id");
        if let Some(rank) = dp_rank {
            Self::record_once_u32(&self.decode_dp_rank, rank, "decode_dp_rank");
        }
        Self::record_once_worker_type(&self.decode_worker_type, worker_type, "decode_worker_type");
    }

    /// Record worker ID, optional DP rank, and worker type based on the current phase.
    ///
    /// Worker ID and type are recorded as soon as they are known. DP rank is recorded only
    /// when it is concrete, allowing the unresolved rank to remain unset until later.
    pub fn record_worker(&self, instance_id: u64, dp_rank: Option<u32>, worker_type: &'static str) {
        match self.phase() {
            RequestPhase::Prefill => self.record_prefill_worker(instance_id, dp_rank, worker_type),
            RequestPhase::Decode => self.record_decode_worker(instance_id, dp_rank, worker_type),
407
            RequestPhase::Aggregated => {
408
409
                self.record_prefill_worker(instance_id, dp_rank, worker_type);
                self.record_decode_worker(instance_id, dp_rank, worker_type);
410
411
412
413
            }
        }
    }

414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
    pub fn record_tokenize_latency(&self, l: Duration) {
        let _ = self.tokenize_latency.set(l);
    }

    pub fn tokenize_latency(&self) -> Option<Duration> {
        self.tokenize_latency.get().copied()
    }

    pub fn record_detokenize_latency(&self, l: Duration) {
        // u128 -> u64 is safe because max u64 in nanos is over 500 years
        let delta_ns = u64::try_from(l.as_nanos()).unwrap_or(u64::MAX);
        // On an x86 system these atomics are very cheap
        let _ = self.detokenize_total_ns.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            // Saturating add to avoid wrapping to a nonsensical average on overflow.
            |current| Some(current.saturating_add(delta_ns)),
        );
        self.detokenize_count.fetch_add(1, Ordering::Relaxed);
    }

    pub fn detokenize_total_latency(&self) -> Option<Duration> {
        let total_ns = self.detokenize_total_ns.load(Ordering::Relaxed);
        let count = self.detokenize_count.load(Ordering::Relaxed);
        if count == 0 {
            // We recorded no observations
            None
        } else {
            Some(Duration::from_nanos(total_ns))
        }
444
445
    }

446
447
    pub fn detokenize_count(&self) -> u64 {
        self.detokenize_count.load(Ordering::Relaxed)
448
449
    }

450
451
452
453
454
455
456
457
458
459
    /// Record router scheduler queue depth at routing time.
    pub fn record_router_queue_depth(&self, depth: usize) {
        let _ = self.router_queue_depth.set(depth);
    }

    /// Get the router scheduler queue depth recorded at routing time.
    pub fn router_queue_depth(&self) -> Option<usize> {
        self.router_queue_depth.get().copied()
    }

460
461
    /// Get worker ID information if any worker IDs have been recorded.
    pub fn get_worker_info(&self) -> Option<WorkerIdInfo> {
462
463
        let prefill = self.prefill_worker_id();
        let decode = self.decode_worker_id();
464
465
466
467
468
469
470

        if prefill.is_none() && decode.is_none() {
            return None;
        }

        Some(WorkerIdInfo {
            prefill_worker_id: prefill,
471
            prefill_dp_rank: self.prefill_dp_rank(),
472
            decode_worker_id: decode,
473
            decode_dp_rank: self.decode_dp_rank(),
474
475
476
        })
    }

477
478
    /// Get the decode worker ID if recorded.
    pub fn decode_worker_id(&self) -> Option<u64> {
479
        self.decode_worker_id.get().copied()
480
481
482
483
    }

    /// Get the decode DP rank if recorded.
    pub fn decode_dp_rank(&self) -> Option<u32> {
484
        self.decode_dp_rank.get().copied()
485
486
487
488
    }

    /// Get the prefill worker ID if recorded.
    pub fn prefill_worker_id(&self) -> Option<u64> {
489
        self.prefill_worker_id.get().copied()
490
491
492
493
    }

    /// Get the prefill DP rank if recorded.
    pub fn prefill_dp_rank(&self) -> Option<u32> {
494
        self.prefill_dp_rank.get().copied()
495
496
497
498
499
500
501
502
503
504
505
506
    }

    /// Get the prefill worker type if recorded.
    pub fn prefill_worker_type(&self) -> Option<&'static str> {
        self.prefill_worker_type.get().copied()
    }

    /// Get the decode worker type if recorded.
    pub fn decode_worker_type(&self) -> Option<&'static str> {
        self.decode_worker_type.get().copied()
    }

507
508
509
510
511
512
513
514
515
    /// Write TTFT and ISL to per-worker last gauges using prefill worker labels.
    /// Called from the Python binding path on first token.
    pub fn observe_first_token_gauges(&self) {
        let Some(worker_id) = self.prefill_worker_id() else {
            return;
        };
        let worker_id_str = worker_id.to_string();
        let dp_rank_str = self
            .prefill_dp_rank()
516
            .map_or(UNSET_DP_RANK_LABEL.to_string(), |r| r.to_string());
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
        let worker_type = self.prefill_worker_type().unwrap_or(WORKER_TYPE_PREFILL);
        let labels = &[worker_id_str.as_str(), dp_rank_str.as_str(), worker_type];

        if let Some(ttft) = self.ttft_ms() {
            WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE
                .with_label_values(labels)
                .set(ttft / 1000.0);
        }
        if let Some(isl) = self.isl_tokens() {
            WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE
                .with_label_values(labels)
                .set(isl as i64);
        }
    }

    /// Write avg ITL to per-worker last gauge using decode worker labels.
    /// Called at each output block boundary and from the Python binding path.
    pub fn observe_finish_gauges(&self) {
        let Some(worker_id) = self.decode_worker_id() else {
            return;
        };
        let worker_id_str = worker_id.to_string();
        let dp_rank_str = self
            .decode_dp_rank()
541
            .map_or(UNSET_DP_RANK_LABEL.to_string(), |r| r.to_string());
542
543
544
545
546
547
548
549
550
551
        let worker_type = self.decode_worker_type().unwrap_or(WORKER_TYPE_DECODE);
        let labels = &[worker_id_str.as_str(), dp_rank_str.as_str(), worker_type];

        if let Some(avg_itl) = self.avg_itl_ms() {
            WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE
                .with_label_values(labels)
                .set(avg_itl / 1000.0);
        }
    }

552
553
554
    pub fn get_timing_info(&self) -> TimingInfo {
        TimingInfo {
            request_received_ms: self.request_received_epoch_ms,
555
556
            prefill_wait_time_ms: self.prefill_wait_time_ms(),
            prefill_time_ms: self.prefill_time_ms(),
557
558
            ttft_ms: self.ttft_ms(),
            total_time_ms: self.total_time_ms(),
559
            kv_hit_rate: self.kv_hit_rate(),
560
            router_queue_depth: self.router_queue_depth(),
561
562
563
564
        }
    }
}

565
impl Default for RequestTracker {
566
567
568
569
570
571
572
573
574
    fn default() -> Self {
        Self::new()
    }
}

/// Timing information for response injection.
///
/// This struct is serialized and included in the response's `nvext` field
/// when the client requests timing information via `extra_fields: ["timing"]`.
575
#[derive(ToSchema, Serialize, Deserialize, Debug, Clone, PartialEq)]
576
577
578
579
pub struct TimingInfo {
    /// When the request was received (epoch milliseconds)
    pub request_received_ms: u64,

580
581
582
583
584
585
586
587
    /// Time from request received to prefill start (queue/wait time) in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prefill_wait_time_ms: Option<f64>,

    /// Time from prefill start to first token (prefill execution time) in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prefill_time_ms: Option<f64>,

588
589
590
591
592
593
594
    /// Time to first token in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<f64>,

    /// Total request time in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_time_ms: Option<f64>,
595
596
597
598

    /// KV cache hit rate (0.0 to 1.0) - ratio of cached blocks to total input blocks
    #[serde(skip_serializing_if = "Option::is_none")]
    pub kv_hit_rate: Option<f64>,
599
600
601
602

    /// Number of requests pending in the router scheduler queue at routing time
    #[serde(skip_serializing_if = "Option::is_none")]
    pub router_queue_depth: Option<usize>,
603
}
604
605
606
607
608
609
610
611
612
613
614

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_record_isl_osl() {
        let tracker = RequestTracker::new();

        tracker.record_isl(512, Some(256));
        assert_eq!(tracker.isl_tokens(), Some(512));
        assert_eq!(tracker.cached_tokens(), Some(256));

        tracker.record_osl(100);
        assert_eq!(tracker.osl_tokens(), 100);
    }

    #[test]
    fn test_record_isl_without_cached_tokens() {
        let tracker = RequestTracker::new();
        tracker.record_isl(64, None);
        assert_eq!(tracker.isl_tokens(), Some(64));
        assert_eq!(tracker.cached_tokens(), None);
    }

    #[test]
    fn test_ttft_ms() {
        let tracker = RequestTracker::new();
        thread::sleep(Duration::from_millis(10));
        tracker.record_first_token();

        let ttft = tracker.ttft_ms().unwrap();
        assert!(ttft >= 5.0, "TTFT should be at least 5ms, got {ttft}");
    }

    #[test]
    fn test_ttft_ms_none_before_first_token() {
        let tracker = RequestTracker::new();
        assert!(tracker.ttft_ms().is_none());
    }

    #[test]
    fn test_prefill_timing() {
        let tracker = RequestTracker::new();
        assert!(tracker.prefill_wait_time_ms().is_none());
        assert!(tracker.prefill_time_ms().is_none());

        assert!(tracker.record_prefill_start());
        assert!(
            !tracker.record_prefill_start(),
            "second prefill start must be ignored"
        );
        assert!(tracker.prefill_wait_time_ms().unwrap() >= 0.0);
        // Prefill time needs both the prefill start and the first token.
        assert!(tracker.prefill_time_ms().is_none());

        tracker.record_first_token();
        assert!(tracker.prefill_time_ms().unwrap() >= 0.0);
    }

    #[test]
    fn test_avg_itl_ms() {
        let tracker = RequestTracker::new();
        tracker.record_first_token();
        thread::sleep(Duration::from_millis(20));
        tracker.record_osl(11); // 11 tokens => 10 inter-token gaps
        tracker.record_finish();

        let itl = tracker.avg_itl_ms().unwrap();
        // At least 20ms elapsed across 10 gaps, so ITL is at least ~2ms.
        assert!(itl >= 1.0, "avg ITL should be at least 1ms, got {itl}");
    }

    #[test]
    fn test_avg_itl_ms_none_with_single_token() {
        let tracker = RequestTracker::new();
        tracker.record_first_token();
        tracker.record_osl(1);
        tracker.record_finish();

        assert!(
            tracker.avg_itl_ms().is_none(),
            "avg ITL should be None with < 2 output tokens"
        );
    }

    #[test]
    fn test_kv_hit_rate() {
        let tracker = RequestTracker::new();
        tracker.record_kv_hit(3, 10);

        let rate = tracker.kv_hit_rate().unwrap();
        assert!(
            (rate - 0.3).abs() < f64::EPSILON,
            "KV hit rate should be 0.3, got {rate}"
        );
    }

    #[test]
    fn test_kv_hit_rate_zero_isl() {
        let tracker = RequestTracker::new();
        tracker.record_kv_hit(0, 0);
        assert!(
            tracker.kv_hit_rate().is_none(),
            "KV hit rate should be None when isl_blocks is 0"
        );
    }

    #[test]
    fn test_record_kv_hit_first_call_wins() {
        let tracker = RequestTracker::new();
        assert!(tracker.record_kv_hit(2, 4));
        assert!(!tracker.record_kv_hit(4, 4), "second KV hit must be ignored");
        assert_eq!(tracker.kv_hit_rate(), Some(0.5));
    }

    #[test]
    fn test_total_time_ms() {
        let tracker = RequestTracker::new();
        thread::sleep(Duration::from_millis(10));
        tracker.record_finish();

        let total = tracker.total_time_ms().unwrap();
        assert!(
            total >= 5.0,
            "total time should be at least 5ms, got {total}"
        );
    }

    #[test]
    fn test_router_queue_depth() {
        let tracker = RequestTracker::new();
        assert!(tracker.router_queue_depth().is_none());

        tracker.record_router_queue_depth(42);
        assert_eq!(tracker.router_queue_depth(), Some(42));

        // OnceLock: second write is ignored
        tracker.record_router_queue_depth(99);
        assert_eq!(tracker.router_queue_depth(), Some(42));

        let timing = tracker.get_timing_info();
        assert_eq!(timing.router_queue_depth, Some(42));
    }

    #[test]
    fn test_timing_info_empty_tracker() {
        let tracker = RequestTracker::new();
        let timing = tracker.get_timing_info();
        assert_eq!(
            timing.request_received_ms,
            tracker.request_received_epoch_ms()
        );
        assert!(timing.prefill_wait_time_ms.is_none());
        assert!(timing.prefill_time_ms.is_none());
        assert!(timing.ttft_ms.is_none());
        assert!(timing.total_time_ms.is_none());
        assert!(timing.kv_hit_rate.is_none());
        assert!(timing.router_queue_depth.is_none());
    }

    #[test]
    fn test_phase_defaults_to_aggregated() {
        let tracker = RequestTracker::new();
        assert_eq!(tracker.phase(), RequestPhase::Aggregated);
        assert_eq!(RequestPhase::default(), RequestPhase::Aggregated);
    }

    #[test]
    fn test_request_phase_display() {
        assert_eq!(RequestPhase::Prefill.to_string(), "prefill");
        assert_eq!(RequestPhase::Decode.to_string(), "decode");
        assert_eq!(RequestPhase::Aggregated.to_string(), "aggregated");
    }

    #[test]
    fn test_get_worker_info_none_without_worker() {
        let tracker = RequestTracker::new();
        assert!(tracker.get_worker_info().is_none());
    }

    #[test]
    fn test_record_worker_aggregated_sets_both_roles() {
        let tracker = RequestTracker::new();
        tracker.record_worker(7, Some(2), WORKER_TYPE_DECODE);

        assert_eq!(tracker.prefill_worker_id(), Some(7));
        assert_eq!(tracker.decode_worker_id(), Some(7));
        assert_eq!(tracker.prefill_dp_rank(), Some(2));
        assert_eq!(tracker.decode_dp_rank(), Some(2));
        assert_eq!(tracker.prefill_worker_type(), Some(WORKER_TYPE_DECODE));
        assert_eq!(tracker.decode_worker_type(), Some(WORKER_TYPE_DECODE));

        let info = tracker
            .get_worker_info()
            .expect("worker info should be present");
        assert_eq!(info.prefill_worker_id, Some(7));
        assert_eq!(info.decode_worker_id, Some(7));
        assert_eq!(info.prefill_dp_rank, Some(2));
        assert_eq!(info.decode_dp_rank, Some(2));
    }

    #[test]
    fn test_record_worker_first_write_wins_but_fills_unset_rank() {
        let tracker = RequestTracker::new();
        tracker.record_worker(1, None, WORKER_TYPE_DECODE);
        assert_eq!(tracker.decode_dp_rank(), None);

        // A conflicting worker ID is ignored, but a previously-unset DP rank is filled in.
        tracker.record_worker(2, Some(3), WORKER_TYPE_DECODE);
        assert_eq!(tracker.decode_worker_id(), Some(1));
        assert_eq!(tracker.prefill_worker_id(), Some(1));
        assert_eq!(tracker.decode_dp_rank(), Some(3));
    }

    #[test]
    fn test_tokenize_latency_first_write_wins() {
        let tracker = RequestTracker::new();
        assert!(tracker.tokenize_latency().is_none());

        tracker.record_tokenize_latency(Duration::from_millis(2));
        tracker.record_tokenize_latency(Duration::from_millis(9));
        assert_eq!(tracker.tokenize_latency(), Some(Duration::from_millis(2)));
    }

    #[test]
    fn test_detokenize_latency_accumulates() {
        let tracker = RequestTracker::new();
        assert!(tracker.detokenize_total_latency().is_none());
        assert_eq!(tracker.detokenize_count(), 0);

        tracker.record_detokenize_latency(Duration::from_micros(3));
        tracker.record_detokenize_latency(Duration::from_micros(4));
        assert_eq!(
            tracker.detokenize_total_latency(),
            Some(Duration::from_micros(7))
        );
        assert_eq!(tracker.detokenize_count(), 2);
    }

    #[test]
    fn test_detokenize_latency_saturates() {
        let tracker = RequestTracker::new();
        tracker.record_detokenize_latency(Duration::MAX);
        tracker.record_detokenize_latency(Duration::from_secs(1));
        assert_eq!(
            tracker.detokenize_total_latency(),
            Some(Duration::from_nanos(u64::MAX))
        );
        assert_eq!(tracker.detokenize_count(), 2);
    }

    #[test]
    fn test_observe_first_token_gauges_no_panic_without_worker() {
        let tracker = RequestTracker::new();
        tracker.record_first_token();
        tracker.record_isl(100, Some(50));
        // No worker recorded — should return early without panic
        tracker.observe_first_token_gauges();
    }

    #[test]
    fn test_observe_finish_gauges_no_panic_without_worker() {
        let tracker = RequestTracker::new();
        tracker.record_first_token();
        tracker.record_osl(10);
        tracker.record_finish();
        // No worker recorded — should return early without panic
        tracker.observe_finish_gauges();
    }

    #[test]
    fn test_observe_first_token_gauges_with_worker() {
        let tracker = RequestTracker::new();
        tracker.record_worker(42, Some(0), WORKER_TYPE_PREFILL);
        thread::sleep(Duration::from_millis(5));
        tracker.record_first_token();
        tracker.record_isl(256, Some(128));

        tracker.observe_first_token_gauges();

        let labels = &["42", "0", WORKER_TYPE_PREFILL];
        let ttft_val = WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE
            .with_label_values(labels)
            .get();
        assert!(
            ttft_val > 0.0,
            "TTFT gauge should be positive after observe, got {ttft_val}"
        );

        let isl_val = WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE
            .with_label_values(labels)
            .get();
        assert_eq!(isl_val, 256, "ISL gauge should be 256, got {isl_val}");
    }

    #[test]
    fn test_observe_finish_gauges_with_worker() {
        let tracker = RequestTracker::new();
        tracker.record_worker(99, Some(1), WORKER_TYPE_DECODE);
        tracker.record_first_token();
        thread::sleep(Duration::from_millis(10));
        tracker.record_osl(5);
        tracker.record_finish();

        tracker.observe_finish_gauges();

        let labels = &["99", "1", WORKER_TYPE_DECODE];
        let itl_val = WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE
            .with_label_values(labels)
            .get();
        assert!(
            itl_val > 0.0,
            "ITL gauge should be positive after observe, got {itl_val}"
        );
    }
}