Unverified Commit 590bc4b7 authored by Chang Su's avatar Chang Su Committed by GitHub
Browse files

[router][grpc] Fix background tasks stored with wrong id (#11945)

parent 63cfe1b0
...@@ -187,7 +187,7 @@ fn extract_text_from_content(content: &[ResponseContentPart]) -> String { ...@@ -187,7 +187,7 @@ fn extract_text_from_content(content: &[ResponseContentPart]) -> String {
/// Convert a ChatCompletionResponse to ResponsesResponse /// Convert a ChatCompletionResponse to ResponsesResponse
/// ///
/// # Conversion Logic /// # Conversion Logic
/// - `id` → `id` (pass through) /// - `id` → `response_id_override` if provided, otherwise `chat_resp.id`
/// - `model` → `model` (pass through) /// - `model` → `model` (pass through)
/// - `choices[0].message` → `output` array (convert to ResponseOutputItem::Message) /// - `choices[0].message` → `output` array (convert to ResponseOutputItem::Message)
/// - `choices[0].finish_reason` → determines `status` (stop/length → Completed) /// - `choices[0].finish_reason` → determines `status` (stop/length → Completed)
...@@ -195,6 +195,7 @@ fn extract_text_from_content(content: &[ResponseContentPart]) -> String { ...@@ -195,6 +195,7 @@ fn extract_text_from_content(content: &[ResponseContentPart]) -> String {
pub fn chat_to_responses( pub fn chat_to_responses(
chat_resp: &ChatCompletionResponse, chat_resp: &ChatCompletionResponse,
original_req: &ResponsesRequest, original_req: &ResponsesRequest,
response_id_override: Option<String>,
) -> Result<ResponsesResponse, String> { ) -> Result<ResponsesResponse, String> {
// Extract the first choice (responses API doesn't support n>1) // Extract the first choice (responses API doesn't support n>1)
let choice = chat_resp let choice = chat_resp
...@@ -275,7 +276,7 @@ pub fn chat_to_responses( ...@@ -275,7 +276,7 @@ pub fn chat_to_responses(
// Generate response // Generate response
Ok(ResponsesResponse { Ok(ResponsesResponse {
id: chat_resp.id.clone(), id: response_id_override.unwrap_or_else(|| chat_resp.id.clone()),
object: "response".to_string(), object: "response".to_string(),
created_at: chat_resp.created as i64, created_at: chat_resp.created as i64,
status, status,
......
...@@ -21,7 +21,7 @@ use futures_util::StreamExt; ...@@ -21,7 +21,7 @@ use futures_util::StreamExt;
use serde_json::json; use serde_json::json;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, warn}; use tracing::{debug, error, warn};
use uuid::Uuid; use uuid::Uuid;
use super::{ use super::{
...@@ -935,14 +935,14 @@ async fn execute_without_mcp( ...@@ -935,14 +935,14 @@ async fn execute_without_mcp(
headers, headers,
model_id, model_id,
components, components,
response_id, response_id.clone(),
background_tasks, background_tasks,
) )
.await .await
.map_err(|e| format!("Pipeline execution failed: {}", e))?; .map_err(|e| format!("Pipeline execution failed: {}", e))?;
// Convert ChatCompletionResponse → ResponsesResponse // Convert ChatCompletionResponse → ResponsesResponse
conversions::chat_to_responses(&chat_response, original_request) conversions::chat_to_responses(&chat_response, original_request, response_id)
.map_err(|e| format!("Failed to convert to responses format: {}", e)) .map_err(|e| format!("Failed to convert to responses format: {}", e))
} }
...@@ -1207,13 +1207,20 @@ pub async fn cancel_response_impl( ...@@ -1207,13 +1207,20 @@ pub async fn cancel_response_impl(
) )
.into_response() .into_response()
} else { } else {
// Task handle not found (may have already completed) // Task handle not found but status is queued/in_progress
// This can happen if: (1) task crashed, or (2) storage persistence failed
error!(
"Response {} has status '{}' but task handle is missing. Task may have crashed or storage update failed.",
response_id, current_status
);
( (
StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(json!({ axum::Json(json!({
"id": response_id, "error": {
"status": "completed_or_not_found", "message": "Internal error: background task completed but failed to update status in storage",
"message": "Task may have already completed before cancellation" "type": "internal_error",
"code": "status_update_failed"
}
})), })),
) )
.into_response() .into_response()
......
...@@ -365,9 +365,12 @@ pub(super) async fn execute_tool_loop( ...@@ -365,9 +365,12 @@ pub(super) async fn execute_tool_loop(
); );
// Convert chat response to responses format and mark as incomplete // Convert chat response to responses format and mark as incomplete
let mut responses_response = let mut responses_response = conversions::chat_to_responses(
conversions::chat_to_responses(&chat_response, original_request) &chat_response,
.map_err(|e| format!("Failed to convert to responses format: {}", e))?; original_request,
response_id.clone(),
)
.map_err(|e| format!("Failed to convert to responses format: {}", e))?;
// Mark as completed but with incomplete details // Mark as completed but with incomplete details
responses_response.status = ResponseStatus::Completed; responses_response.status = ResponseStatus::Completed;
...@@ -461,9 +464,12 @@ pub(super) async fn execute_tool_loop( ...@@ -461,9 +464,12 @@ pub(super) async fn execute_tool_loop(
); );
// Convert final chat response to responses format // Convert final chat response to responses format
let mut responses_response = let mut responses_response = conversions::chat_to_responses(
conversions::chat_to_responses(&chat_response, original_request) &chat_response,
.map_err(|e| format!("Failed to convert to responses format: {}", e))?; original_request,
response_id.clone(),
)
.map_err(|e| format!("Failed to convert to responses format: {}", e))?;
// Inject MCP metadata into output // Inject MCP metadata into output
if state.total_calls > 0 { if state.total_calls > 0 {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment