"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "18be11fd59cd3bf1082170ca638ebdfa384e7ed6"
Unverified Commit 1ebe7060 authored by Jonathan Tong's avatar Jonathan Tong Committed by GitHub
Browse files

feat: add KV cache transfer estimated latency metric for disaggregated serving (#7590)


Signed-off-by: default avatarJont828 <jt572@cornell.edu>
Co-authored-by: default avatarHongkuan Zhou <tedzhouhk@gmail.com>
parent e41e1715
...@@ -46,9 +46,11 @@ impl<'a> PythonGenerator<'a> { ...@@ -46,9 +46,11 @@ impl<'a> PythonGenerator<'a> {
let mut module_names: Vec<&String> = self.modules.keys().collect(); let mut module_names: Vec<&String> = self.modules.keys().collect();
module_names.sort(); module_names.sort();
let total = module_names.len();
// Generate simple classes with constants as class attributes // Generate simple classes with constants as class attributes
for module_name in module_names { for (idx, module_name) in module_names.iter().enumerate() {
let module = &self.modules[module_name]; let module = &self.modules[module_name.as_str()];
lines.push(format!("class {}:", module_name)); lines.push(format!("class {}:", module_name));
// Use doc comment from module if available // Use doc comment from module if available
...@@ -58,20 +60,30 @@ impl<'a> PythonGenerator<'a> { ...@@ -58,20 +60,30 @@ impl<'a> PythonGenerator<'a> {
lines.push(format!(" \"\"\"{}\"\"\"", first_line)); lines.push(format!(" \"\"\"{}\"\"\"", first_line));
} }
} }
lines.push("".to_string());
for constant in &module.constants { if !module.constants.is_empty() {
if !constant.doc_comment.is_empty() { lines.push("".to_string());
for comment_line in constant.doc_comment.lines() { for constant in &module.constants {
lines.push(format!(" # {}", comment_line)); if !constant.doc_comment.is_empty() {
for comment_line in constant.doc_comment.lines() {
lines.push(format!(" # {}", comment_line));
}
} }
lines.push(format!(" {} = \"{}\"", constant.name, constant.value));
} }
lines.push(format!(" {} = \"{}\"", constant.name, constant.value));
} }
lines.push("".to_string()); // PEP 8 / black requires two blank lines between top-level class definitions,
// but no trailing blank lines at end of file.
if idx + 1 < total {
lines.push("".to_string());
lines.push("".to_string());
}
} }
// End file with a single trailing newline (no blank lines after last class)
lines.push("".to_string());
lines.join("\n") lines.join("\n")
} }
} }
......
...@@ -83,6 +83,8 @@ class frontend_service: ...@@ -83,6 +83,8 @@ class frontend_service:
OUTPUT_SEQUENCE_TOKENS = "output_sequence_tokens" OUTPUT_SEQUENCE_TOKENS = "output_sequence_tokens"
# Predicted KV cache hit rate at routing time (0.0-1.0) # Predicted KV cache hit rate at routing time (0.0-1.0)
KV_HIT_RATE = "kv_hit_rate" KV_HIT_RATE = "kv_hit_rate"
# Upper-bound estimation of KV cache transfer latency in disaggregated serving (seconds)
KV_TRANSFER_ESTIMATED_LATENCY_SECONDS = "kv_transfer_estimated_latency_seconds"
# Number of cached tokens (prefix cache hits) per request # Number of cached tokens (prefix cache hits) per request
CACHED_TOKENS = "cached_tokens" CACHED_TOKENS = "cached_tokens"
# Tokenizer latency in milliseconds # Tokenizer latency in milliseconds
......
...@@ -327,6 +327,7 @@ pub struct RouterRequestMetrics { ...@@ -327,6 +327,7 @@ pub struct RouterRequestMetrics {
pub input_sequence_tokens: prometheus::Histogram, pub input_sequence_tokens: prometheus::Histogram,
pub output_sequence_tokens: prometheus::Histogram, pub output_sequence_tokens: prometheus::Histogram,
pub kv_hit_rate: prometheus::Histogram, pub kv_hit_rate: prometheus::Histogram,
pub kv_transfer_estimated_latency_seconds: prometheus::Histogram,
} }
static ROUTER_REQUEST_METRICS: OnceLock<Arc<RouterRequestMetrics>> = OnceLock::new(); static ROUTER_REQUEST_METRICS: OnceLock<Arc<RouterRequestMetrics>> = OnceLock::new();
...@@ -393,6 +394,14 @@ impl RouterRequestMetrics { ...@@ -393,6 +394,14 @@ impl RouterRequestMetrics {
Some(prometheus::linear_buckets(0.0, 0.05, 21).unwrap()), Some(prometheus::linear_buckets(0.0, 0.05, 21).unwrap()),
) )
.expect("failed to create router_kv_hit_rate"); .expect("failed to create router_kv_hit_rate");
let kv_transfer_estimated_latency_seconds = metrics
.create_histogram(
&router_metric(frontend_service::KV_TRANSFER_ESTIMATED_LATENCY_SECONDS),
"Upper-bound estimation of KV cache transfer latency in disaggregated serving (prefill_complete to first_token)",
extra_labels,
Some(generate_log_buckets(0.001, 10.0, 15)),
)
.expect("failed to create router_kv_transfer_estimated_latency_seconds");
Arc::new(Self { Arc::new(Self {
requests_total, requests_total,
time_to_first_token_seconds, time_to_first_token_seconds,
...@@ -400,6 +409,7 @@ impl RouterRequestMetrics { ...@@ -400,6 +409,7 @@ impl RouterRequestMetrics {
input_sequence_tokens, input_sequence_tokens,
output_sequence_tokens, output_sequence_tokens,
kv_hit_rate, kv_hit_rate,
kv_transfer_estimated_latency_seconds,
}) })
}) })
.clone() .clone()
...@@ -614,4 +624,47 @@ dynamo_frontend_router_queue_pending_requests{worker_type=\"decode\"} 5 ...@@ -614,4 +624,47 @@ dynamo_frontend_router_queue_pending_requests{worker_type=\"decode\"} 5
); );
// Reaching here without panic confirms saturating_sub works // Reaching here without panic confirms saturating_sub works
} }
#[test]
fn test_kv_transfer_estimated_latency_metric_pef() {
// Verify the metric name is correctly composed from the constant
// and produces valid PEF when observed.
let registry = prometheus::Registry::new();
let name = format!(
"{}{}",
router_request::METRIC_PREFIX,
frontend_service::KV_TRANSFER_ESTIMATED_LATENCY_SECONDS,
);
let buckets = generate_log_buckets(0.001, 10.0, 15);
let histogram = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
&name,
"Upper-bound estimation of KV cache transfer latency in disaggregated serving (prefill_complete to first_token)",
)
.buckets(buckets),
)
.unwrap();
registry.register(Box::new(histogram.clone())).unwrap();
// Observe a 5ms latency
histogram.observe(0.005);
let output = gather_pef(&registry);
assert!(
output.contains("# HELP router_kv_transfer_estimated_latency_seconds"),
"PEF missing HELP line. Got:\n{output}"
);
assert!(
output.contains("# TYPE router_kv_transfer_estimated_latency_seconds histogram"),
"PEF missing TYPE line. Got:\n{output}"
);
assert!(
output.contains("router_kv_transfer_estimated_latency_seconds_count 1"),
"PEF missing observation count. Got:\n{output}"
);
assert!(
output.contains("router_kv_transfer_estimated_latency_seconds_sum 0.005"),
"PEF missing observation sum. Got:\n{output}"
);
}
} }
...@@ -128,6 +128,9 @@ impl PrefillRouter { ...@@ -128,6 +128,9 @@ impl PrefillRouter {
phase_transition_permit: Option<OwnedSemaphorePermit>, phase_transition_permit: Option<OwnedSemaphorePermit>,
) -> Result<(PrefillResult, Option<(u64, Option<u32>)>), PrefillError> { ) -> Result<(PrefillResult, Option<(u64, Option<u32>)>), PrefillError> {
let router = router.ok_or(PrefillError::NotActivated)?; let router = router.ok_or(PrefillError::NotActivated)?;
// Clone tracker before request is consumed by generate_to_worker.
// Used to record prefill_complete_time for KV transfer latency metric.
let tracker = request.tracker.clone();
let mut prefill_response = router let mut prefill_response = router
.generate_to_worker(request, target_worker) .generate_to_worker(request, target_worker)
.await .await
...@@ -149,6 +152,12 @@ impl PrefillRouter { ...@@ -149,6 +152,12 @@ impl PrefillRouter {
)); ));
}; };
// Record when prefill result arrived at the router (for KV transfer latency metric).
// This is after drop(phase_transition_permit) and after first_output is received.
if let Some(ref tracker) = tracker {
tracker.record_prefill_complete();
}
if let Some(err) = first_output.err() { if let Some(err) = first_output.err() {
return Err(PrefillError::PrefillError( return Err(PrefillError::PrefillError(
"Prefill router returned error in output".to_string(), "Prefill router returned error in output".to_string(),
......
...@@ -89,6 +89,12 @@ impl RequestGuard { ...@@ -89,6 +89,12 @@ impl RequestGuard {
if !self.first_token_recorded && new_tokens > 0 { if !self.first_token_recorded && new_tokens > 0 {
if let Some(ref tracker) = self.tracker { if let Some(ref tracker) = self.tracker {
tracker.record_first_token(); tracker.record_first_token();
// Record decode-phase first token for KV transfer latency metric.
// In disaggregated serving, first_token_time is locked by the prefill phase,
// so we need a separate timestamp for the decode worker's first token.
if tracker.phase() == RequestPhase::Decode {
tracker.record_decode_first_token();
}
if let Some(ttft) = tracker.ttft_ms() { if let Some(ttft) = tracker.ttft_ms() {
self.request_metrics self.request_metrics
.time_to_first_token_seconds .time_to_first_token_seconds
...@@ -150,6 +156,12 @@ impl RequestGuard { ...@@ -150,6 +156,12 @@ impl RequestGuard {
if let Some(ref tracker) = self.tracker { if let Some(ref tracker) = self.tracker {
tracker.record_finish(); tracker.record_finish();
tracker.record_osl(self.cumulative_osl); tracker.record_osl(self.cumulative_osl);
// Observe KV transfer estimated latency (disaggregated paths)
if let Some(latency) = tracker.kv_transfer_estimated_latency_secs() {
self.request_metrics
.kv_transfer_estimated_latency_seconds
.observe(latency);
}
} }
self.request_metrics self.request_metrics
.output_sequence_tokens .output_sequence_tokens
......
...@@ -95,6 +95,12 @@ pub struct RequestTracker { ...@@ -95,6 +95,12 @@ pub struct RequestTracker {
/// decode phase's attempt is silently ignored, preserving the real TTFT. /// decode phase's attempt is silently ignored, preserving the real TTFT.
first_token_time: OnceLock<Instant>, first_token_time: OnceLock<Instant>,
/// When the decode worker produced its first token (set once via OnceLock).
/// Separate from `first_token_time` because in disaggregated serving, the prefill
/// phase locks `first_token_time` first. This field captures the decode phase's
/// first token for KV transfer latency estimation (`decode_first_token - prefill_complete`).
decode_first_token_time: OnceLock<Instant>,
/// When the request finished. Mutex allows the last router phase to /// When the request finished. Mutex allows the last router phase to
/// record the final finish time. /// record the final finish time.
request_finish_time: Mutex<Option<Instant>>, request_finish_time: Mutex<Option<Instant>>,
...@@ -157,6 +163,10 @@ pub struct RequestTracker { ...@@ -157,6 +163,10 @@ pub struct RequestTracker {
/// Router scheduler queue depth at routing time (how many requests were pending) /// Router scheduler queue depth at routing time (how many requests were pending)
router_queue_depth: OnceLock<usize>, router_queue_depth: OnceLock<usize>,
/// When the prefill result arrived at the router (disaggregated, original path only).
/// Set in execute_prefill() after the first output is received from the prefill worker.
prefill_complete_time: OnceLock<Instant>,
} }
impl RequestTracker { impl RequestTracker {
...@@ -173,6 +183,7 @@ impl RequestTracker { ...@@ -173,6 +183,7 @@ impl RequestTracker {
request_received_epoch_ms: epoch_ms, request_received_epoch_ms: epoch_ms,
prefill_start_time: OnceLock::new(), prefill_start_time: OnceLock::new(),
first_token_time: OnceLock::new(), first_token_time: OnceLock::new(),
decode_first_token_time: OnceLock::new(),
request_finish_time: Mutex::new(None), request_finish_time: Mutex::new(None),
kv_overlap_blocks: OnceLock::new(), kv_overlap_blocks: OnceLock::new(),
isl_blocks: OnceLock::new(), isl_blocks: OnceLock::new(),
...@@ -191,6 +202,7 @@ impl RequestTracker { ...@@ -191,6 +202,7 @@ impl RequestTracker {
detokenize_total_ns: AtomicU64::new(0), detokenize_total_ns: AtomicU64::new(0),
detokenize_count: AtomicU64::new(0), detokenize_count: AtomicU64::new(0),
router_queue_depth: OnceLock::new(), router_queue_depth: OnceLock::new(),
prefill_complete_time: OnceLock::new(),
} }
} }
...@@ -203,6 +215,12 @@ impl RequestTracker { ...@@ -203,6 +215,12 @@ impl RequestTracker {
let _ = self.first_token_time.set(Instant::now()); let _ = self.first_token_time.set(Instant::now());
} }
/// Record when the decode worker produced its first token.
/// Used for KV transfer latency estimation in disaggregated serving.
pub fn record_decode_first_token(&self) {
let _ = self.decode_first_token_time.set(Instant::now());
}
pub fn record_finish(&self) { pub fn record_finish(&self) {
*self.request_finish_time.lock() = Some(Instant::now()); *self.request_finish_time.lock() = Some(Instant::now());
} }
...@@ -457,6 +475,23 @@ impl RequestTracker { ...@@ -457,6 +475,23 @@ impl RequestTracker {
self.router_queue_depth.get().copied() self.router_queue_depth.get().copied()
} }
/// Record when the prefill result was received by the router.
/// Returns true if this was the first call (OnceLock first-write-wins).
pub fn record_prefill_complete(&self) -> bool {
self.prefill_complete_time.set(Instant::now()).is_ok()
}
/// Upper-bound estimation of KV cache transfer latency in seconds.
/// Computed as `decode_first_token_time - prefill_complete_time`, which captures:
/// router dispatch overhead + network + KV transfer (NIXL) + one decode forward pass.
/// Works for all disaggregated paths (original and bootstrap).
/// Returns None if either timestamp was not recorded.
pub fn kv_transfer_estimated_latency_secs(&self) -> Option<f64> {
let complete = *self.prefill_complete_time.get()?;
let first_tok = *self.decode_first_token_time.get()?;
Some(first_tok.saturating_duration_since(complete).as_secs_f64())
}
/// Get worker ID information if any worker IDs have been recorded. /// Get worker ID information if any worker IDs have been recorded.
pub fn get_worker_info(&self) -> Option<WorkerIdInfo> { pub fn get_worker_info(&self) -> Option<WorkerIdInfo> {
let prefill = self.prefill_worker_id(); let prefill = self.prefill_worker_id();
...@@ -558,6 +593,9 @@ impl RequestTracker { ...@@ -558,6 +593,9 @@ impl RequestTracker {
total_time_ms: self.total_time_ms(), total_time_ms: self.total_time_ms(),
kv_hit_rate: self.kv_hit_rate(), kv_hit_rate: self.kv_hit_rate(),
router_queue_depth: self.router_queue_depth(), router_queue_depth: self.router_queue_depth(),
kv_transfer_estimated_latency_ms: self
.kv_transfer_estimated_latency_secs()
.map(|s| s * 1000.0),
} }
} }
} }
...@@ -600,6 +638,11 @@ pub struct TimingInfo { ...@@ -600,6 +638,11 @@ pub struct TimingInfo {
/// Number of requests pending in the router scheduler queue at routing time /// Number of requests pending in the router scheduler queue at routing time
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub router_queue_depth: Option<usize>, pub router_queue_depth: Option<usize>,
/// Upper-bound estimation of KV cache transfer latency in milliseconds (disaggregated only).
/// Measured as decode_first_token_time - prefill_complete_time.
#[serde(skip_serializing_if = "Option::is_none")]
pub kv_transfer_estimated_latency_ms: Option<f64>,
} }
#[cfg(test)] #[cfg(test)]
...@@ -776,4 +819,180 @@ mod tests { ...@@ -776,4 +819,180 @@ mod tests {
"ITL gauge should be positive after observe, got {itl_val}" "ITL gauge should be positive after observe, got {itl_val}"
); );
} }
#[test]
fn test_kv_transfer_estimated_latency() {
let tracker = RequestTracker::new();
// Before any timestamps: returns None
assert!(tracker.kv_transfer_estimated_latency_secs().is_none());
tracker.record_prefill_complete();
thread::sleep(Duration::from_millis(10));
tracker.record_decode_first_token();
let latency = tracker.kv_transfer_estimated_latency_secs().unwrap();
assert!(
latency >= 0.005,
"latency should be at least 5ms, got {latency}"
);
}
#[test]
fn test_kv_transfer_estimated_latency_none_without_first_token() {
let tracker = RequestTracker::new();
tracker.record_prefill_complete();
assert!(
tracker.kv_transfer_estimated_latency_secs().is_none(),
"Should return None when decode_first_token_time is not set"
);
}
#[test]
fn test_kv_transfer_estimated_latency_none_without_prefill_complete() {
let tracker = RequestTracker::new();
tracker.record_decode_first_token();
assert!(
tracker.kv_transfer_estimated_latency_secs().is_none(),
"Should return None when prefill_complete_time is not set"
);
}
#[test]
fn test_kv_transfer_estimated_latency_oncelock_first_write_wins() {
let tracker = RequestTracker::new();
assert!(tracker.record_prefill_complete()); // first call returns true
assert!(!tracker.record_prefill_complete()); // second call returns false (OnceLock)
}
#[test]
fn test_timing_info_includes_kv_transfer_estimated_latency() {
let tracker = RequestTracker::new();
tracker.record_prefill_complete();
thread::sleep(Duration::from_millis(10));
tracker.record_decode_first_token();
let info = tracker.get_timing_info();
let latency_ms = info
.kv_transfer_estimated_latency_ms
.expect("should be Some");
assert!(
latency_ms >= 5.0,
"latency should be at least 5ms, got {latency_ms}"
);
}
#[test]
fn test_timing_info_kv_transfer_estimated_latency_none_in_aggregated() {
let tracker = RequestTracker::new();
// No record_prefill_complete / record_first_token called
let info = tracker.get_timing_info();
assert!(
info.kv_transfer_estimated_latency_ms.is_none(),
"Should be None in aggregated mode (no timestamps recorded)"
);
}
/// Reproduces the original bug where kv_transfer_estimated_latency was always 0.
///
/// The bug: in disaggregated serving, both `record_first_token()` and
/// `record_prefill_complete()` were called during the prefill phase with
/// ~nanoseconds between them, and the latency was computed as
/// `first_token_time - prefill_complete_time`. Since `first_token_time`
/// was set *before* `prefill_complete_time`, `saturating_duration_since`
/// clamped the negative duration to zero.
///
/// The fix: use a separate `decode_first_token_time` field that is only
/// recorded during the Decode phase, giving a meaningful time gap.
#[test]
fn test_kv_transfer_latency_bug_prefill_timestamps_are_zero() {
let tracker = RequestTracker::new();
// Simulate the buggy prefill-phase sequence:
// 1. RequestGuard::on_item() calls record_first_token() during prefill
tracker.record_first_token();
// 2. execute_prefill() calls record_prefill_complete() immediately after
tracker.record_prefill_complete();
// The OLD computation (first_token_time - prefill_complete_time) would be 0
// because first_token_time < prefill_complete_time chronologically,
// and saturating_duration_since clamps to zero.
let first_tok = *tracker.first_token_time.get().unwrap();
let complete = *tracker.prefill_complete_time.get().unwrap();
let old_latency = first_tok.saturating_duration_since(complete).as_secs_f64();
assert_eq!(
old_latency, 0.0,
"Old computation should produce exactly 0.0 (the bug), got {old_latency}"
);
// The FIXED computation uses decode_first_token_time which hasn't been set
// yet, so it correctly returns None (no decode phase has run).
assert!(
tracker.kv_transfer_estimated_latency_secs().is_none(),
"Fixed metric should be None when decode hasn't started"
);
// Now simulate the decode phase producing its first token after a delay.
thread::sleep(Duration::from_millis(10));
tracker.record_decode_first_token();
// The FIXED computation (decode_first_token_time - prefill_complete_time)
// captures the actual KV transfer + decode startup latency.
let fixed_latency = tracker.kv_transfer_estimated_latency_secs().unwrap();
assert!(
fixed_latency >= 0.005,
"Fixed latency should be >= 5ms (actual KV transfer time), got {fixed_latency}"
);
}
/// Verifies that the decode phase's record_first_token() is rejected by OnceLock
/// (since prefill already set it), but record_decode_first_token() succeeds.
#[test]
fn test_decode_first_token_not_blocked_by_prefill_oncelock() {
let tracker = RequestTracker::new();
// Prefill phase sets first_token_time
tracker.record_first_token();
let prefill_first_tok = *tracker.first_token_time.get().unwrap();
thread::sleep(Duration::from_millis(5));
// Decode phase: record_first_token() is rejected (OnceLock already set)
tracker.record_first_token();
let still_prefill_tok = *tracker.first_token_time.get().unwrap();
assert_eq!(
prefill_first_tok, still_prefill_tok,
"first_token_time should be unchanged (OnceLock rejected decode's write)"
);
// But record_decode_first_token() succeeds on its own OnceLock
tracker.record_decode_first_token();
let decode_tok = *tracker.decode_first_token_time.get().unwrap();
assert!(
decode_tok > prefill_first_tok,
"decode_first_token_time should be later than first_token_time"
);
}
#[test]
fn test_timing_info_kv_transfer_estimated_latency_serialization() {
let tracker = RequestTracker::new();
// When not set, the field should be omitted from JSON (skip_serializing_if)
let info = tracker.get_timing_info();
let json = serde_json::to_string(&info).unwrap();
assert!(
!json.contains("kv_transfer_estimated_latency_ms"),
"None field should be omitted from JSON, got: {json}"
);
// When set, it should appear
let tracker2 = RequestTracker::new();
tracker2.record_prefill_complete();
tracker2.record_decode_first_token();
let info2 = tracker2.get_timing_info();
let json2 = serde_json::to_string(&info2).unwrap();
assert!(
json2.contains("kv_transfer_estimated_latency_ms"),
"Set field should appear in JSON, got: {json2}"
);
}
} }
...@@ -192,6 +192,9 @@ pub mod frontend_service { ...@@ -192,6 +192,9 @@ pub mod frontend_service {
/// Predicted KV cache hit rate at routing time (0.0-1.0) /// Predicted KV cache hit rate at routing time (0.0-1.0)
pub const KV_HIT_RATE: &str = "kv_hit_rate"; pub const KV_HIT_RATE: &str = "kv_hit_rate";
/// Upper-bound estimation of KV cache transfer latency in disaggregated serving (seconds)
pub const KV_TRANSFER_ESTIMATED_LATENCY_SECONDS: &str = "kv_transfer_estimated_latency_seconds";
/// Number of cached tokens (prefix cache hits) per request /// Number of cached tokens (prefix cache hits) per request
pub const CACHED_TOKENS: &str = "cached_tokens"; pub const CACHED_TOKENS: &str = "cached_tokens";
......
...@@ -1569,6 +1569,7 @@ def _test_router_decisions_disagg( ...@@ -1569,6 +1569,7 @@ def _test_router_decisions_disagg(
request_plane: str = "nats", request_plane: str = "nats",
durable_kv_events: bool = False, durable_kv_events: bool = False,
router_aic_config: Optional[dict[str, Any]] = None, router_aic_config: Optional[dict[str, Any]] = None,
enable_bootstrap: bool = False,
): ):
"""Validate KV cache prefix reuse in disaggregated prefill-decode setup via HTTP frontend. """Validate KV cache prefix reuse in disaggregated prefill-decode setup via HTTP frontend.
...@@ -1716,11 +1717,13 @@ def _test_router_decisions_disagg( ...@@ -1716,11 +1717,13 @@ def _test_router_decisions_disagg(
if decode_wid is not None: if decode_wid is not None:
decode_worker_ids.append(decode_wid) decode_worker_ids.append(decode_wid)
# Verify timing info is present and valid # Verify timing info is present and valid.
# kv_transfer_estimated_latency_ms is measured on both the original
# and bootstrap prefill paths (uses first_token_time as stop).
assert ( assert (
timing_info is not None timing_info is not None
), f"Request {i + 1}: Expected timing info in final chunk, got None" ), f"Request {i + 1}: Expected timing info in final chunk, got None"
verify_response_timing(timing_info) verify_response_timing(timing_info, disagg=not enable_bootstrap)
# Small delay between requests # Small delay between requests
await asyncio.sleep(1) await asyncio.sleep(1)
......
...@@ -118,8 +118,13 @@ def verify_response_worker_ids( ...@@ -118,8 +118,13 @@ def verify_response_worker_ids(
) )
def verify_response_timing(timing_info: dict[str, Any]) -> None: def verify_response_timing(timing_info: dict[str, Any], disagg: bool = False) -> None:
"""Verify timing info has valid values (ttft_ms > 0, total_time_ms > 0).""" """Verify timing info has valid values (ttft_ms > 0, total_time_ms > 0).
Args:
timing_info: Dict of timing fields from nvext.timing in the response.
disagg: If True, also verify kv_transfer_estimated_latency_ms > 0 (disaggregated mode only).
"""
ttft_ms = timing_info.get("ttft_ms") ttft_ms = timing_info.get("ttft_ms")
total_time_ms = timing_info.get("total_time_ms") total_time_ms = timing_info.get("total_time_ms")
...@@ -134,6 +139,21 @@ def verify_response_timing(timing_info: dict[str, Any]) -> None: ...@@ -134,6 +139,21 @@ def verify_response_timing(timing_info: dict[str, Any]) -> None:
f"✓ Verified timing: ttft_ms={ttft_ms:.2f}, total_time_ms={total_time_ms:.2f}" f"✓ Verified timing: ttft_ms={ttft_ms:.2f}, total_time_ms={total_time_ms:.2f}"
) )
if disagg:
kv_transfer_estimated_latency_ms = timing_info.get(
"kv_transfer_estimated_latency_ms"
)
assert (
kv_transfer_estimated_latency_ms is not None
and kv_transfer_estimated_latency_ms > 0
), (
f"Expected kv_transfer_estimated_latency_ms > 0 in disaggregated mode, "
f"got: {kv_transfer_estimated_latency_ms}"
)
logger.info(
f"✓ Verified kv_transfer_estimated_latency_ms={kv_transfer_estimated_latency_ms:.2f}"
)
######################################################## ########################################################
# Utility functions # Utility functions
......
...@@ -1289,6 +1289,7 @@ def test_router_decisions_disagg( ...@@ -1289,6 +1289,7 @@ def test_router_decisions_disagg(
frontend_port=frontend_port, frontend_port=frontend_port,
test_payload=TEST_PAYLOAD, test_payload=TEST_PAYLOAD,
request_plane="nats", request_plane="nats",
enable_bootstrap=enable_disagg_bootstrap,
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment