"examples/multimodal_v1/graphs/agg.py" did not exist on "58df5aca4c3f58feaa903f76b06b09b87a6f6f28"
utils.rs 3.81 KB
Newer Older
1
2
3
4
5
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::time::{Duration, Instant};

6
use crate::common::protocols::{MockEngineArgs, WorkerType};
7

8
/// Compute the modeled handoff delay after a prefill worker emits its terminal token.
9
///
10
11
12
13
14
15
16
/// NOTE: this intentionally does not model the internal prefill TTFT itself accurately, and the
/// exact prefill/decode boundary is backend dependent. For now we only care about decode-visible
/// TTFT, which is what the client observes, so modeling the delay as prefill-to-decode handoff is
/// good enough.
pub fn compute_prefill_handoff_delay_ms(
    worker_type: WorkerType,
    completed: bool,
17
    num_input_tokens: usize,
18
19
20
21
22
23
24
25
    kv_transfer_bandwidth: Option<f64>,
    kv_bytes_per_token: Option<usize>,
) -> Option<f64> {
    if worker_type != WorkerType::Prefill || !completed {
        return None;
    }

    match (kv_transfer_bandwidth, kv_bytes_per_token) {
26
27
        (Some(bw), Some(bpt)) if bw > 0.0 => {
            let kv_bytes = num_input_tokens as f64 * bpt as f64;
28
            let delay_ms = kv_bytes / (bw * 1e9) * 1000.0;
29
30
31
32
            tracing::debug!(
                num_input_tokens,
                kv_bytes,
                bandwidth_gb_s = bw,
33
34
                delay_ms = format!("{delay_ms:.2}"),
                "KV handoff delay for prefill completion"
35
            );
36
            Some(delay_ms)
37
38
39
40
41
        }
        _ => None,
    }
}

42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/// Compute the KV transfer delay duration for a given number of input tokens.
///
/// Returns `None` if KV transfer simulation is disabled (bandwidth is 0 or not configured).
pub fn compute_kv_transfer_delay(
    args: &MockEngineArgs,
    num_input_tokens: usize,
) -> Option<Duration> {
    compute_prefill_handoff_delay_ms(
        args.worker_type,
        true,
        num_input_tokens,
        args.kv_transfer_bandwidth,
        args.kv_bytes_per_token,
    )
    .map(|delay_ms| Duration::from_secs_f64(delay_ms / 1000.0))
}

59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/// Sleep for the specified duration using timerfd on Linux for precision.
///
/// The deadline is measured from [`Instant::now`]. If `now + duration` cannot be represented
/// (e.g. `Duration::MAX`), the deadline is clamped to roughly 30 years from now. That is
/// effectively "forever" and mirrors the far-future fallback used by `tokio::time::sleep`.
/// Without the clamp, `Instant` addition would overflow and panic.
pub async fn sleep_precise(duration: Duration) {
    // Same far-future horizon tokio uses when a sleep deadline overflows.
    const FAR_FUTURE: Duration = Duration::from_secs(86_400 * 365 * 30);

    let now = Instant::now();
    let deadline = now
        .checked_add(duration)
        .unwrap_or_else(|| now + FAR_FUTURE);
    sleep_until_precise(deadline).await;
}

/// Sleep until the specified deadline using timerfd on Linux for precision.
///
/// Unlike `sleep_precise`, this accounts for time already elapsed since the
/// deadline's reference point, making it suitable for simulation loops where
/// computation time should be subtracted from the sleep.
pub async fn sleep_until_precise(deadline: Instant) {
    #[cfg(target_os = "linux")]
    {
        if let Ok(delay) = tokio_timerfd::Delay::new(deadline) {
            let _ = delay.await;
        } else {
            tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
        }
    }
    #[cfg(not(target_os = "linux"))]
    {
        tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
    }
}
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prefill_handoff_delay_only_applies_to_completed_prefill() {
        // 128 tokens * 1 MB/token over 1 GB/s = 0.128 s = 128 ms.
        let delay_ms = compute_prefill_handoff_delay_ms(
            WorkerType::Prefill,
            true,
            128,
            Some(1.0),
            Some(1_000_000),
        )
        .expect("prefill completion should produce a handoff delay");
        assert!((delay_ms - 128.0).abs() < 1e-9);

        assert!(
            compute_prefill_handoff_delay_ms(
                WorkerType::Prefill,
                false,
                128,
                Some(1.0),
                Some(1_000_000),
            )
            .is_none()
        );
        assert!(
            compute_prefill_handoff_delay_ms(
                WorkerType::Decode,
                true,
                128,
                Some(1.0),
                Some(1_000_000),
            )
            .is_none()
        );
    }

    #[test]
    fn test_prefill_handoff_delay_disabled_without_positive_bandwidth() {
        // Unset, zero, negative and NaN bandwidths must all disable the delay.
        for bandwidth in [None, Some(0.0), Some(-1.0), Some(f64::NAN)] {
            assert!(
                compute_prefill_handoff_delay_ms(
                    WorkerType::Prefill,
                    true,
                    128,
                    bandwidth,
                    Some(1_000_000),
                )
                .is_none(),
                "bandwidth {bandwidth:?} should disable the handoff delay"
            );
        }

        assert!(
            compute_prefill_handoff_delay_ms(WorkerType::Prefill, true, 128, Some(1.0), None)
                .is_none(),
            "missing kv_bytes_per_token should disable the handoff delay"
        );
    }

    #[test]
    fn test_prefill_handoff_delay_scales_with_input_tokens() {
        // 2 GB/s at 1 MB per token moves 2 tokens per millisecond.
        let delay_ms = |tokens: usize| {
            compute_prefill_handoff_delay_ms(
                WorkerType::Prefill,
                true,
                tokens,
                Some(2.0),
                Some(1_000_000),
            )
            .expect("configured prefill completion should produce a handoff delay")
        };

        assert_eq!(delay_ms(0), 0.0);
        assert!((delay_ms(256) - 128.0).abs() < 1e-9);
    }
}