Unverified Commit b92c9593 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

fix(replay): drain traffic metrics only on throughput ticks (#8232)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent d96a2cf1
...@@ -369,7 +369,7 @@ class ReplayPlannerAdapter: ...@@ -369,7 +369,7 @@ class ReplayPlannerAdapter:
traffic = None traffic = None
if tick.need_traffic_metrics: if tick.need_traffic_metrics:
t = result.get("traffic", {}) t = self._bridge.drain_traffic()
duration_s = t.get("duration_s", 0.0) duration_s = t.get("duration_s", 0.0)
if duration_s > 0: if duration_s > 0:
traffic = TrafficObservation( traffic = TrafficObservation(
......
...@@ -1317,6 +1317,8 @@ impl PlannerReplayBridge { ...@@ -1317,6 +1317,8 @@ impl PlannerReplayBridge {
/// Advance the simulation to `until_ms` simulated time. /// Advance the simulation to `until_ms` simulated time.
/// ///
/// Returns a dict with separate prefill/decode worker counts and FPM snapshots. /// Returns a dict with separate prefill/decode worker counts and FPM snapshots.
/// Traffic metrics are NOT included — call `drain_traffic()` explicitly on
/// throughput-scaling ticks only.
fn advance_to(&mut self, py: Python<'_>, until_ms: f64) -> PyResult<PyObject> { fn advance_to(&mut self, py: Python<'_>, until_ms: f64) -> PyResult<PyObject> {
let handle = self let handle = self
.handle .handle
...@@ -1324,19 +1326,12 @@ impl PlannerReplayBridge { ...@@ -1324,19 +1326,12 @@ impl PlannerReplayBridge {
.ok_or_else(|| PyException::new_err("bridge has been finalized"))?; .ok_or_else(|| PyException::new_err("bridge has been finalized"))?;
let tick_data = handle.advance_to(until_ms).map_err(to_pyerr)?; let tick_data = handle.advance_to(until_ms).map_err(to_pyerr)?;
let (duration_s, num_req, avg_isl, avg_osl) = tick_data.traffic;
let result = json!({ let result = json!({
"now_ms": tick_data.now_ms, "now_ms": tick_data.now_ms,
"is_done": tick_data.is_done, "is_done": tick_data.is_done,
"prefill_fpm_snapshots": fpm_snapshots_to_json(tick_data.prefill_fpm_snapshots), "prefill_fpm_snapshots": fpm_snapshots_to_json(tick_data.prefill_fpm_snapshots),
"decode_fpm_snapshots": fpm_snapshots_to_json(tick_data.decode_fpm_snapshots), "decode_fpm_snapshots": fpm_snapshots_to_json(tick_data.decode_fpm_snapshots),
"traffic": {
"duration_s": duration_s,
"num_req": num_req,
"avg_isl": avg_isl,
"avg_osl": avg_osl,
},
"active_prefill_count": tick_data.active_prefill_count, "active_prefill_count": tick_data.active_prefill_count,
"active_decode_count": tick_data.active_decode_count, "active_decode_count": tick_data.active_decode_count,
"total_prefill_count": tick_data.total_prefill_count, "total_prefill_count": tick_data.total_prefill_count,
...@@ -1348,6 +1343,31 @@ impl PlannerReplayBridge { ...@@ -1348,6 +1343,31 @@ impl PlannerReplayBridge {
.map(|obj| obj.unbind()) .map(|obj| obj.unbind())
} }
/// Drain accumulated traffic metrics since the last drain.
///
/// Returns a dict with `duration_s`, `num_req`, `avg_isl`, `avg_osl`.
/// Call this only on throughput-scaling ticks so the observation window
/// covers the full `throughput_adjustment_interval`.
fn drain_traffic(&mut self, py: Python<'_>) -> PyResult<PyObject> {
let handle = self
.handle
.as_mut()
.ok_or_else(|| PyException::new_err("bridge has been finalized"))?;
let (duration_s, num_req, avg_isl, avg_osl) = handle.drain_traffic();
let result = json!({
"duration_s": duration_s,
"num_req": num_req,
"avg_isl": avg_isl,
"avg_osl": avg_osl,
});
pythonize(py, &result)
.map_err(to_pyerr)
.map(|obj| obj.unbind())
}
/// Apply a scaling decision with separate prefill and decode targets. /// Apply a scaling decision with separate prefill and decode targets.
/// For agg mode, `target_prefill` is ignored (pass 0). /// For agg mode, `target_prefill` is ignored (pass 0).
fn apply_scaling(&mut self, target_prefill: usize, target_decode: usize) -> PyResult<()> { fn apply_scaling(&mut self, target_prefill: usize, target_decode: usize) -> PyResult<()> {
......
...@@ -27,6 +27,11 @@ use crate::loadgen::Trace; ...@@ -27,6 +27,11 @@ use crate::loadgen::Trace;
/// ///
/// For aggregated mode, prefill fields are 0 and all data is in decode fields /// For aggregated mode, prefill fields are 0 and all data is in decode fields
/// (matching how the planner treats agg as a single decode-stage engine). /// (matching how the planner treats agg as a single decode-stage engine).
///
/// Traffic metrics are NOT included here — they accumulate across ticks and
/// must be drained explicitly via [`PlannerReplayHandle::drain_traffic`] on
/// throughput-scaling ticks only. Draining on every tick would discard data
/// between the more frequent load-scaling ticks.
pub struct PlannerTickData { pub struct PlannerTickData {
/// Current simulated time in milliseconds. /// Current simulated time in milliseconds.
pub now_ms: f64, pub now_ms: f64,
...@@ -36,8 +41,6 @@ pub struct PlannerTickData { ...@@ -36,8 +41,6 @@ pub struct PlannerTickData {
pub prefill_fpm_snapshots: Vec<(usize, ForwardPassSnapshot)>, pub prefill_fpm_snapshots: Vec<(usize, ForwardPassSnapshot)>,
/// Decode (or agg) FPM snapshots since last tick: (worker_id, snapshot). /// Decode (or agg) FPM snapshots since last tick: (worker_id, snapshot).
pub decode_fpm_snapshots: Vec<(usize, ForwardPassSnapshot)>, pub decode_fpm_snapshots: Vec<(usize, ForwardPassSnapshot)>,
/// Traffic observation: (duration_s, num_req, avg_isl, avg_osl).
pub traffic: (f64, usize, f64, f64),
/// Active prefill workers (0 for agg mode). /// Active prefill workers (0 for agg mode).
pub active_prefill_count: usize, pub active_prefill_count: usize,
/// Active decode workers (or total active for agg mode). /// Active decode workers (or total active for agg mode).
...@@ -120,18 +123,19 @@ impl PlannerReplayHandle { ...@@ -120,18 +123,19 @@ impl PlannerReplayHandle {
} }
/// Advance the simulation up to `until_ms`, collect metrics, return tick data. /// Advance the simulation up to `until_ms`, collect metrics, return tick data.
///
/// Traffic metrics are NOT drained here — call [`drain_traffic`] explicitly
/// on throughput-scaling ticks so the accumulator covers the full interval.
pub fn advance_to(&mut self, until_ms: f64) -> Result<PlannerTickData> { pub fn advance_to(&mut self, until_ms: f64) -> Result<PlannerTickData> {
match &mut self.runtime { match &mut self.runtime {
RuntimeKind::Agg(rt) => { RuntimeKind::Agg(rt) => {
let is_done = rt.advance_to(until_ms)?; let is_done = rt.advance_to(until_ms)?;
let fpm = rt.drain_fpm(); let fpm = rt.drain_fpm();
let traffic = rt.drain_traffic();
Ok(PlannerTickData { Ok(PlannerTickData {
now_ms: rt.now_ms(), now_ms: rt.now_ms(),
is_done, is_done,
prefill_fpm_snapshots: Vec::new(), prefill_fpm_snapshots: Vec::new(),
decode_fpm_snapshots: fpm, decode_fpm_snapshots: fpm,
traffic,
active_prefill_count: 0, active_prefill_count: 0,
active_decode_count: rt.active_worker_count(), active_decode_count: rt.active_worker_count(),
total_prefill_count: 0, total_prefill_count: 0,
...@@ -142,13 +146,11 @@ impl PlannerReplayHandle { ...@@ -142,13 +146,11 @@ impl PlannerReplayHandle {
let is_done = rt.advance_to(until_ms)?; let is_done = rt.advance_to(until_ms)?;
let prefill_fpm = rt.drain_prefill_fpm(); let prefill_fpm = rt.drain_prefill_fpm();
let decode_fpm = rt.drain_decode_fpm(); let decode_fpm = rt.drain_decode_fpm();
let traffic = rt.drain_traffic();
Ok(PlannerTickData { Ok(PlannerTickData {
now_ms: rt.now_ms(), now_ms: rt.now_ms(),
is_done, is_done,
prefill_fpm_snapshots: prefill_fpm, prefill_fpm_snapshots: prefill_fpm,
decode_fpm_snapshots: decode_fpm, decode_fpm_snapshots: decode_fpm,
traffic,
active_prefill_count: rt.active_prefill_count(), active_prefill_count: rt.active_prefill_count(),
active_decode_count: rt.active_decode_count(), active_decode_count: rt.active_decode_count(),
total_prefill_count: rt.total_prefill_count(), total_prefill_count: rt.total_prefill_count(),
...@@ -158,6 +160,18 @@ impl PlannerReplayHandle { ...@@ -158,6 +160,18 @@ impl PlannerReplayHandle {
} }
} }
/// Drain accumulated traffic metrics since the last drain.
///
/// Returns `(duration_s, num_req, avg_isl, avg_osl)`. Call this only on
/// throughput-scaling ticks so the window covers the full
/// `throughput_adjustment_interval`, not just the gap between load ticks.
pub fn drain_traffic(&mut self) -> (f64, usize, f64, f64) {
match &mut self.runtime {
RuntimeKind::Agg(rt) => rt.drain_traffic(),
RuntimeKind::Disagg(rt) => rt.drain_traffic(),
}
}
/// Apply a scaling decision with separate prefill and decode targets. /// Apply a scaling decision with separate prefill and decode targets.
/// For agg mode, `target_prefill` is ignored. /// For agg mode, `target_prefill` is ignored.
pub fn apply_scaling(&mut self, target_prefill: usize, target_decode: usize) -> Result<()> { pub fn apply_scaling(&mut self, target_prefill: usize, target_decode: usize) -> Result<()> {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment