Unverified Commit f53fa64c authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: DIS-1768 emit structured SSE error + [DONE] on mid-stream fault (#8430)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 9bf9709a
...@@ -221,7 +221,22 @@ pub fn monitor_for_disconnects( ...@@ -221,7 +221,22 @@ pub fn monitor_for_disconnects(
Some(Err(err)) => { Some(Err(err)) => {
// Mark error as internal since it's a streaming error // Mark error as internal since it's a streaming error
inflight_guard.mark_error(ErrorType::Internal); inflight_guard.mark_error(ErrorType::Internal);
yield Event::default().event("error").comment(err.to_string()); // We're terminating the stream intentionally here with a
// structured error + [DONE]; disarm so the stream handle
// doesn't later record this as ClosedUnexpectedly (which
// would mis-attribute the fault as a client disconnect).
stream_handle.disarm();
// DIS-1768: emit structured OpenAI-style error frame + `data: [DONE]`
// so naive `data:`-line parsers see both the error and a stream terminator.
let err_json = serde_json::json!({
"error": {
"message": err.to_string(),
"type": "internal_server_error",
"code": 500,
}
});
yield Event::default().data(err_json.to_string());
yield Event::default().data("[DONE]");
// Break to prevent any subsequent mark_ok() from overwriting the error // Break to prevent any subsequent mark_ok() from overwriting the error
break; break;
} }
...@@ -491,4 +506,128 @@ mod tests { ...@@ -491,4 +506,128 @@ mod tests {
"inflight gauge leaked in phase 2" "inflight gauge leaked in phase 2"
); );
} }
// ─────────────────────────────────────────────────────────────────────────────
// DIS-1768: mid-stream fault SSE contract
//
// When the upstream stream yields `Err(_)` mid-stream — e.g. an upstream
// worker dies and the mpsc channel reports
// `Disconnected: Stream ended before generation completed`, or the Python
// chat-processor raises and the Rust→Python `tx.send()` fails with
// `Failed to send response: SendError { .. }` — the client MUST receive:
// 1. a structured `data: {"error":{"message":..., "type":... or "code":...}}` frame, then
// 2. a `data: [DONE]` terminator.
// Before the fix, the code emitted the bare SSE trailer
// `event: error\n: <comment>\n\n` with no `[DONE]`, which violates the
// OpenAI SSE contract and is silently skipped by naive `data:`-line parsers.
// The two tests below pin the post-fix contract.
// ─────────────────────────────────────────────────────────────────────────────
/// Builds a stream that yields `data_chunks` successful events, then yields an
/// `Err` carrying `err_msg`, simulating a mid-stream upstream fault.
fn simulate_mid_stream_error(
data_chunks: usize,
err_msg: &'static str,
) -> impl futures::Stream<Item = Result<axum::response::sse::Event, axum::Error>> {
async_stream::try_stream! {
for i in 0..data_chunks {
yield axum::response::sse::Event::default().data(format!("chunk-{i}"));
}
Err(axum::Error::new(err_msg))?;
}
}
/// Collect the wire-format SSE body from a monitored stream.
async fn collect_sse_body(
stream: impl Stream<Item = Result<Event, axum::Error>> + Send + 'static,
) -> String {
use axum::body::to_bytes;
use axum::response::{IntoResponse, Sse};
let response = Sse::new(stream).into_response();
let body = to_bytes(response.into_body(), 1 << 20)
.await
.expect("body bytes");
String::from_utf8(body.to_vec()).expect("utf8 body")
}
/// Assert the post-fix SSE fault contract: parsed structured error frame with exact
/// message/type/code, positioned before `[DONE]`, and no bare `event: error` trailer.
fn assert_fault_contract(case: &str, text: &str, expected_message: &str) {
let done_pos = text.find("data: [DONE]").unwrap_or_else(|| {
panic!("[{case}] body does not terminate with `data: [DONE]`. Body:\n{text}")
});
let (error_line, error_frame) = text
.lines()
.find_map(|line| {
let payload = line.strip_prefix("data: ")?;
serde_json::from_str::<serde_json::Value>(payload)
.ok()
.filter(|v| v.get("error").is_some())
.map(|v| (line, v))
})
.unwrap_or_else(|| {
panic!(
"[{case}] body missing structured JSON `data: {{\"error\":{{...}}}}` frame. Body:\n{text}"
)
});
let error_pos = text.find(error_line).unwrap_or_default();
assert!(
error_pos < done_pos,
"[{case}] structured error frame must precede `data: [DONE]`. Body:\n{text}"
);
let error = error_frame
.get("error")
.and_then(|v| v.as_object())
.unwrap_or_else(|| panic!("[{case}] `error` field is not an object. Body:\n{text}"));
assert_eq!(
error.get("message").and_then(|v| v.as_str()),
Some(expected_message),
"[{case}] structured error `message` mismatch. Body:\n{text}"
);
assert_eq!(
error.get("type").and_then(|v| v.as_str()),
Some("internal_server_error"),
"[{case}] structured error `type` mismatch. Body:\n{text}"
);
assert_eq!(
error.get("code").and_then(|v| v.as_i64()),
Some(500),
"[{case}] structured error `code` mismatch. Body:\n{text}"
);
assert!(
!text.contains("event: error\n: "),
"[{case}] body contains bare `event: error\\n: <comment>` trailer (pre-fix bug). Body:\n{text}"
);
}
/// Upstream worker killed mid-stream → mpsc channel reports `Disconnected` to the
/// HTTP layer. Client MUST receive structured error + `[DONE]`.
#[tokio::test]
#[serial]
async fn test_simulate_worker_kill_emits_structured_error_and_done() {
let (_metrics, guard, ctx, handle) = setup_test("worker-kill-model", "req-wk", "0");
let expected_message = "Disconnected: Stream ended before generation completed";
let stream = simulate_mid_stream_error(3, expected_message);
let monitored = monitor_for_disconnects(stream, ctx, guard, handle);
let body = collect_sse_body(monitored).await;
cleanup_env();
assert_fault_contract("worker_kill", &body, expected_message);
}
/// Python chat-processor raises mid-stream → Rust→Python `tx.send()` fails with
/// `SendError`. Client MUST receive structured error + `[DONE]`.
#[tokio::test]
#[serial]
async fn test_simulate_python_consumer_drop_emits_structured_error_and_done() {
let (_metrics, guard, ctx, handle) = setup_test("py-drop-model", "req-py", "0");
let expected_message = "Failed to send response: SendError { .. }";
let stream = simulate_mid_stream_error(3, expected_message);
let monitored = monitor_for_disconnects(stream, ctx, guard, handle);
let body = collect_sse_body(monitored).await;
cleanup_env();
assert_fault_contract("python_consumer_drop", &body, expected_message);
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment