Unverified Commit d42ff092 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(mocker): restore disagg cache reuse accounting (#7938)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 3205e7db
...@@ -62,7 +62,10 @@ impl EngineComponent { ...@@ -62,7 +62,10 @@ impl EngineComponent {
EnginePassMode::Hidden => self.workers[worker_idx].execute_hidden_pass(now_ms), EnginePassMode::Hidden => self.workers[worker_idx].execute_hidden_pass(now_ms),
}; };
let mut effects = EngineEffects::default(); let mut effects = EngineEffects {
admissions: executed.admissions,
..EngineEffects::default()
};
let completion_kv_events = let completion_kv_events =
if executed.router_event_visibility == RouterEventVisibility::PassStart { if executed.router_event_visibility == RouterEventVisibility::PassStart {
effects.pass_start_kv_events = executed.kv_events; effects.pass_start_kv_events = executed.kv_events;
......
...@@ -7,6 +7,7 @@ use uuid::Uuid; ...@@ -7,6 +7,7 @@ use uuid::Uuid;
use super::super::runtime_utils::WorkerCompletionPayload; use super::super::runtime_utils::WorkerCompletionPayload;
use crate::common::protocols::DirectRequest; use crate::common::protocols::DirectRequest;
use crate::loadgen::ReplayRequestHashes; use crate::loadgen::ReplayRequestHashes;
use crate::scheduler::AdmissionEvent;
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub(in crate::replay::offline) enum ReplayMode { pub(in crate::replay::offline) enum ReplayMode {
...@@ -34,6 +35,7 @@ pub(in crate::replay::offline) struct ScheduledWorkerCompletion { ...@@ -34,6 +35,7 @@ pub(in crate::replay::offline) struct ScheduledWorkerCompletion {
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub(in crate::replay::offline) struct EngineEffects { pub(in crate::replay::offline) struct EngineEffects {
pub(in crate::replay::offline) admissions: Vec<AdmissionEvent>,
pub(in crate::replay::offline) pass_start_kv_events: Vec<RouterEvent>, pub(in crate::replay::offline) pass_start_kv_events: Vec<RouterEvent>,
pub(in crate::replay::offline) immediate_completions: Vec<WorkerCompletionPayload>, pub(in crate::replay::offline) immediate_completions: Vec<WorkerCompletionPayload>,
pub(in crate::replay::offline) scheduled_completions: Vec<ScheduledWorkerCompletion>, pub(in crate::replay::offline) scheduled_completions: Vec<ScheduledWorkerCompletion>,
...@@ -41,7 +43,8 @@ pub(in crate::replay::offline) struct EngineEffects { ...@@ -41,7 +43,8 @@ pub(in crate::replay::offline) struct EngineEffects {
impl EngineEffects { impl EngineEffects {
pub(in crate::replay::offline) fn is_empty(&self) -> bool { pub(in crate::replay::offline) fn is_empty(&self) -> bool {
self.pass_start_kv_events.is_empty() self.admissions.is_empty()
&& self.pass_start_kv_events.is_empty()
&& self.immediate_completions.is_empty() && self.immediate_completions.is_empty()
&& self.scheduled_completions.is_empty() && self.scheduled_completions.is_empty()
} }
......
...@@ -27,6 +27,7 @@ use crate::loadgen::{ReplayRequestHashes, WorkloadDriver}; ...@@ -27,6 +27,7 @@ use crate::loadgen::{ReplayRequestHashes, WorkloadDriver};
use crate::replay::{ use crate::replay::{
OfflineDisaggReplayConfig, ReplayPrefillLoadEstimator, ReplayRouterMode, TraceCollector, OfflineDisaggReplayConfig, ReplayPrefillLoadEstimator, ReplayRouterMode, TraceCollector,
}; };
use crate::scheduler::AdmissionEvent;
#[cfg(test)] #[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
...@@ -625,6 +626,7 @@ impl DisaggRuntime { ...@@ -625,6 +626,7 @@ impl DisaggRuntime {
} }
fn handle_prefill_engine_effects(&mut self, effects: EngineEffects) -> Result<()> { fn handle_prefill_engine_effects(&mut self, effects: EngineEffects) -> Result<()> {
self.record_prefill_admissions(effects.admissions);
self.apply_prefill_router_events(effects.pass_start_kv_events)?; self.apply_prefill_router_events(effects.pass_start_kv_events)?;
for payload in effects.immediate_completions { for payload in effects.immediate_completions {
let payload = self.prefill_engine.on_scheduled_completion(payload)?; let payload = self.prefill_engine.on_scheduled_completion(payload)?;
...@@ -641,6 +643,13 @@ impl DisaggRuntime { ...@@ -641,6 +643,13 @@ impl DisaggRuntime {
Ok(()) Ok(())
} }
fn record_prefill_admissions(&mut self, admissions: Vec<AdmissionEvent>) {
for admission in admissions {
self.collector
.on_admit(admission.uuid, self.now_ms, admission.reused_input_tokens);
}
}
fn handle_decode_engine_effects(&mut self, effects: EngineEffects) -> Result<()> { fn handle_decode_engine_effects(&mut self, effects: EngineEffects) -> Result<()> {
for payload in effects.immediate_completions { for payload in effects.immediate_completions {
let payload = self.decode_engine.on_scheduled_completion(payload)?; let payload = self.decode_engine.on_scheduled_completion(payload)?;
...@@ -970,6 +979,29 @@ mod tests { ...@@ -970,6 +979,29 @@ mod tests {
} }
} }
#[test]
fn test_hidden_prefill_reports_reused_tokens_even_when_decode_prefix_caching_is_disabled() {
let mut config = disagg_config();
config.num_prefill_workers = 1;
config.num_decode_workers = 1;
config.decode_args.enable_prefix_caching = false;
let requests = vec![request(1, 128, 2, 0.0), request(2, 128, 2, 100.0)];
let (collector, _) = run_trace_collect(
&config,
requests,
Some(router_config()),
1.0,
ReplayRouterMode::KvRouter,
);
let request_2 = collector.snapshot(Uuid::from_u128(2)).unwrap();
let report = collector.finish();
assert!(request_2.reused_input_tokens > 0);
assert!(report.prefix_cache_reused_ratio > 0.0);
}
#[rstest::rstest] #[rstest::rstest]
#[case(ReplayRouterMode::RoundRobin)] #[case(ReplayRouterMode::RoundRobin)]
#[case(ReplayRouterMode::KvRouter)] #[case(ReplayRouterMode::KvRouter)]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment