Unverified Commit 31189eea authored by Qi Wang's avatar Qi Wang Committed by GitHub
Browse files

fix: fix worker's push_handler non-blocking error upon stop word (#5157)

parent 332da400
...@@ -282,9 +282,24 @@ where ...@@ -282,9 +282,24 @@ where
m.response_bytes.inc_by(resp_bytes.len() as u64); m.response_bytes.inc_by(resp_bytes.len() as u64);
} }
if (publisher.send(resp_bytes.into()).await).is_err() { if (publisher.send(resp_bytes.into()).await).is_err() {
tracing::error!("Failed to publish response for stream {}", context.id());
context.stop_generating();
send_complete_final = false; send_complete_final = false;
if context.is_stopped() {
// Say there are 2 threads accessing `context`, the sequence can be either:
// 1. context.stop_generating (other) -> publisher.send failure (this)
// -> context.is_stopped (this)
// 2. publisher.send failure (this) -> context.stop_generating (other)
// -> context.is_stopped (this)
// Case 1 can happen when client closed the connection after receiving the
// complete response from frontend. Hence, send failure can be expected in this
// case.
tracing::warn!("Failed to publish response for stream {}", context.id());
} else {
// Otherwise, this is an error.
tracing::error!("Failed to publish response for stream {}", context.id());
context.stop_generating();
}
// Account errors in all cases, including cancellation. Therefore this metric can be
// inflated.
if let Some(m) = self.metrics() { if let Some(m) = self.metrics() {
m.error_counter m.error_counter
.with_label_values(&[work_handler::error_types::PUBLISH_RESPONSE]) .with_label_values(&[work_handler::error_types::PUBLISH_RESPONSE])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment