Unverified Commit 887742a1 authored by Chang Su's avatar Chang Su Committed by GitHub
Browse files

[router][grpc] Fix index issues in reasoning content and missing streaming events (#12650)

parent 34f7564d
...@@ -615,7 +615,7 @@ impl HarmonyStreamingProcessor { ...@@ -615,7 +615,7 @@ impl HarmonyStreamingProcessor {
let mut accumulated_tool_calls: Option<Vec<ToolCall>> = None; let mut accumulated_tool_calls: Option<Vec<ToolCall>> = None;
// Track which items we've started // Track which items we've started
let mut reasoning_output_index: Option<usize> = None; let mut has_emitted_reasoning = false;
let mut message_output_index: Option<usize> = None; let mut message_output_index: Option<usize> = None;
let mut message_item_id: Option<String> = None; let mut message_item_id: Option<String> = None;
let mut has_emitted_content_part_added = false; let mut has_emitted_content_part_added = false;
...@@ -646,18 +646,14 @@ impl HarmonyStreamingProcessor { ...@@ -646,18 +646,14 @@ impl HarmonyStreamingProcessor {
if let Some(delta) = delta_result { if let Some(delta) = delta_result {
// Analysis channel → Reasoning item (wrapper events only, emitted once) // Analysis channel → Reasoning item (wrapper events only, emitted once)
if let Some(_analysis_text) = &delta.analysis_delta { if let Some(_analysis_text) = &delta.analysis_delta {
if reasoning_output_index.is_none() { if !has_emitted_reasoning {
// Allocate reasoning item and emit wrapper events
let (output_index, _item_id) =
emitter.allocate_output_index(OutputItemType::Reasoning);
reasoning_output_index = Some(output_index);
// Emit reasoning item (added + done in one call) // Emit reasoning item (added + done in one call)
// Note: reasoning_content will be provided at finalize // Note: reasoning_content will be provided at finalize
emitter emitter
.emit_reasoning_item(tx, None) .emit_reasoning_item(tx, None)
.map_err(|e| format!("Failed to emit reasoning item: {}", e))?; .map_err(|e| format!("Failed to emit reasoning item: {}", e))?;
has_emitted_reasoning = true;
has_analysis = true; has_analysis = true;
} }
} }
......
...@@ -378,6 +378,17 @@ async fn process_and_transform_sse_stream( ...@@ -378,6 +378,17 @@ async fn process_and_transform_sse_stream(
let created_at = chrono::Utc::now().timestamp() as u64; let created_at = chrono::Utc::now().timestamp() as u64;
let mut event_emitter = ResponseStreamEventEmitter::new(response_id, model, created_at); let mut event_emitter = ResponseStreamEventEmitter::new(response_id, model, created_at);
// Emit initial response.created and response.in_progress events
let event = event_emitter.emit_created();
event_emitter
.send_event(&event, &tx)
.map_err(|_| "Failed to send response.created event".to_string())?;
let event = event_emitter.emit_in_progress();
event_emitter
.send_event(&event, &tx)
.map_err(|_| "Failed to send response.in_progress event".to_string())?;
// Convert body to data stream // Convert body to data stream
let mut stream = body.into_data_stream(); let mut stream = body.into_data_stream();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment