// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Per-request tracker for capturing request lifecycle metrics.
//!
//! This module provides [`RequestTracker`] for tracking timing and routing information
//! that can be returned to clients via the `nvext` response field.

use std::sync::{Arc, OnceLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use utoipa::ToSchema;
use crate::protocols::openai::nvext::WorkerIdInfo;

/// Phase of the request in disaggregated serving.
///
/// Used to determine which worker ID field to record when routing.
/// The default phase is [`RequestPhase::Aggregated`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RequestPhase {
    /// Prefill-only phase (disaggregated serving)
    Prefill,
    /// Decode phase (disaggregated serving)
    Decode,
    /// Aggregated mode - same worker handles both prefill and decode
    #[default]
    Aggregated,
}

impl std::fmt::Display for RequestPhase {
    /// Writes the lowercase name of the phase (`prefill`, `decode` or `aggregated`).
    ///
    /// The label is written verbatim; formatter width/fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            RequestPhase::Prefill => "prefill",
            RequestPhase::Decode => "decode",
            RequestPhase::Aggregated => "aggregated",
        };
        f.write_str(label)
    }
}

/// Per-request tracker for timing and routing metrics.
///
/// Captures information throughout the request lifecycle:
46
/// - `request_received`: When the request was received
47
/// - `prefill_start_time`: When prefill started (for disaggregated serving)
48
49
/// - `first_token_time`: When the first token was generated (set once via OnceLock)
/// - `request_finish_time`: When the request finished (set once via OnceLock)
50
/// - KV cache hit rate information
51
///
52
/// The `OnceLock` fields ensure that values are set exactly once,
53
54
/// which is important for disaggregated serving where the "first token"
/// might appear multiple times.
55
56
#[derive(Debug)]
pub struct RequestTracker {
57
58
59
60
61
62
    /// When the request was received (monotonic clock for duration calculations)
    request_received: Instant,

    /// When the request was received (wall clock time as epoch milliseconds)
    request_received_epoch_ms: u64,

63
64
65
    /// When prefill started (for disaggregated serving) - set once via OnceLock
    prefill_start_time: OnceLock<Instant>,

66
67
68
69
70
    /// When the first token was generated - set once via OnceLock
    first_token_time: OnceLock<Instant>,

    /// When the request finished - set once via OnceLock
    request_finish_time: OnceLock<Instant>,
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

    /// KV cache overlap blocks (prefix cache hits) - set once via OnceLock
    kv_overlap_blocks: OnceLock<u32>,

    /// Input sequence length in blocks (for hit rate calculation) - set once via OnceLock
    isl_blocks: OnceLock<usize>,

    /// Prefill worker ID (for disaggregated serving) - set once via OnceLock
    prefill_worker_id: OnceLock<u64>,

    /// Decode worker ID - set once via OnceLock
    decode_worker_id: OnceLock<u64>,

    /// Request phase (Prefill/Decode/Aggregated)
    phase: Mutex<RequestPhase>,
86
87
88
89
90
91

    /// Semaphore for coordinating phase transitions.
    /// Acquiring a permit blocks subsequent set_phase calls until the permit is dropped.
    /// This prevents race conditions in the bootstrap optimization path where prefill
    /// runs in background and needs to complete record_worker before phase changes.
    phase_semaphore: Arc<Semaphore>,
92
93
}

94
95
impl RequestTracker {
    /// Create a new request tracker, capturing the current time as request received.
96
97
98
99
100
101
102
    pub fn new() -> Self {
        let now = Instant::now();
        let epoch_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

103
        RequestTracker {
104
105
            request_received: now,
            request_received_epoch_ms: epoch_ms,
106
            prefill_start_time: OnceLock::new(),
107
108
            first_token_time: OnceLock::new(),
            request_finish_time: OnceLock::new(),
109
110
111
112
113
            kv_overlap_blocks: OnceLock::new(),
            isl_blocks: OnceLock::new(),
            prefill_worker_id: OnceLock::new(),
            decode_worker_id: OnceLock::new(),
            phase: Mutex::new(RequestPhase::Aggregated),
114
            phase_semaphore: Arc::new(Semaphore::new(1)),
115
116
117
        }
    }

118
119
120
121
122
    /// Record when prefill started. Returns true if this was the first call.
    pub fn record_prefill_start(&self) -> bool {
        self.prefill_start_time.set(Instant::now()).is_ok()
    }

123
124
125
126
127
128
129
130
    pub fn record_first_token(&self) -> bool {
        self.first_token_time.set(Instant::now()).is_ok()
    }

    pub fn record_finish(&self) -> bool {
        self.request_finish_time.set(Instant::now()).is_ok()
    }

131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
    /// Record KV cache hit information. Returns true if this was the first call.
    pub fn record_kv_hit(&self, overlap_blocks: u32, isl_blocks: usize) -> bool {
        let overlap_set = self.kv_overlap_blocks.set(overlap_blocks).is_ok();
        let isl_set = self.isl_blocks.set(isl_blocks).is_ok();
        overlap_set && isl_set
    }

    /// Time from request received to prefill start (queue/wait time) in milliseconds.
    pub fn prefill_wait_time_ms(&self) -> Option<f64> {
        self.prefill_start_time
            .get()
            .map(|t| t.duration_since(self.request_received).as_secs_f64() * 1000.0)
    }

    /// Time from prefill start to first token (prefill execution time) in milliseconds.
    pub fn prefill_time_ms(&self) -> Option<f64> {
        let prefill_start = self.prefill_start_time.get()?;
        let first_token = self.first_token_time.get()?;
        Some(first_token.duration_since(*prefill_start).as_secs_f64() * 1000.0)
    }

152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
    pub fn ttft_ms(&self) -> Option<f64> {
        self.first_token_time
            .get()
            .map(|t| t.duration_since(self.request_received).as_secs_f64() * 1000.0)
    }

    pub fn total_time_ms(&self) -> Option<f64> {
        self.request_finish_time
            .get()
            .map(|t| t.duration_since(self.request_received).as_secs_f64() * 1000.0)
    }

    pub fn request_received_epoch_ms(&self) -> u64 {
        self.request_received_epoch_ms
    }

168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
    /// KV cache hit rate as a ratio (0.0 to 1.0).
    pub fn kv_hit_rate(&self) -> Option<f64> {
        let overlap = *self.kv_overlap_blocks.get()?;
        let isl = *self.isl_blocks.get()?;
        if isl == 0 {
            return None;
        }
        Some(overlap as f64 / isl as f64)
    }

    /// Record the prefill worker ID. Returns true if this was the first call.
    pub fn record_prefill_worker(&self, id: u64) -> bool {
        self.prefill_worker_id.set(id).is_ok()
    }

    /// Record the decode worker ID. Returns true if this was the first call.
    pub fn record_decode_worker(&self, id: u64) -> bool {
        self.decode_worker_id.set(id).is_ok()
    }

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
    /// Set the request phase and return a permit that blocks subsequent phase changes.
    ///
    /// The returned permit must be dropped to allow the next `set_phase` call to proceed.
    /// Under normal operation, callers can simply ignore the returned permit (letting it
    /// drop immediately). In the bootstrap optimization path, the permit is held and
    /// passed to the spawned prefill task, which drops it after `record_worker` completes.
    ///
    /// This prevents the race condition where the phase changes to Decode before the
    /// background prefill task has recorded its worker ID.
    pub async fn set_phase(&self, phase: RequestPhase) -> OwnedSemaphorePermit {
        let permit = self
            .phase_semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("phase semaphore should never be closed");
        *self.phase.lock() = phase;
        permit
206
207
208
209
    }

    /// Get the current request phase.
    pub fn phase(&self) -> RequestPhase {
210
        *self.phase.lock()
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
    }

    /// Record worker ID based on the current phase.
    ///
    /// - Prefill phase: records as prefill_worker_id
    /// - Decode phase: records as decode_worker_id
    /// - Aggregated phase: records as both prefill and decode worker
    pub fn record_worker(&self, instance_id: u64) {
        match self.phase() {
            RequestPhase::Prefill => {
                self.record_prefill_worker(instance_id);
            }
            RequestPhase::Decode => {
                self.record_decode_worker(instance_id);
            }
            RequestPhase::Aggregated => {
                self.record_prefill_worker(instance_id);
                self.record_decode_worker(instance_id);
            }
        }
    }

    /// Get worker ID information if any worker IDs have been recorded.
    pub fn get_worker_info(&self) -> Option<WorkerIdInfo> {
        let prefill = self.prefill_worker_id.get().copied();
        let decode = self.decode_worker_id.get().copied();

        if prefill.is_none() && decode.is_none() {
            return None;
        }

        Some(WorkerIdInfo {
            prefill_worker_id: prefill,
            decode_worker_id: decode,
        })
    }

248
249
250
    pub fn get_timing_info(&self) -> TimingInfo {
        TimingInfo {
            request_received_ms: self.request_received_epoch_ms,
251
252
            prefill_wait_time_ms: self.prefill_wait_time_ms(),
            prefill_time_ms: self.prefill_time_ms(),
253
254
            ttft_ms: self.ttft_ms(),
            total_time_ms: self.total_time_ms(),
255
            kv_hit_rate: self.kv_hit_rate(),
256
257
258
259
        }
    }
}

260
impl Default for RequestTracker {
261
262
263
264
265
266
267
268
269
    fn default() -> Self {
        Self::new()
    }
}

/// Timing information for response injection.
///
/// This struct is serialized and included in the response's `nvext` field
/// when the client requests timing information via `extra_fields: ["timing"]`.
270
#[derive(ToSchema, Serialize, Deserialize, Debug, Clone, PartialEq)]
271
272
273
274
pub struct TimingInfo {
    /// When the request was received (epoch milliseconds)
    pub request_received_ms: u64,

275
276
277
278
279
280
281
282
    /// Time from request received to prefill start (queue/wait time) in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prefill_wait_time_ms: Option<f64>,

    /// Time from prefill start to first token (prefill execution time) in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prefill_time_ms: Option<f64>,

283
284
285
286
287
288
289
    /// Time to first token in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<f64>,

    /// Total request time in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_time_ms: Option<f64>,
290
291
292
293

    /// KV cache hit rate (0.0 to 1.0) - ratio of cached blocks to total input blocks
    #[serde(skip_serializing_if = "Option::is_none")]
    pub kv_hit_rate: Option<f64>,
294
}