Unverified Commit 1357ab02 authored by Chang Su's avatar Chang Su Committed by GitHub
Browse files

[router][grpc] Emit OutputItemDone event and store output item array (#12656)

parent 44da7377
......@@ -9,7 +9,10 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use crate::{mcp, protocols::chat::ChatCompletionStreamResponse};
use crate::{
mcp,
protocols::{chat::ChatCompletionStreamResponse, responses::ResponsesRequest},
};
pub enum OutputItemType {
Message,
......@@ -31,6 +34,7 @@ enum ItemStatus {
/// Per-item bookkeeping for one entry of the streamed response's `output` array.
struct OutputItemState {
    // Index of this item within the response `output` array.
    output_index: usize,
    // Lifecycle status (e.g. InProgress / Completed) used to filter which
    // items are replayed in the final response.completed payload.
    status: ItemStatus,
    // Full JSON payload captured when output_item.done is emitted; None
    // until store_output_item_data records it.
    item_data: Option<serde_json::Value>,
}
/// OpenAI-compatible event emitter for /v1/responses streaming
......@@ -70,6 +74,7 @@ pub struct ResponseStreamEventEmitter {
next_output_index: usize,
current_message_output_index: Option<usize>, // Tracks output_index of current message
current_item_id: Option<String>, // Tracks item_id of current item
original_request: Option<ResponsesRequest>,
}
impl ResponseStreamEventEmitter {
......@@ -92,9 +97,15 @@ impl ResponseStreamEventEmitter {
next_output_index: 0,
current_message_output_index: None,
current_item_id: None,
original_request: None,
}
}
/// Remember the original request so `response.completed` can echo all of
/// its fields (instructions, sampling params, tools, metadata, ...).
pub fn set_original_request(&mut self, request: ResponsesRequest) {
    // Any previously stored request is simply discarded.
    let _ = self.original_request.replace(request);
}
fn next_sequence(&mut self) -> u64 {
let seq = self.sequence_number;
self.sequence_number += 1;
......@@ -204,32 +215,98 @@ impl ResponseStreamEventEmitter {
}
pub fn emit_completed(&mut self, usage: Option<&serde_json::Value>) -> serde_json::Value {
let mut response = json!({
"type": "response.completed",
"sequence_number": self.next_sequence(),
"response": {
"id": self.response_id,
"object": "response",
"created_at": self.created_at,
"status": "completed",
"model": self.model,
"output": [{
"id": self.message_id.clone(),
"type": "message",
"role": "assistant",
"content": [{
"type": "text",
"text": self.accumulated_text.clone()
}]
// Build output array from tracked items
let output: Vec<serde_json::Value> = self
.output_items
.iter()
.filter_map(|item| {
if item.status == ItemStatus::Completed {
item.item_data.clone()
} else {
None
}
})
.collect();
// If no items were tracked (legacy path), fall back to generic message
let output = if output.is_empty() {
vec![json!({
"id": self.message_id.clone(),
"type": "message",
"role": "assistant",
"content": [{
"type": "text",
"text": self.accumulated_text.clone()
}]
}
})]
} else {
output
};
// Build base response object
let mut response_obj = json!({
"id": self.response_id,
"object": "response",
"created_at": self.created_at,
"status": "completed",
"model": self.model,
"output": output
});
// Add usage if provided
if let Some(usage_val) = usage {
response["response"]["usage"] = usage_val.clone();
response_obj["usage"] = usage_val.clone();
}
// Add all original request fields if available
if let Some(ref req) = self.original_request {
Self::add_optional_field(&mut response_obj, "instructions", &req.instructions);
Self::add_optional_field(
&mut response_obj,
"max_output_tokens",
&req.max_output_tokens,
);
Self::add_optional_field(&mut response_obj, "max_tool_calls", &req.max_tool_calls);
Self::add_optional_field(
&mut response_obj,
"previous_response_id",
&req.previous_response_id,
);
Self::add_optional_field(&mut response_obj, "reasoning", &req.reasoning);
Self::add_optional_field(&mut response_obj, "temperature", &req.temperature);
Self::add_optional_field(&mut response_obj, "top_p", &req.top_p);
Self::add_optional_field(&mut response_obj, "truncation", &req.truncation);
Self::add_optional_field(&mut response_obj, "user", &req.user);
response_obj["parallel_tool_calls"] = json!(req.parallel_tool_calls.unwrap_or(true));
response_obj["store"] = json!(req.store.unwrap_or(true));
response_obj["tools"] = json!(req.tools.as_ref().unwrap_or(&vec![]));
response_obj["metadata"] = json!(req.metadata.as_ref().unwrap_or(&Default::default()));
// tool_choice: serialize if present, otherwise use "auto"
if let Some(ref tc) = req.tool_choice {
response_obj["tool_choice"] = json!(tc);
} else {
response_obj["tool_choice"] = json!("auto");
}
}
response
json!({
"type": "response.completed",
"sequence_number": self.next_sequence(),
"response": response_obj
})
}
/// Serialize `value` into `obj[key]` when it is `Some`; leave `obj`
/// untouched when it is `None`.
fn add_optional_field<T: serde::Serialize>(
    obj: &mut serde_json::Value,
    key: &str,
    value: &Option<T>,
) {
    match value.as_ref() {
        // NOTE: indexing a serde_json::Value panics if `obj` is not an
        // object (or null); callers always pass the response object here.
        Some(inner) => obj[key] = json!(inner),
        None => {}
    }
}
// ========================================================================
......@@ -403,6 +480,9 @@ impl ResponseStreamEventEmitter {
output_index: usize,
item: &serde_json::Value,
) -> serde_json::Value {
// Store the item data for later use in emit_completed
self.store_output_item_data(output_index, item.clone());
json!({
"type": "response.output_item.done",
"sequence_number": self.next_sequence(),
......@@ -434,12 +514,13 @@ impl ResponseStreamEventEmitter {
self.output_items.push(OutputItemState {
output_index: index,
status: ItemStatus::InProgress,
item_data: None,
});
(index, id)
}
/// Mark output item as completed
/// Mark output item as completed and store its data
pub fn complete_output_item(&mut self, output_index: usize) {
if let Some(item) = self
.output_items
......@@ -450,6 +531,17 @@ impl ResponseStreamEventEmitter {
}
}
/// Store output item data when emitting output_item.done, so that
/// emit_completed can rebuild the final `output` array from it.
pub fn store_output_item_data(&mut self, output_index: usize, item_data: serde_json::Value) {
    // Linear scan is fine: there is one tracked entry per output item.
    for tracked in self.output_items.iter_mut() {
        if tracked.output_index == output_index {
            tracked.item_data = Some(item_data);
            return;
        }
    }
    // No matching item: silently drop, mirroring the original behavior.
}
/// Emit reasoning item wrapper events (added + done)
///
/// Reasoning items in OpenAI format are simple placeholders emitted between tool iterations.
......
......@@ -505,6 +505,9 @@ pub async fn serve_harmony_responses_stream(
.as_secs();
let mut emitter = ResponseStreamEventEmitter::new(response_id.clone(), model, created_at);
// Set original request for complete response fields
emitter.set_original_request(current_request.clone());
// Clone context for spawned task
let ctx_clone = ctx.clone();
......@@ -717,8 +720,8 @@ async fn execute_mcp_tool_loop_streaming(
// Emit response.completed with incomplete_details and usage
let incomplete_details = json!({ "reason": "max_tool_calls" });
let usage_json = json!({
"prompt_tokens": usage.prompt_tokens,
"completion_tokens": usage.completion_tokens,
"input_tokens": usage.prompt_tokens,
"output_tokens": usage.completion_tokens,
"total_tokens": usage.total_tokens,
"incomplete_details": incomplete_details,
});
......@@ -773,8 +776,8 @@ async fn execute_mcp_tool_loop_streaming(
// Emit response.completed with usage
let usage_json = json!({
"prompt_tokens": usage.prompt_tokens,
"completion_tokens": usage.completion_tokens,
"input_tokens": usage.prompt_tokens,
"output_tokens": usage.completion_tokens,
"total_tokens": usage.total_tokens,
});
let event = emitter.emit_completed(Some(&usage_json));
......@@ -815,19 +818,34 @@ async fn execute_without_mcp_streaming(
};
// Process stream (emits all output items during streaming - function tool path emits function_call_arguments.* events)
if let Err(err_msg) = HarmonyStreamingProcessor::process_responses_iteration_stream_function(
execution_result,
emitter,
tx,
)
.await
{
emitter.emit_error(&err_msg, Some("processing_error"), tx);
return;
}
let iteration_result =
match HarmonyStreamingProcessor::process_responses_iteration_stream_function(
execution_result,
emitter,
tx,
)
.await
{
Ok(result) => result,
Err(err_msg) => {
emitter.emit_error(&err_msg, Some("processing_error"), tx);
return;
}
};
// Emit response.completed
let event = emitter.emit_completed(None);
// Extract usage from iteration result
let usage = match iteration_result {
ResponsesIterationResult::ToolCallsFound { usage, .. } => usage,
ResponsesIterationResult::Completed { usage, .. } => usage,
};
// Emit response.completed with usage
let usage_json = json!({
"input_tokens": usage.prompt_tokens,
"output_tokens": usage.completion_tokens,
"total_tokens": usage.total_tokens,
});
let event = emitter.emit_completed(Some(&usage_json));
emitter.send_event_best_effort(&event, tx);
}
......
......@@ -924,6 +924,8 @@ impl HarmonyStreamingProcessor {
if let Some(ref tool_calls) = accumulated_tool_calls {
for (call_idx, tool_call) in tool_calls.iter().enumerate() {
if let Some((output_index, item_id)) = tool_call_tracking.get(&call_idx) {
let tool_name = &tool_call.function.name;
// Emit arguments done with final arguments
let args_str = tool_call.function.arguments.as_deref().unwrap_or("");
let event =
......@@ -935,6 +937,21 @@ impl HarmonyStreamingProcessor {
let event = emitter.emit_mcp_call_completed(*output_index, item_id);
emitter.send_event_best_effort(&event, tx);
}
// Emit output_item.done wrapper event
let item = json!({
"id": item_id,
"type": mode.type_str(),
"name": tool_name,
"call_id": &tool_call.id,
"arguments": args_str,
"status": "completed"
});
let event = emitter.emit_output_item_done(*output_index, &item);
emitter.send_event_best_effort(&event, tx);
// Mark output item as completed
emitter.complete_output_item(*output_index);
}
}
}
......
......@@ -377,6 +377,7 @@ async fn process_and_transform_sse_stream(
let model = original_request.model.clone();
let created_at = chrono::Utc::now().timestamp() as u64;
let mut event_emitter = ResponseStreamEventEmitter::new(response_id, model, created_at);
event_emitter.set_original_request(original_request.clone());
// Emit initial response.created and response.in_progress events
let event = event_emitter.emit_created();
......@@ -432,15 +433,15 @@ async fn process_and_transform_sse_stream(
// Emit final response.completed event with accumulated usage
let usage_json = accumulator.usage.as_ref().map(|u| {
let mut usage_obj = json!({
"prompt_tokens": u.prompt_tokens,
"completion_tokens": u.completion_tokens,
"input_tokens": u.prompt_tokens,
"output_tokens": u.completion_tokens,
"total_tokens": u.total_tokens
});
// Include reasoning_tokens if present
if let Some(details) = &u.completion_tokens_details {
if let Some(reasoning_tokens) = details.reasoning_tokens {
usage_obj["completion_tokens_details"] = json!({
usage_obj["output_tokens_details"] = json!({
"reasoning_tokens": reasoning_tokens
});
}
......
......@@ -545,6 +545,7 @@ async fn execute_tool_loop_streaming_internal(
.unwrap()
.as_secs();
let mut emitter = ResponseStreamEventEmitter::new(response_id, model, created_at);
emitter.set_original_request(original_request.clone());
// Emit initial response.created and response.in_progress events
let event = emitter.emit_created();
......@@ -896,8 +897,8 @@ async fn execute_tool_loop_streaming_internal(
// Emit final response.completed event
let usage_json = accumulated_response.usage.as_ref().map(|u| {
json!({
"prompt_tokens": u.prompt_tokens,
"completion_tokens": u.completion_tokens,
"input_tokens": u.prompt_tokens,
"output_tokens": u.completion_tokens,
"total_tokens": u.total_tokens
})
});
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment