Unverified Commit 2d517e77 authored by Yongming Ding's avatar Yongming Ding Committed by GitHub
Browse files

feat(mocker): improve mocker's perf timing accuracy (#6100)


Signed-off-by: default avatarYongming Ding <yongmingd@nvidia.com>
parent a82acfa0
......@@ -2128,6 +2128,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-timerfd",
"tokio-util",
"tracing",
"uuid",
......@@ -4314,6 +4315,12 @@ dependencies = [
"zlib-rs",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
......@@ -6867,6 +6874,19 @@ dependencies = [
"semver",
]
[[package]]
name = "rustix"
version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
dependencies = [
"bitflags 2.10.0",
"errno",
"libc",
"linux-raw-sys 0.4.15",
"windows-sys 0.59.0",
]
[[package]]
name = "rustix"
version = "1.1.2"
......@@ -6876,7 +6896,7 @@ dependencies = [
"bitflags 2.10.0",
"errno",
"libc",
"linux-raw-sys",
"linux-raw-sys 0.11.0",
"windows-sys 0.61.2",
]
......@@ -7796,7 +7816,7 @@ dependencies = [
"fastrand",
"getrandom 0.3.4",
"once_cell",
"rustix",
"rustix 1.1.2",
"windows-sys 0.61.2",
]
......@@ -7905,6 +7925,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "timerfd"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091"
dependencies = [
"rustix 0.38.44",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
......@@ -8070,6 +8099,19 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "tokio-timerfd"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094"
dependencies = [
"futures-core",
"libc",
"slab",
"timerfd",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.26.2"
......
......@@ -1760,6 +1760,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-timerfd",
"tokio-util",
"tracing",
"uuid",
......@@ -3742,6 +3743,12 @@ dependencies = [
"zlib-rs",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
......@@ -6156,6 +6163,19 @@ dependencies = [
"semver",
]
[[package]]
name = "rustix"
version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
dependencies = [
"bitflags 2.10.0",
"errno",
"libc",
"linux-raw-sys 0.4.15",
"windows-sys 0.59.0",
]
[[package]]
name = "rustix"
version = "1.1.2"
......@@ -6165,7 +6185,7 @@ dependencies = [
"bitflags 2.10.0",
"errno",
"libc",
"linux-raw-sys",
"linux-raw-sys 0.11.0",
"windows-sys 0.61.2",
]
......@@ -6970,7 +6990,7 @@ dependencies = [
"fastrand",
"getrandom 0.3.4",
"once_cell",
"rustix",
"rustix 1.1.2",
"windows-sys 0.61.2",
]
......@@ -7070,6 +7090,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "timerfd"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091"
dependencies = [
"rustix 0.38.44",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
......@@ -7211,6 +7240,19 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-timerfd"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094"
dependencies = [
"futures-core",
"libc",
"slab",
"timerfd",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.17"
......
......@@ -35,5 +35,8 @@ ndarray = "0.16"
ndarray-npy = "0.9"
ndarray-interp = "0.5"
[target.'cfg(target_os = "linux")'.dependencies]
tokio-timerfd = "0.2"
[dev-dependencies]
rstest = "0.18.2"
......@@ -41,8 +41,11 @@ use dynamo_kv_router::protocols::DpRank;
use dynamo_tokens::blocks::UniqueBlock;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::time::Duration;
#[cfg(target_os = "linux")]
use tokio_timerfd::Delay;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use validator::Validate;
......@@ -306,6 +309,7 @@ impl Scheduler {
args.speedup_ratio,
)
.await;
simulate_decode(
&mut state,
&mut kv_manager,
......@@ -387,7 +391,7 @@ async fn simulate_prefill(
worker_type: WorkerType,
speedup_ratio: f64,
) -> Duration {
let start_time = tokio::time::Instant::now();
let start_time = Instant::now();
let mut total_time = Duration::ZERO;
while let Some((prefill_compute, maybe_creation_signal, is_full_prefill)) =
......@@ -411,10 +415,21 @@ async fn simulate_prefill(
break;
}
}
if speedup_ratio > 0.0 {
let deadline =
start_time + Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio);
tokio::time::sleep_until(deadline).await;
if speedup_ratio > 0.0 && total_time > Duration::ZERO {
let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio);
let deadline = start_time + sleep_duration;
#[cfg(target_os = "linux")]
{
if let Ok(delay) = Delay::new(deadline) {
let _ = delay.await;
}
}
#[cfg(not(target_os = "linux"))]
{
tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
}
}
total_time
......@@ -430,7 +445,8 @@ async fn simulate_decode(
block_size: usize,
speedup_ratio: f64,
) -> Duration {
let start_time = tokio::time::Instant::now();
let start_time = Instant::now();
// Compute decode timing
let active_kv_tokens = kv_manager.num_active_blocks() * block_size;
......@@ -493,10 +509,21 @@ async fn simulate_decode(
state.complete(&uuid);
}
}
if speedup_ratio > 0.0 {
let deadline =
start_time + Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio);
tokio::time::sleep_until(deadline).await;
if speedup_ratio > 0.0 && total_time > Duration::ZERO {
let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio);
let deadline = start_time + sleep_duration;
#[cfg(target_os = "linux")]
{
if let Ok(delay) = Delay::new(deadline) {
let _ = delay.await;
}
}
#[cfg(not(target_os = "linux"))]
{
tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
}
}
total_time
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment