Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
c08034eb
Unverified
Commit
c08034eb
authored
Apr 03, 2026
by
Yan Ru Pei
Committed by
GitHub
Apr 03, 2026
Browse files
fix(bench): replay mooncake events offline (#7876)
Signed-off-by:
PeaBrane
<
yanrpei@gmail.com
>
parent
04e05f8c
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
238 additions
and
185 deletions
+238
-185
lib/bench/kv_router/common/mod.rs
lib/bench/kv_router/common/mod.rs
+14
-172
lib/mocker/src/replay/artifacts.rs
lib/mocker/src/replay/artifacts.rs
+37
-0
lib/mocker/src/replay/entrypoints.rs
lib/mocker/src/replay/entrypoints.rs
+11
-1
lib/mocker/src/replay/mod.rs
lib/mocker/src/replay/mod.rs
+10
-6
lib/mocker/src/replay/offline/core.rs
lib/mocker/src/replay/offline/core.rs
+13
-0
lib/mocker/src/replay/offline/entrypoints.rs
lib/mocker/src/replay/offline/entrypoints.rs
+150
-3
lib/mocker/src/replay/offline/mod.rs
lib/mocker/src/replay/offline/mod.rs
+3
-3
No files found.
lib/bench/kv_router/common/mod.rs
View file @
c08034eb
...
@@ -11,15 +11,15 @@ use dynamo_kv_router::protocols::{
...
@@ -11,15 +11,15 @@ use dynamo_kv_router::protocols::{
KvCacheStoredBlockData
,
RouterEvent
,
WorkerId
,
XXH3_SEED
,
compute_seq_hash_for_block
,
KvCacheStoredBlockData
,
RouterEvent
,
WorkerId
,
XXH3_SEED
,
compute_seq_hash_for_block
,
};
};
pub
use
dynamo_kv_router
::
test_utils
::{
NoopSequencePublisher
,
SimpleWorkerConfig
};
pub
use
dynamo_kv_router
::
test_utils
::{
NoopSequencePublisher
,
SimpleWorkerConfig
};
use
dynamo_mocker
::
common
::
protocols
::{
use
dynamo_mocker
::
common
::
protocols
::
MockEngineArgs
;
DirectRequest
,
KvCacheEventSink
,
KvEventPublishers
,
MockEngineArgs
,
OutputSignal
,
};
use
dynamo_mocker
::
loadgen
::{
use
dynamo_mocker
::
loadgen
::{
ArrivalSpec
,
DelaySpec
,
LengthSpec
,
ReplayRequestHashes
,
RouterSequence
,
SequenceHashMode
,
ArrivalSpec
,
DelaySpec
,
LengthSpec
,
ReplayRequestHashes
,
RouterSequence
,
SequenceHashMode
,
SessionPartitionSpec
,
SyntheticTraceSpec
,
Trace
,
SessionPartitionSpec
,
SyntheticTraceSpec
,
Trace
,
};
};
use
dynamo_mocker
::
scheduler
::
Scheduler
;
pub
use
dynamo_mocker
::
replay
::{
use
dynamo_mocker
::
scheduler
::
SchedulerHandle
;
ReplayTimedKvEvent
as
TimedKvEvent
,
ReplayTimedOutputSignal
as
TimedOutputSignal
,
ReplayTimedRequest
as
TimedReplayRequest
,
ReplayWorkerArtifacts
as
WorkerReplayArtifacts
,
};
use
dynamo_tokens
::
compute_hash_v2
;
use
dynamo_tokens
::
compute_hash_v2
;
use
indicatif
::{
ProgressBar
,
ProgressStyle
};
use
indicatif
::{
ProgressBar
,
ProgressStyle
};
use
plotters
::
prelude
::
*
;
use
plotters
::
prelude
::
*
;
...
@@ -27,10 +27,7 @@ use rand::prelude::*;
...
@@ -27,10 +27,7 @@ use rand::prelude::*;
use
serde
::{
Deserialize
,
Serialize
};
use
serde
::{
Deserialize
,
Serialize
};
use
std
::
fs
::
File
;
use
std
::
fs
::
File
;
use
std
::
io
::{
BufRead
,
BufReader
};
use
std
::
io
::{
BufRead
,
BufReader
};
use
std
::
sync
::{
Arc
,
Mutex
};
use
tokio
::
sync
::
mpsc
;
use
tokio
::
task
::
JoinHandle
;
use
tokio
::
task
::
JoinHandle
;
use
tokio
::
time
::
Instant
;
use
uuid
::
Uuid
;
use
uuid
::
Uuid
;
/// Shared CLI arguments for trace-based benchmarks.
/// Shared CLI arguments for trace-based benchmarks.
...
@@ -112,63 +109,6 @@ pub struct MooncakeRequest {
...
@@ -112,63 +109,6 @@ pub struct MooncakeRequest {
pub
output_length
:
u64
,
pub
output_length
:
u64
,
}
}
/// Collects KV cache events emitted by the mock engine during event generation,
/// tagging each with the wall-clock instant it was produced.
pub
struct
EventCollector
{
events
:
Mutex
<
Option
<
Vec
<
(
KvCacheEvent
,
Instant
)
>>>
,
}
impl
EventCollector
{
pub
fn
new
()
->
Arc
<
Self
>
{
Arc
::
new
(
Self
{
events
:
Mutex
::
new
(
Some
(
Vec
::
new
())),
})
}
pub
fn
get_events
(
self
:
Arc
<
Self
>
)
->
Vec
<
(
KvCacheEvent
,
Instant
)
>
{
self
.events
.lock
()
.unwrap
()
.take
()
.unwrap
()
}
}
impl
KvCacheEventSink
for
EventCollector
{
fn
publish
(
&
self
,
event
:
KvCacheEvent
)
->
anyhow
::
Result
<
()
>
{
let
timestamp
=
Instant
::
now
();
if
let
Some
(
events
)
=
self
.events
.lock
()
.unwrap
()
.as_mut
()
{
events
.push
((
event
,
timestamp
));
}
Ok
(())
}
}
#[derive(Clone)]
pub
struct
TimedReplayRequest
{
pub
uuid
:
Uuid
,
pub
timestamp_us
:
u64
,
pub
scheduled_ready_at_ms
:
f64
,
pub
input_length
:
usize
,
pub
output_length
:
usize
,
pub
replay_hashes
:
ReplayRequestHashes
,
}
#[derive(Clone)]
pub
struct
TimedOutputSignal
{
pub
signal
:
OutputSignal
,
pub
timestamp_us
:
u64
,
}
#[derive(Clone)]
pub
struct
TimedKvEvent
{
pub
event
:
KvCacheEvent
,
pub
timestamp_us
:
u64
,
}
#[derive(Clone)]
pub
struct
WorkerReplayArtifacts
{
pub
requests
:
Vec
<
TimedReplayRequest
>
,
pub
output_signals
:
Vec
<
TimedOutputSignal
>
,
pub
kv_events
:
Vec
<
TimedKvEvent
>
,
}
/// Load the mooncake trace from disk into a flat list of requests.
/// Load the mooncake trace from disk into a flat list of requests.
pub
fn
load_mooncake_trace
(
path
:
&
str
)
->
anyhow
::
Result
<
Vec
<
MooncakeRequest
>>
{
pub
fn
load_mooncake_trace
(
path
:
&
str
)
->
anyhow
::
Result
<
Vec
<
MooncakeRequest
>>
{
let
file
=
File
::
open
(
path
)
?
;
let
file
=
File
::
open
(
path
)
?
;
...
@@ -367,14 +307,14 @@ pub fn default_mock_engine_args(
...
@@ -367,14 +307,14 @@ pub fn default_mock_engine_args(
Ok
(
MockEngineArgs
::
builder
()
Ok
(
MockEngineArgs
::
builder
()
.num_gpu_blocks
(
num_gpu_blocks
)
.num_gpu_blocks
(
num_gpu_blocks
)
.block_size
(
block_size
)
.block_size
(
block_size
)
.speedup_ratio
(
0.0
)
.speedup_ratio
(
1
0.0
)
.enable_prefix_caching
(
true
)
.enable_prefix_caching
(
true
)
.max_num_batched_tokens
(
None
)
.max_num_batched_tokens
(
None
)
.max_num_seqs
(
None
)
.max_num_seqs
(
None
)
.build
()
?
)
.build
()
?
)
}
}
async
fn
replay_worker_trace
(
fn
replay_worker_trace
(
trace
:
Trace
,
trace
:
Trace
,
sched_args
:
MockEngineArgs
,
sched_args
:
MockEngineArgs
,
trace_simulation_duration_ms
:
u64
,
trace_simulation_duration_ms
:
u64
,
...
@@ -385,103 +325,12 @@ async fn replay_worker_trace(
...
@@ -385,103 +325,12 @@ async fn replay_worker_trace(
.iter
()
.iter
()
.map
(|
session
|
session
.turns
.len
())
.map
(|
session
|
session
.turns
.len
())
.sum
::
<
usize
>
();
.sum
::
<
usize
>
();
let
mut
driver
=
trace
let
artifacts
=
dynamo_mocker
::
replay
::
generate_trace_worker_artifacts_offline
(
.rescale_ready_span
(
trace_simulation_duration_ms
)
?
.into_trace_driver
()
?
;
let
collector
=
EventCollector
::
new
();
let
(
output_tx
,
mut
output_rx
)
=
mpsc
::
unbounded_channel
::
<
Vec
<
OutputSignal
>>
();
let
scheduler
=
Scheduler
::
new
(
sched_args
,
sched_args
,
0
,
trace
.rescale_ready_span
(
trace_simulation_duration_ms
)
?
,
Some
(
output_tx
),
)
?
;
KvEventPublishers
::
new
(
Some
(
collector
.clone
()),
None
),
progress
.inc
(
total_turns
as
u64
);
None
,
Ok
(
artifacts
)
);
let
start
=
Instant
::
now
();
let
mut
requests
=
Vec
::
with_capacity
(
total_turns
);
let
mut
output_signals
=
Vec
::
new
();
let
mut
completed_turns
=
0u
size
;
while
completed_turns
<
total_turns
{
let
now_ms
=
start
.elapsed
()
.as_secs_f64
()
*
1000.0
;
for
ready_turn
in
driver
.pop_ready
(
now_ms
,
usize
::
MAX
)
{
let
replay_hashes
=
ready_turn
.replay_hashes
.ok_or_else
(||
{
anyhow
::
anyhow!
(
"bench replay requires synthesized request hashes"
)
})
?
;
requests
.push
(
TimedReplayRequest
{
uuid
:
ready_turn
.request_uuid
,
timestamp_us
:
start
.elapsed
()
.as_micros
()
as
u64
,
scheduled_ready_at_ms
:
ready_turn
.scheduled_ready_at_ms
,
input_length
:
ready_turn
.request.tokens
.len
(),
output_length
:
ready_turn
.request.max_output_tokens
,
replay_hashes
,
});
scheduler
.receive
(
ready_turn
.request
);
progress
.inc
(
1
);
}
if
completed_turns
>=
total_turns
{
break
;
}
match
driver
.next_ready_time_ms
()
{
Some
(
next_ready_ms
)
=>
{
let
deadline
=
start
+
Duration
::
from_secs_f64
((
next_ready_ms
.max
(
0.0
))
/
1000.0
);
tokio
::
select!
{
maybe_signal
=
output_rx
.recv
()
=>
{
let
Some
(
output_batch
)
=
maybe_signal
else
{
anyhow
::
bail!
(
"scheduler ended before workload replay drained"
);
};
let
timestamp_us
=
start
.elapsed
()
.as_micros
()
as
u64
;
let
completion_ms
=
start
.elapsed
()
.as_secs_f64
()
*
1000.0
;
for
signal
in
output_batch
{
output_signals
.push
(
TimedOutputSignal
{
signal
:
signal
.clone
(),
timestamp_us
,
});
if
signal
.completed
{
completed_turns
+=
1
;
driver
.on_complete
(
signal
.uuid
,
completion_ms
)
?
;
}
}
}
_
=
tokio
::
time
::
sleep_until
(
deadline
)
=>
{}
}
}
None
=>
{
let
Some
(
output_batch
)
=
output_rx
.recv
()
.await
else
{
anyhow
::
bail!
(
"scheduler ended before workload replay drained"
);
};
let
timestamp_us
=
start
.elapsed
()
.as_micros
()
as
u64
;
let
completion_ms
=
start
.elapsed
()
.as_secs_f64
()
*
1000.0
;
for
signal
in
output_batch
{
output_signals
.push
(
TimedOutputSignal
{
signal
:
signal
.clone
(),
timestamp_us
,
});
if
signal
.completed
{
completed_turns
+=
1
;
driver
.on_complete
(
signal
.uuid
,
completion_ms
)
?
;
}
}
}
}
}
drop
(
scheduler
);
Ok
(
WorkerReplayArtifacts
{
requests
,
output_signals
,
kv_events
:
collector
.get_events
()
.into_iter
()
.map
(|(
event
,
timestamp
)|
TimedKvEvent
{
event
,
timestamp_us
:
timestamp
.saturating_duration_since
(
start
)
.as_micros
()
as
u64
,
})
.collect
(),
})
}
}
pub
async
fn
generate_replay_artifacts
(
pub
async
fn
generate_replay_artifacts
(
...
@@ -509,8 +358,8 @@ pub async fn generate_replay_artifacts(
...
@@ -509,8 +358,8 @@ pub async fn generate_replay_artifacts(
for
trace
in
traces
.iter
()
.cloned
()
{
for
trace
in
traces
.iter
()
.cloned
()
{
let
sched_args
=
sched_args
.clone
();
let
sched_args
=
sched_args
.clone
();
let
progress
=
progress
.clone
();
let
progress
=
progress
.clone
();
tasks
.push
(
tokio
::
spawn
(
async
move
{
tasks
.push
(
tokio
::
task
::
spawn_blocking
(
move
||
{
replay_worker_trace
(
trace
,
sched_args
,
trace_simulation_duration_ms
,
progress
)
.await
replay_worker_trace
(
trace
,
sched_args
,
trace_simulation_duration_ms
,
progress
)
}));
}));
}
}
...
@@ -532,13 +381,6 @@ pub async fn generate_replay_artifacts(
...
@@ -532,13 +381,6 @@ pub async fn generate_replay_artifacts(
.map
(|
artifact
|
artifact
.kv_events
.len
())
.map
(|
artifact
|
artifact
.kv_events
.len
())
.sum
::
<
usize
>
()
.sum
::
<
usize
>
()
);
);
if
progress
.elapsed
()
>
Duration
::
from_millis
(
trace_simulation_duration_ms
*
11
/
10
)
{
eprintln!
(
"Warning: Generated events took significantly longer than the trace simulation duration. Inaccurate timing information has been produced. Rerun with a larger --trace-simulation-duration-ms."
);
}
let
mut
num_stored_events
=
0
;
let
mut
num_stored_events
=
0
;
let
mut
num_removed_events
=
0
;
let
mut
num_removed_events
=
0
;
for
event
in
artifacts
for
event
in
artifacts
...
...
lib/mocker/src/replay/artifacts.rs
0 → 100644
View file @
c08034eb
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use
dynamo_kv_router
::
protocols
::
KvCacheEvent
;
use
uuid
::
Uuid
;
use
crate
::
common
::
protocols
::
OutputSignal
;
use
crate
::
loadgen
::
ReplayRequestHashes
;
#[derive(Debug,
Clone)]
pub
struct
ReplayTimedRequest
{
pub
uuid
:
Uuid
,
pub
timestamp_us
:
u64
,
pub
scheduled_ready_at_ms
:
f64
,
pub
input_length
:
usize
,
pub
output_length
:
usize
,
pub
replay_hashes
:
ReplayRequestHashes
,
}
#[derive(Debug,
Clone)]
pub
struct
ReplayTimedOutputSignal
{
pub
signal
:
OutputSignal
,
pub
timestamp_us
:
u64
,
}
#[derive(Debug,
Clone)]
pub
struct
ReplayTimedKvEvent
{
pub
event
:
KvCacheEvent
,
pub
timestamp_us
:
u64
,
}
#[derive(Debug,
Clone,
Default)]
pub
struct
ReplayWorkerArtifacts
{
pub
requests
:
Vec
<
ReplayTimedRequest
>
,
pub
output_signals
:
Vec
<
ReplayTimedOutputSignal
>
,
pub
kv_events
:
Vec
<
ReplayTimedKvEvent
>
,
}
lib/mocker/src/replay/entrypoints.rs
View file @
c08034eb
...
@@ -13,10 +13,20 @@ use super::validate::{
...
@@ -13,10 +13,20 @@ use super::validate::{
validate_offline_disagg_replay_args
,
validate_offline_replay_args
,
validate_offline_disagg_replay_args
,
validate_offline_replay_args
,
validate_online_concurrency_args
,
validate_online_replay_args
,
validate_online_concurrency_args
,
validate_online_replay_args
,
};
};
use
super
::{
OfflineDisaggReplayConfig
,
ReplayRouterMode
,
TraceSimulationReport
};
use
super
::{
OfflineDisaggReplayConfig
,
ReplayRouterMode
,
ReplayWorkerArtifacts
,
TraceSimulationReport
,
};
use
crate
::
common
::
protocols
::{
DirectRequest
,
MockEngineArgs
};
use
crate
::
common
::
protocols
::{
DirectRequest
,
MockEngineArgs
};
use
crate
::
loadgen
::
Trace
;
use
crate
::
loadgen
::
Trace
;
pub
fn
generate_trace_worker_artifacts_offline
(
args
:
MockEngineArgs
,
trace
:
Trace
,
)
->
Result
<
ReplayWorkerArtifacts
>
{
let
args
=
args
.normalized
()
?
;
crate
::
replay
::
offline
::
generate_trace_worker_artifacts
(
args
,
trace
)
}
pub
fn
simulate_trace_file
(
pub
fn
simulate_trace_file
(
args
:
MockEngineArgs
,
args
:
MockEngineArgs
,
trace_path
:
&
Path
,
trace_path
:
&
Path
,
...
...
lib/mocker/src/replay/mod.rs
View file @
c08034eb
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: Apache-2.0
mod
artifacts
;
mod
collector
;
mod
collector
;
mod
entrypoints
;
mod
entrypoints
;
pub
(
crate
)
mod
offline
;
pub
(
crate
)
mod
offline
;
...
@@ -12,6 +13,9 @@ use std::collections::VecDeque;
...
@@ -12,6 +13,9 @@ use std::collections::VecDeque;
use
crate
::
common
::
protocols
::{
DirectRequest
,
MockEngineArgs
};
use
crate
::
common
::
protocols
::{
DirectRequest
,
MockEngineArgs
};
pub
use
artifacts
::{
ReplayTimedKvEvent
,
ReplayTimedOutputSignal
,
ReplayTimedRequest
,
ReplayWorkerArtifacts
,
};
pub
(
crate
)
use
collector
::
TraceCollector
;
pub
(
crate
)
use
collector
::
TraceCollector
;
#[cfg(test)]
#[cfg(test)]
pub
(
crate
)
use
collector
::
TraceRequestStatsSnapshot
;
pub
(
crate
)
use
collector
::
TraceRequestStatsSnapshot
;
...
@@ -51,12 +55,12 @@ impl OfflineDisaggReplayConfig {
...
@@ -51,12 +55,12 @@ impl OfflineDisaggReplayConfig {
}
}
pub
use
entrypoints
::{
pub
use
entrypoints
::{
simulate_concurrency_fil
e
,
simulate_concurrency_file
_disagg_with_router_mode
,
generate_trace_worker_artifacts_offlin
e
,
simulate_concurrency_file
,
simulate_concurrency_file_with_router_mode
,
simulate_concurrency_
live_fil
e
,
simulate_concurrency_file_
disagg_
with_router_mode
,
simulate_concurrency_
file_with_router_mod
e
,
simulate_concurrency_live_file
_with_router_mode
,
simulate_concurrency_live_
requests
,
simulate_concurrency_live_file
,
simulate_concurrency_live_
file_with_router_mode
,
simulate_concurrency_live_requests
_with_router_mode
,
simulate_concurrency_live_
workload
,
simulate_concurrency_live_requests
,
simulate_concurrency_live_
requests_with_router_mode
,
simulate_concurrency_live_workload
_with_router_mode
,
simulate_concurrency_
requests
,
simulate_concurrency_live_workload
,
simulate_concurrency_
live_workload_with_router_mode
,
simulate_concurrency_requests_disagg_with_router_mode
,
simulate_concurrency_requests
,
simulate_concurrency_requests_disagg_with_router_mode
,
simulate_concurrency_requests_with_router_mode
,
simulate_concurrency_workload
,
simulate_concurrency_requests_with_router_mode
,
simulate_concurrency_workload
,
simulate_concurrency_workload_disagg_with_router_mode
,
simulate_concurrency_workload_disagg_with_router_mode
,
simulate_concurrency_workload_with_router_mode
,
simulate_trace_file
,
simulate_concurrency_workload_with_router_mode
,
simulate_trace_file
,
...
...
lib/mocker/src/replay/offline/core.rs
View file @
c08034eb
...
@@ -4,6 +4,7 @@
...
@@ -4,6 +4,7 @@
use
crate
::
common
::
protocols
::
MockEngineArgs
;
use
crate
::
common
::
protocols
::
MockEngineArgs
;
use
crate
::
replay
::
TraceCollector
;
use
crate
::
replay
::
TraceCollector
;
use
crate
::
scheduler
::{
EngineCore
,
EnginePassResult
,
SglangCore
,
VllmCore
};
use
crate
::
scheduler
::{
EngineCore
,
EnginePassResult
,
SglangCore
,
VllmCore
};
use
dynamo_kv_router
::
protocols
::
WorkerId
;
pub
(
crate
)
struct
ReplayWorkerCore
{
pub
(
crate
)
struct
ReplayWorkerCore
{
core
:
EngineCore
,
core
:
EngineCore
,
...
@@ -20,6 +21,18 @@ impl ReplayWorkerCore {
...
@@ -20,6 +21,18 @@ impl ReplayWorkerCore {
Self
{
core
}
Self
{
core
}
}
}
pub
(
crate
)
fn
new_with_kv_capture
(
args
:
MockEngineArgs
,
worker_id
:
WorkerId
)
->
Self
{
let
core
=
match
args
.engine_type
{
crate
::
common
::
protocols
::
EngineType
::
Vllm
=>
{
EngineCore
::
Vllm
(
VllmCore
::
new_with_kv_capture
(
args
,
worker_id
))
}
crate
::
common
::
protocols
::
EngineType
::
Sglang
=>
{
EngineCore
::
Sglang
(
SglangCore
::
new_with_kv_capture
(
args
,
worker_id
))
}
};
Self
{
core
}
}
pub
(
crate
)
fn
is_empty
(
&
self
)
->
bool
{
pub
(
crate
)
fn
is_empty
(
&
self
)
->
bool
{
self
.core
.is_empty
()
self
.core
.is_empty
()
}
}
...
...
lib/mocker/src/replay/offline/entrypoints.rs
View file @
c08034eb
...
@@ -5,10 +5,12 @@ use std::collections::VecDeque;
...
@@ -5,10 +5,12 @@ use std::collections::VecDeque;
use
anyhow
::
Result
;
use
anyhow
::
Result
;
use
dynamo_kv_router
::
config
::
KvRouterConfig
;
use
dynamo_kv_router
::
config
::
KvRouterConfig
;
use
dynamo_kv_router
::
protocols
::
WorkerId
;
#[cfg(test)]
#[cfg(test)]
use
super
::
agg
::
AggRuntimeStats
;
use
super
::
agg
::
AggRuntimeStats
;
use
super
::
agg
::{
AggRuntime
,
ReplayMode
as
AggReplayMode
};
use
super
::
agg
::{
AggRuntime
,
ReplayMode
as
AggReplayMode
};
use
super
::
core
::
ReplayWorkerCore
;
#[cfg(test)]
#[cfg(test)]
use
super
::
disagg
::
DisaggRuntimeStats
;
use
super
::
disagg
::
DisaggRuntimeStats
;
use
super
::
disagg
::{
DisaggRuntime
,
ReplayMode
as
DisaggReplayMode
};
use
super
::
disagg
::{
DisaggRuntime
,
ReplayMode
as
DisaggReplayMode
};
...
@@ -17,9 +19,89 @@ use super::single::{SingleReplayMode, SingleRuntime};
...
@@ -17,9 +19,89 @@ use super::single::{SingleReplayMode, SingleRuntime};
use
crate
::
common
::
protocols
::{
DirectRequest
,
EngineType
,
MockEngineArgs
};
use
crate
::
common
::
protocols
::{
DirectRequest
,
EngineType
,
MockEngineArgs
};
use
crate
::
loadgen
::{
Trace
,
WorkloadDriver
};
use
crate
::
loadgen
::{
Trace
,
WorkloadDriver
};
use
crate
::
replay
::
OfflineDisaggReplayConfig
;
use
crate
::
replay
::
OfflineDisaggReplayConfig
;
#[cfg(test)]
use
crate
::
replay
::{
use
crate
::
replay
::
TraceCollector
;
ReplayRouterMode
,
ReplayTimedKvEvent
,
ReplayTimedOutputSignal
,
ReplayTimedRequest
,
use
crate
::
replay
::{
ReplayRouterMode
,
TraceSimulationReport
};
ReplayWorkerArtifacts
,
TraceCollector
,
TraceSimulationReport
,
};
use
crate
::
scheduler
::
RouterEventVisibility
;
fn
timestamp_us_from_ms
(
timestamp_ms
:
f64
)
->
u64
{
if
!
timestamp_ms
.is_finite
()
||
timestamp_ms
<=
0.0
{
return
0
;
}
(
timestamp_ms
*
1000.0
)
as
u64
}
pub
(
crate
)
fn
generate_trace_worker_artifacts
(
args
:
MockEngineArgs
,
trace
:
Trace
,
)
->
Result
<
ReplayWorkerArtifacts
>
{
let
mut
worker
=
ReplayWorkerCore
::
new_with_kv_capture
(
args
,
WorkerId
::
default
());
let
mut
driver
=
trace
.into_trace_driver
()
?
;
let
mut
collector
=
TraceCollector
::
default
();
let
mut
artifacts
=
ReplayWorkerArtifacts
::
default
();
let
mut
current_time_ms
=
0.0
;
while
!
driver
.is_drained
()
||
!
worker
.is_empty
()
{
for
ready_turn
in
driver
.pop_ready
(
current_time_ms
,
usize
::
MAX
)
{
let
replay_hashes
=
ready_turn
.replay_hashes
.ok_or_else
(||
anyhow
::
anyhow!
(
"offline artifacts require synthesized hashes"
))
?
;
collector
.on_arrival
(
ready_turn
.request_uuid
,
ready_turn
.scheduled_ready_at_ms
,
ready_turn
.request.tokens
.len
(),
ready_turn
.request.max_output_tokens
,
);
artifacts
.requests
.push
(
ReplayTimedRequest
{
uuid
:
ready_turn
.request_uuid
,
timestamp_us
:
timestamp_us_from_ms
(
current_time_ms
),
scheduled_ready_at_ms
:
ready_turn
.scheduled_ready_at_ms
,
input_length
:
ready_turn
.request.tokens
.len
(),
output_length
:
ready_turn
.request.max_output_tokens
,
replay_hashes
,
});
worker
.receive
(
ready_turn
.request
);
}
if
worker
.is_empty
()
{
let
Some
(
next_ready_ms
)
=
driver
.next_ready_time_ms
()
else
{
break
;
};
current_time_ms
=
next_ready_ms
;
continue
;
}
let
pass_start_ms
=
current_time_ms
;
let
pass
=
worker
.execute_pass
(
&
mut
collector
,
current_time_ms
);
current_time_ms
=
pass
.end_ms
;
let
kv_event_timestamp_us
=
match
pass
.router_event_visibility
{
RouterEventVisibility
::
PassStart
=>
timestamp_us_from_ms
(
pass_start_ms
),
RouterEventVisibility
::
PassEnd
=>
timestamp_us_from_ms
(
current_time_ms
),
};
artifacts
.kv_events
.extend
(
pass
.kv_events
.into_iter
()
.map
(|
event
|
ReplayTimedKvEvent
{
event
:
event
.event
,
timestamp_us
:
kv_event_timestamp_us
,
}));
let
output_timestamp_us
=
timestamp_us_from_ms
(
current_time_ms
);
for
signal
in
pass
.output_signals
{
if
signal
.completed
{
driver
.on_complete
(
signal
.uuid
,
current_time_ms
)
?
;
}
artifacts
.output_signals
.push
(
ReplayTimedOutputSignal
{
signal
,
timestamp_us
:
output_timestamp_us
,
});
}
}
Ok
(
artifacts
)
}
pub
(
crate
)
fn
simulate_trace
(
pub
(
crate
)
fn
simulate_trace
(
args
:
MockEngineArgs
,
args
:
MockEngineArgs
,
...
@@ -493,3 +575,68 @@ pub(super) fn run_concurrency_collect(
...
@@ -493,3 +575,68 @@ pub(super) fn run_concurrency_collect(
.run
()
.run
()
.unwrap
()
.unwrap
()
}
}
#[cfg(test)]
mod
tests
{
use
super
::
generate_trace_worker_artifacts
;
use
crate
::
common
::
protocols
::
MockEngineArgs
;
use
crate
::
loadgen
::{
SessionTrace
,
Trace
,
TurnTrace
};
#[test]
fn
test_generate_trace_worker_artifacts_emits_monotonic_event_timestamps
()
{
let
args
=
MockEngineArgs
::
builder
()
.block_size
(
2
)
.num_gpu_blocks
(
1024
)
.max_num_batched_tokens
(
None
)
.max_num_seqs
(
None
)
.enable_prefix_caching
(
true
)
.speedup_ratio
(
1000.0
)
.build
()
.unwrap
();
let
trace
=
Trace
{
block_size
:
2
,
sessions
:
vec!
[
SessionTrace
{
session_id
:
"session-a"
.to_string
(),
first_arrival_timestamp_ms
:
Some
(
0.0
),
turns
:
vec!
[
TurnTrace
{
input_length
:
4
,
max_output_tokens
:
2
,
hash_ids
:
vec!
[
1
,
2
],
delay_after_previous_ms
:
0.0
,
},
TurnTrace
{
input_length
:
4
,
max_output_tokens
:
2
,
hash_ids
:
vec!
[
3
,
4
],
delay_after_previous_ms
:
5.0
,
},
],
}],
};
let
artifacts
=
generate_trace_worker_artifacts
(
args
,
trace
)
.unwrap
();
assert_eq!
(
artifacts
.requests
.len
(),
2
);
assert
!
(
!
artifacts
.kv_events
.is_empty
());
assert
!
(
artifacts
.kv_events
.windows
(
2
)
.all
(|
events
|
events
[
0
]
.timestamp_us
<=
events
[
1
]
.timestamp_us
)
);
let
first_uuid
=
artifacts
.requests
[
0
]
.uuid
;
let
first_completion_ms
=
artifacts
.output_signals
.iter
()
.find
(|
signal
|
signal
.signal.uuid
==
first_uuid
&&
signal
.signal.completed
)
.expect
(
"first request must complete"
)
.timestamp_us
as
f64
/
1000.0
;
assert
!
(
artifacts
.requests
[
1
]
.scheduled_ready_at_ms
+
0.1
>=
first_completion_ms
+
5.0
,
"expected second request to wait for completion plus delay"
);
}
}
lib/mocker/src/replay/offline/mod.rs
View file @
c08034eb
...
@@ -13,7 +13,7 @@ pub(crate) mod single;
...
@@ -13,7 +13,7 @@ pub(crate) mod single;
pub
(
crate
)
mod
state
;
pub
(
crate
)
mod
state
;
pub
(
crate
)
use
entrypoints
::{
pub
(
crate
)
use
entrypoints
::{
simulate_concurrency
,
simulate_concurrency_disagg
,
simulate_concurrency_workload
,
generate_trace_worker_artifacts
,
simulate_concurrency
,
simulate_concurrency_disagg
,
simulate_concurrency_workload
_disagg
,
simulate_
trace
,
simulate_trace
_disagg
,
simulate_concurrency_workload
,
simulate_
concurrency_workload_disagg
,
simulate_trace
,
simulate_trace_workload
,
simulate_trace_workload_disagg
,
simulate_trace_disagg
,
simulate_trace_workload
,
simulate_trace_workload_disagg
,
};
};
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment