Unverified Commit 5d958623 authored by Yongming Ding's avatar Yongming Ding Committed by GitHub
Browse files

feat(mocker): add KV cache transfer latency for disagg serving (#6504)


Signed-off-by: default avatarYongming Ding <yongmingd@nvidia.com>
parent 8fbe7228
...@@ -27,6 +27,9 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume ...@@ -27,6 +27,9 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume
- `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool - `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool
- `--stagger-delay`: Delay in seconds between launching each worker to avoid overwhelming etcd/NATS/frontend. Set to 0 to disable staggering. Use -1 for auto mode (stagger dependent on number of workers). Default: -1 (auto) - `--stagger-delay`: Delay in seconds between launching each worker to avoid overwhelming etcd/NATS/frontend. Set to 0 to disable staggering. Use -1 for auto mode (stagger dependent on number of workers). Default: -1 (auto)
- `--disaggregation-mode prefill` / `--disaggregation-mode decode`: Whether the worker is a prefill or decode worker for disaggregated deployment. If not specified, mocker will be in aggregated mode. - `--disaggregation-mode prefill` / `--disaggregation-mode decode`: Whether the worker is a prefill or decode worker for disaggregated deployment. If not specified, mocker will be in aggregated mode.
- `--kv-transfer-bandwidth`: KV cache transfer bandwidth in GB/s for disaggregated serving latency simulation (default: 64.0, inter-node InfiniBand). Set to 0 to disable. For intra-node NVLink, typical value is ~450.
- `--kv-cache-dtype`: Data type for KV cache, used to compute kv_bytes_per_token. "auto" uses the model's torch dtype (default).
- `--kv-bytes-per-token`: KV cache bytes per token. If not specified, auto-computed from model config.
**Environment variables:** **Environment variables:**
......
...@@ -11,6 +11,7 @@ from pathlib import Path ...@@ -11,6 +11,7 @@ from pathlib import Path
from dynamo.common.utils.namespace import get_worker_namespace from dynamo.common.utils.namespace import get_worker_namespace
from . import __version__ from . import __version__
from .utils.kv_cache import DEFAULT_KV_TRANSFER_BANDWIDTH_GBPS
from .utils.planner_profiler_perf_data_converter import ( from .utils.planner_profiler_perf_data_converter import (
convert_profile_results_to_npz, convert_profile_results_to_npz,
is_mocker_format_npz, is_mocker_format_npz,
...@@ -119,7 +120,11 @@ def create_temp_engine_args_file(args) -> Path: ...@@ -119,7 +120,11 @@ def create_temp_engine_args_file(args) -> Path:
"is_decode": getattr(args, "is_decode_worker", None), "is_decode": getattr(args, "is_decode_worker", None),
"enable_local_indexer": not getattr(args, "durable_kv_events", False), "enable_local_indexer": not getattr(args, "durable_kv_events", False),
# Note: bootstrap_port and zmq_kv_events_port are NOT included here # Note: bootstrap_port and zmq_kv_events_port are NOT included here
# — they are per-worker and set in launch_workers() # - they are per-worker and set in launch_workers()
# Note: kv_bytes_per_token and kv_cache_dtype are NOT included here
# - kv_bytes_per_token is auto-computed in main.py after model prefetch,
# - kv_cache_dtype is only used Python-side for the auto-computation.
"kv_transfer_bandwidth": getattr(args, "kv_transfer_bandwidth", None),
} }
# Parse --reasoning JSON string into a nested object # Parse --reasoning JSON string into a nested object
...@@ -386,6 +391,40 @@ def parse_args(): ...@@ -386,6 +391,40 @@ def parse_args():
"Prefill workers listen on these ports; decode workers connect to them. " "Prefill workers listen on these ports; decode workers connect to them. "
"If not specified, bootstrap rendezvous is disabled.", "If not specified, bootstrap rendezvous is disabled.",
) )
# KV cache transfer latency simulation
parser.add_argument(
"--kv-transfer-bandwidth",
type=float,
default=DEFAULT_KV_TRANSFER_BANDWIDTH_GBPS,
help="KV cache transfer bandwidth in GB/s for disaggregated serving latency simulation. "
"Default: 64.0 (inter-node InfiniBand). Set to 0 to disable KV transfer delay. "
"For intra-node NVLink, typical value is ~450.",
)
parser.add_argument(
"--kv-cache-dtype",
type=str,
default="auto",
choices=[
"auto",
"bfloat16",
"fp8",
"fp8_ds_mla",
"fp8_e4m3",
"fp8_e5m2",
"fp8_inc",
],
help="Data type for KV cache, used to compute kv_bytes_per_token. "
"'auto' uses the model's dtype (default).",
)
parser.add_argument(
"--kv-bytes-per-token",
type=int,
default=None,
help="KV cache bytes per token. If not specified, auto-computed from model config "
"using: num_layers * 2 * num_kv_heads * head_dim * dtype_bytes.",
)
parser.add_argument( parser.add_argument(
"--stagger-delay", "--stagger-delay",
type=float, type=float,
......
...@@ -21,6 +21,7 @@ from dynamo.runtime import DistributedRuntime ...@@ -21,6 +21,7 @@ from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from .args import create_temp_engine_args_file, parse_args, resolve_planner_profile_data from .args import create_temp_engine_args_file, parse_args, resolve_planner_profile_data
from .utils.kv_cache import compute_kv_bytes_per_token
configure_dynamo_logging() configure_dynamo_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -81,6 +82,20 @@ async def worker(): ...@@ -81,6 +82,20 @@ async def worker():
if args.num_workers > 1 and args.model_path: if args.num_workers > 1 and args.model_path:
await prefetch_model(args.model_path) await prefetch_model(args.model_path)
# Auto-compute kv_bytes_per_token from model config if not explicitly set
if args.kv_bytes_per_token is None and args.model_path:
args.kv_bytes_per_token = compute_kv_bytes_per_token(
args.model_path, args.kv_cache_dtype
)
# Inject kv_bytes_per_token into engine args JSON (computed after model prefetch)
if args.kv_bytes_per_token is not None and not args.extra_engine_args:
with open(extra_engine_args_path) as f:
engine_args = json.load(f)
engine_args["kv_bytes_per_token"] = args.kv_bytes_per_token
with open(extra_engine_args_path, "w") as f:
json.dump(engine_args, f, indent=2)
try: try:
logger.info( logger.info(
f"Launching {args.num_workers} mocker worker(s) with isolated DistributedRuntime instances" f"Launching {args.num_workers} mocker worker(s) with isolated DistributedRuntime instances"
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from transformers import AutoConfig
from dynamo.profiler.utils.model_info import get_model_info
logger = logging.getLogger(__name__)
# Mapping from dtype names to per-element byte sizes for the KV cache.
# Keys come from two sources:
#   * dtype names as found in a model config (with any "torch." prefix
#     stripped), used when --kv-cache-dtype is "auto";
#   * the explicit --kv-cache-dtype CLI choices (matching vLLM's option names).
# Every non-"auto" CLI choice needs an entry here: lookups for unknown names
# fall back to 2 bytes, which would silently double a missing 1-byte FP8 type.
TORCH_DTYPE_BYTES = {
    # auto-detected from model config (torch.dtype str representations)
    "float16": 2,
    "bfloat16": 2,
    "float32": 4,
    "float8_e4m3fn": 1,
    "float8_e5m2": 1,
    # vLLM CLI choices
    "fp8": 1,
    "fp8_ds_mla": 1,
    "fp8_e4m3": 1,
    "fp8_e5m2": 1,
    "fp8_inc": 1,
}
# Default KV transfer bandwidth, in GB/s, used as the default value of the
# --kv-transfer-bandwidth CLI option.
# 64 GB/s corresponds to inter-node InfiniBand.
# For intra-node NVLink, typical value is ~450 GB/s.
DEFAULT_KV_TRANSFER_BANDWIDTH_GBPS = 64.0
def _normalize_dtype_str(dtype) -> str:
"""Normalize a dtype to a plain string like 'float16'.
Handles torch.dtype objects (str() gives 'torch.float16') and plain strings.
"""
s = str(dtype)
if s.startswith("torch."):
s = s[len("torch.") :]
return s
def get_kv_cache_dtype_bytes(config, kv_cache_dtype: str = "auto") -> int:
    """Get the byte size per element for KV cache based on dtype.

    Follows vLLM's --kv-cache-dtype convention.

    Args:
        config: Model config object. Only consulted when ``kv_cache_dtype`` is
            "auto"; its ``dtype`` attribute is read first, then ``torch_dtype``
            as a fallback for configs that only expose the older name.
        kv_cache_dtype: "auto" to use the model's dtype from ``config``, or an
            explicit dtype name that is looked up in ``TORCH_DTYPE_BYTES``.

    Returns:
        Bytes per KV cache element. Falls back to 2 when the config carries no
        dtype or when the dtype name is not in ``TORCH_DTYPE_BYTES``.
    """
    default_bytes = 2

    if kv_cache_dtype == "auto":
        model_dtype = getattr(config, "dtype", None)
        if model_dtype is None:
            # The attribute may be absent or set to None; try the other name
            # before giving up.
            model_dtype = getattr(config, "torch_dtype", None)
        if model_dtype is None:
            return default_bytes
        dtype_name = _normalize_dtype_str(model_dtype)
    else:
        dtype_name = kv_cache_dtype

    dtype_bytes = TORCH_DTYPE_BYTES.get(dtype_name)
    if dtype_bytes is None:
        # Keep the best-effort fallback, but make it visible: a wrong element
        # size skews every simulated transfer time.
        logger.warning(
            f"Unknown KV cache dtype '{dtype_name}'; assuming {default_bytes} bytes per element"
        )
        return default_bytes
    return dtype_bytes
def compute_kv_bytes_per_token(
    model_path: str, kv_cache_dtype: str = "auto"
) -> int | None:
    """Compute KV cache bytes per token from model config.

    Formula: num_layers * 2 (K+V) * num_kv_heads * head_dim * dtype_bytes

    num_kv_heads comes from dynamo.profiler's get_model_info; the remaining
    values are read from the Hugging Face config loaded with AutoConfig.

    Args:
        model_path: Path to model directory or HuggingFace model ID.
        kv_cache_dtype: KV cache dtype. "auto" uses the dtype recorded in the
            model config.

    Returns:
        KV bytes per token, or None if model config cannot be parsed.
    """
    try:
        info = get_model_info(model_path)
        config = AutoConfig.from_pretrained(model_path, trust_remote_code=False)

        num_layers = config.num_hidden_layers
        num_kv_heads = info.num_kv_heads

        # Prefer an explicit head_dim when the config declares one: it is not
        # always equal to hidden_size // num_attention_heads. Fall back to the
        # derived value when the attribute is missing, None, or 0.
        head_dim = getattr(config, "head_dim", None)
        if not head_dim:
            head_dim = config.hidden_size // config.num_attention_heads

        dtype_bytes = get_kv_cache_dtype_bytes(config, kv_cache_dtype)
        kv_bytes = num_layers * 2 * num_kv_heads * head_dim * dtype_bytes
        logger.debug(
            f"Auto-computed kv_bytes_per_token={kv_bytes} "
            f"({num_layers} layers, {num_kv_heads} kv_heads, {head_dim} head_dim, "
            f"{dtype_bytes} dtype_bytes)"
        )
        return kv_bytes
    except Exception as e:
        # Deliberately best-effort: the caller treats None as "not available"
        # instead of failing startup.
        logger.warning(f"Could not compute kv_bytes_per_token from model config: {e}")
        return None
...@@ -91,6 +91,9 @@ python -m dynamo.mocker \ ...@@ -91,6 +91,9 @@ python -m dynamo.mocker \
| `--disaggregation-mode` | `agg` | Worker mode: `agg` (aggregated), `prefill`, or `decode` | | `--disaggregation-mode` | `agg` | Worker mode: `agg` (aggregated), `prefill`, or `decode` |
| `--durable-kv-events` | False | Enable durable KV events via JetStream (disables local indexer) | | `--durable-kv-events` | False | Enable durable KV events via JetStream (disables local indexer) |
| `--bootstrap-ports` | None | Ports for P/D rendezvous | | `--bootstrap-ports` | None | Ports for P/D rendezvous |
| `--kv-transfer-bandwidth` | 64.0 | KV cache transfer bandwidth in GB/s. Set to 0 to disable |
| `--kv-cache-dtype` | auto | KV cache dtype for bytes-per-token computation |
| `--kv-bytes-per-token` | Auto-computed | KV cache bytes per token (override auto-computation) |
## Architecture ## Architecture
...@@ -160,6 +163,16 @@ The mocker supports two timing prediction modes: ...@@ -160,6 +163,16 @@ The mocker supports two timing prediction modes:
For disaggregated prefill/decode deployments, prefill and decode workers coordinate via a simple TCP-based rendezvous protocol. The decode worker connects to the prefill worker's bootstrap port and waits until the prefill phase completes and KV cache is ready. Either side can arrive first—the rendezvous completes when both are ready. For disaggregated prefill/decode deployments, prefill and decode workers coordinate via a simple TCP-based rendezvous protocol. The decode worker connects to the prefill worker's bootstrap port and waits until the prefill phase completes and KV cache is ready. Either side can arrive first—the rendezvous completes when both are ready.
### KV Transfer Latency Simulation
The mocker simulates KV cache transfer time between prefill and decode workers. Once the prefill worker has produced its first (and only) token, and before it marks the bootstrap room complete (which unblocks the decode worker), it sleeps for a duration based on:
- **kv_bytes_per_token** (auto-computed from model config): `num_layers * 2 * num_kv_heads * head_dim * dtype_bytes`. The `dtype_bytes` is determined by `--kv-cache-dtype`: when set to `auto` (default), it uses the model's `dtype` from config; when explicitly set (e.g., `fp8`), it uses the specified dtype instead. It can also be overridden directly with `--kv-bytes-per-token`.
- **kv_transfer_bandwidth** (default: 64.0 GB/s, inter-node InfiniBand)
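- **Transfer time** (in seconds): `num_input_tokens * kv_bytes_per_token / (kv_transfer_bandwidth * 1e9)` — the bandwidth is given in GB/s, so it is converted to bytes per second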
This delay is injected after the scheduler's prefill compute simulation completes, modeling the sequential flow: prefill computation → KV transfer → decode begins. Set `--kv-transfer-bandwidth 0` to disable.
## Integration with Dynamo ## Integration with Dynamo
### KV Event Publishing ### KV Event Publishing
...@@ -199,7 +212,6 @@ The mocker is particularly useful for: ...@@ -199,7 +212,6 @@ The mocker is particularly useful for:
The following features are not yet supported by the mocker: The following features are not yet supported by the mocker:
- **KV transfer latency simulation** - Disaggregated serving simulates the rendezvous handshake but does not model the actual KV cache transfer time between prefill and decode workers
- **Multi-tier memory** - No support for offloading KV cache to CPU/disk or onboarding back to GPU; potential future integration with KVBM - **Multi-tier memory** - No support for offloading KV cache to CPU/disk or onboarding back to GPU; potential future integration with KVBM
- **Multimodal support** - Currently only simulates text token processing; no vision encoder or cross-attention simulation - **Multimodal support** - Currently only simulates text token processing; no vision encoder or cross-attention simulation
- **Native Rust reference counting** - Work in progress to use native Rc/Arc for block reference counting, enabling natural RAII patterns for simpler tracking - **Native Rust reference counting** - Work in progress to use native Rc/Arc for block reference counting, enabling natural RAII patterns for simpler tracking
...@@ -41,6 +41,7 @@ use dynamo_mocker::common::protocols::OutputSignal; ...@@ -41,6 +41,7 @@ use dynamo_mocker::common::protocols::OutputSignal;
pub use dynamo_mocker::common::protocols::{ pub use dynamo_mocker::common::protocols::{
DirectRequest, KvCacheEventSink, MockEngineArgs, MockEngineArgsBuilder, DirectRequest, KvCacheEventSink, MockEngineArgs, MockEngineArgsBuilder,
}; };
use dynamo_mocker::common::utils::{compute_kv_transfer_delay, sleep_precise};
pub use dynamo_mocker::common::{bootstrap, perf_model, protocols, running_mean, sequence}; pub use dynamo_mocker::common::{bootstrap, perf_model, protocols, running_mean, sequence};
pub use dynamo_mocker::scheduler::Scheduler; pub use dynamo_mocker::scheduler::Scheduler;
pub use dynamo_mocker::{kv_manager, scheduler}; pub use dynamo_mocker::{kv_manager, scheduler};
...@@ -538,6 +539,14 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error> ...@@ -538,6 +539,14 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error>
let bootstrap_server = self.bootstrap_server.clone(); let bootstrap_server = self.bootstrap_server.clone();
let reasoning = self.engine_args.reasoning.clone(); let reasoning = self.engine_args.reasoning.clone();
// Compute KV transfer delay for prefill workers.
// Simulates the time to transfer KV cache from prefill to decode worker.
let kv_transfer_delay = if is_prefill {
compute_kv_transfer_delay(&self.engine_args, request.token_ids.len())
} else {
None
};
// Spawn a task to handle the complex async logic // Spawn a task to handle the complex async logic
tokio::spawn(async move { tokio::spawn(async move {
let mut token_count = 0; let mut token_count = 0;
...@@ -570,14 +579,6 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error> ...@@ -570,14 +579,6 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error>
..Default::default() ..Default::default()
}; };
// Prefill: after first token, mark room complete (unblocks decode)
if is_prefill
&& token_count == 1
&& let (Some(server), Some(room_id)) = (bootstrap_server.get(), bootstrap_room)
{
server.complete_room(room_id);
}
if signal.completed && token_count < max_output_tokens { if signal.completed && token_count < max_output_tokens {
let _ = stream_tx.send(LLMEngineOutput::error("Completion signal received before max tokens reached".to_string())); let _ = stream_tx.send(LLMEngineOutput::error("Completion signal received before max tokens reached".to_string()));
break; break;
...@@ -585,6 +586,23 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error> ...@@ -585,6 +586,23 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error>
if signal.completed { if signal.completed {
let _ = stream_tx.send(output); let _ = stream_tx.send(output);
// Simulate KV transfer delay before prefill's first (and only) token.
// This models the time to transfer KV cache to the decode worker.
if token_count == 1
&& let Some(delay) = kv_transfer_delay
{
sleep_precise(delay).await;
}
// Prefill: after first token, mark room complete (unblocks decode)
if is_prefill
&& token_count == 1
&& let (Some(server), Some(room_id)) = (bootstrap_server.get(), bootstrap_room)
{
server.complete_room(room_id);
}
let _ = stream_tx.send(LLMEngineOutput::length()); let _ = stream_tx.send(LLMEngineOutput::length());
break; break;
} }
......
...@@ -9,3 +9,4 @@ pub mod perf_model; ...@@ -9,3 +9,4 @@ pub mod perf_model;
pub mod protocols; pub mod protocols;
pub mod running_mean; pub mod running_mean;
pub mod sequence; pub mod sequence;
pub mod utils;
...@@ -183,6 +183,18 @@ pub struct MockEngineArgs { ...@@ -183,6 +183,18 @@ pub struct MockEngineArgs {
#[builder(default = "None")] #[builder(default = "None")]
pub bootstrap_port: Option<u16>, pub bootstrap_port: Option<u16>,
/// KV cache bytes per token, auto-computed from model config by Python CLI.
/// Formula: num_layers * 2 * num_kv_heads * head_dim * dtype_bytes
#[builder(default = "None")]
pub kv_bytes_per_token: Option<usize>,
/// KV cache transfer bandwidth in GB/s for disaggregated serving latency simulation.
/// Default: 64.0 (inter-node InfiniBand). Set to 0 to disable KV transfer delay.
/// For intra-node NVLink, typical value is ~450.
#[builder(default = "None")]
#[validate(range(min = 0.0))]
pub kv_transfer_bandwidth: Option<f64>,
/// Reasoning/thinking token configuration. /// Reasoning/thinking token configuration.
/// When set, the mocker wraps output in thinking boundary tokens. /// When set, the mocker wraps output in thinking boundary tokens.
#[builder(default = "None")] #[builder(default = "None")]
...@@ -245,6 +257,8 @@ impl MockEngineArgs { ...@@ -245,6 +257,8 @@ impl MockEngineArgs {
"planner_profile_data", "planner_profile_data",
"enable_local_indexer", "enable_local_indexer",
"bootstrap_port", "bootstrap_port",
"kv_bytes_per_token",
"kv_transfer_bandwidth",
"reasoning", "reasoning",
"zmq_kv_events_port", "zmq_kv_events_port",
] ]
...@@ -340,6 +354,18 @@ impl MockEngineArgs { ...@@ -340,6 +354,18 @@ impl MockEngineArgs {
builder = builder.bootstrap_port(Some(port as u16)); builder = builder.bootstrap_port(Some(port as u16));
} }
if let Some(value) = extra_args.get("kv_bytes_per_token")
&& let Some(num) = value.as_u64()
{
builder = builder.kv_bytes_per_token(Some(num as usize));
}
if let Some(value) = extra_args.get("kv_transfer_bandwidth")
&& let Some(num) = value.as_f64()
{
builder = builder.kv_transfer_bandwidth(Some(num));
}
if let Some(value) = extra_args.get("reasoning") { if let Some(value) = extra_args.get("reasoning") {
let cfg: ReasoningConfig = serde_json::from_value(value.clone()) let cfg: ReasoningConfig = serde_json::from_value(value.clone())
.map_err(|e| anyhow::anyhow!("Failed to parse reasoning config: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to parse reasoning config: {}", e))?;
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::time::{Duration, Instant};
use crate::common::protocols::MockEngineArgs;
/// Compute the KV transfer delay duration for a given number of input tokens.
///
/// The delay is `num_input_tokens * kv_bytes_per_token / (bandwidth * 1e9)` seconds,
/// where the bandwidth is `args.kv_transfer_bandwidth` in GB/s.
///
/// Returns `None` if KV transfer simulation is disabled (bandwidth or bytes-per-token
/// not configured, or bandwidth not strictly positive), or if the computed delay
/// cannot be represented as a [`Duration`] (for example, it overflows because the
/// configured bandwidth is vanishingly small).
pub fn compute_kv_transfer_delay(
    args: &MockEngineArgs,
    num_input_tokens: usize,
) -> Option<Duration> {
    match (args.kv_transfer_bandwidth, args.kv_bytes_per_token) {
        // `bw > 0.0` is also false for NaN, so NaN bandwidth disables the delay.
        (Some(bw), Some(bpt)) if bw > 0.0 => {
            let kv_bytes = num_input_tokens as f64 * bpt as f64;
            // `Duration::from_secs_f64` panics on overflow or non-finite input;
            // use the fallible variant so a bad configuration cannot panic here.
            let delay = match Duration::try_from_secs_f64(kv_bytes / (bw * 1e9)) {
                Ok(delay) => delay,
                Err(err) => {
                    tracing::warn!(
                        num_input_tokens,
                        kv_bytes,
                        bandwidth_gb_s = bw,
                        error = %err,
                        "KV transfer delay is not representable; skipping delay"
                    );
                    return None;
                }
            };
            tracing::debug!(
                num_input_tokens,
                kv_bytes,
                bandwidth_gb_s = bw,
                delay_ms = format!("{:.2}", delay.as_secs_f64() * 1000.0),
                "KV transfer delay for prefill"
            );
            Some(delay)
        }
        _ => None,
    }
}
/// Sleep for the specified duration using timerfd on Linux for precision.
pub async fn sleep_precise(duration: Duration) {
sleep_until_precise(Instant::now() + duration).await;
}
/// Sleep until `deadline`, using a timerfd-backed timer on Linux for precision.
///
/// Because the target is an absolute instant rather than a relative duration,
/// any time that has already passed before this call is not added to the wait.
/// That makes it suitable for simulation loops where computation time should be
/// subtracted from the sleep.
///
/// On Linux, if the timerfd delay cannot be created, this falls back to tokio's
/// timer; on every other platform tokio's timer is always used.
pub async fn sleep_until_precise(deadline: Instant) {
    #[cfg(target_os = "linux")]
    {
        if let Ok(timer) = tokio_timerfd::Delay::new(deadline) {
            // The timer's own result is intentionally ignored.
            let _ = timer.await;
            return;
        }
    }

    // Reached on non-Linux targets, or on Linux when the timerfd could not be created.
    tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
}
...@@ -36,6 +36,7 @@ use crate::common::protocols::{ ...@@ -36,6 +36,7 @@ use crate::common::protocols::{
}; };
use crate::common::running_mean::RunningMean; use crate::common::running_mean::RunningMean;
use crate::common::sequence::ActiveSequence; use crate::common::sequence::ActiveSequence;
use crate::common::utils::sleep_until_precise;
use crate::kv_manager::KvManager; use crate::kv_manager::KvManager;
use dynamo_kv_router::protocols::DpRank; use dynamo_kv_router::protocols::DpRank;
use dynamo_tokens::blocks::UniqueBlock; use dynamo_tokens::blocks::UniqueBlock;
...@@ -44,8 +45,6 @@ use std::sync::Arc; ...@@ -44,8 +45,6 @@ use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
#[cfg(target_os = "linux")]
use tokio_timerfd::Delay;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use uuid::Uuid; use uuid::Uuid;
use validator::Validate; use validator::Validate;
...@@ -420,16 +419,7 @@ async fn simulate_prefill( ...@@ -420,16 +419,7 @@ async fn simulate_prefill(
let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio); let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio);
let deadline = start_time + sleep_duration; let deadline = start_time + sleep_duration;
#[cfg(target_os = "linux")] sleep_until_precise(deadline).await;
{
if let Ok(delay) = Delay::new(deadline) {
let _ = delay.await;
}
}
#[cfg(not(target_os = "linux"))]
{
tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
}
} }
total_time total_time
...@@ -514,16 +504,7 @@ async fn simulate_decode( ...@@ -514,16 +504,7 @@ async fn simulate_decode(
let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio); let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio);
let deadline = start_time + sleep_duration; let deadline = start_time + sleep_duration;
#[cfg(target_os = "linux")] sleep_until_precise(deadline).await;
{
if let Ok(delay) = Delay::new(deadline) {
let _ = delay.await;
}
}
#[cfg(not(target_os = "linux"))]
{
tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
}
} }
total_time total_time
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment