Unverified Commit 837b08eb authored by Chang Su, committed by GitHub

[router][grpc] Support mixin tool calls in Responses API (#12736)

parent bb6a21cd
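The core change in this commit is that a single batch of model tool calls may now mix MCP tools (executed inside the router's loop) and function tools (returned to the caller for execution). The sketch below is a minimal, self-contained illustration of the partition-and-dispatch idea applied in both the non-streaming and streaming loops; the types and names (`Call`, `split_calls`, the example tool names) are simplified stand-ins, not the router's actual API.

```rust
use std::collections::HashSet;

// Simplified stand-in for a parsed tool call (the router uses its own ToolCall type).
#[derive(Debug)]
struct Call {
    name: String,
    arguments: String,
}

/// Split one batch of tool calls into (mcp_calls, function_calls) by tool name,
/// mirroring the `partition` used in the loops in this diff.
fn split_calls(calls: Vec<Call>, mcp_names: &HashSet<&str>) -> (Vec<Call>, Vec<Call>) {
    calls
        .into_iter()
        .partition(|c| mcp_names.contains(c.name.as_str()))
}

fn main() {
    // Names of tools that were registered as MCP tools on the request.
    let mcp_names: HashSet<&str> = ["search_docs"].into_iter().collect();

    let calls = vec![
        Call { name: "search_docs".into(), arguments: r#"{"q":"harmony"}"#.into() },
        Call { name: "get_weather".into(), arguments: r#"{"city":"SF"}"#.into() },
    ];

    let (mcp_calls, function_calls) = split_calls(calls, &mcp_names);

    // MCP calls are executed in-loop; function calls end the loop and are
    // returned to the caller without an `output` field.
    println!("execute in router: {:?}", mcp_calls);
    println!("return to caller:  {:?}", function_calls);
}
```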
@@ -56,7 +56,7 @@ impl Default for ResponseTool {
     }
 }

-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
 #[serde(rename_all = "snake_case")]
 pub enum ResponseToolType {
     Function,
...
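The added `PartialEq` derive on `ResponseToolType` is what lets the new partitioning code compare tool types directly with `==` (as in `t.r#type == ResponseToolType::Mcp` later in this diff). A minimal sketch of that pattern, using a trimmed-down copy of the enum rather than the real one:

```rust
#[derive(Debug, Clone, PartialEq)]
enum ResponseToolType {
    Function,
    Mcp,
}

fn main() {
    let tool_types = vec![ResponseToolType::Function, ResponseToolType::Mcp];

    // Without the PartialEq derive this `==` comparison would not compile.
    let mcp_count = tool_types
        .iter()
        .filter(|t| **t == ResponseToolType::Mcp)
        .count();

    println!("mcp tools: {}", mcp_count);
}
```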
@@ -20,9 +20,9 @@
 //!
 //! match result {
 //!     ToolCallsFound { tool_calls, .. } => {
-//!         // Execute MCP tools
-//!         // Build next request with tool results
-//!         // Continue loop
+//!         // Separate MCP tools from function tools
+//!         // Execute MCP tools, return if function tools found
+//!         // Continue loop with MCP results if only MCP tools
 //!     }
 //!     Completed { response, .. } => {
 //!         return Ok(response);
@@ -30,12 +30,6 @@
 //!     }
 //! }
 //! ```
-//!
-//! ## Design Reference
-//!
-//! See `/Users/simolin/workspace/sglang/.claude/docs/harmony_pipeline/tool_loop_design.md`
-//! for complete architecture, rationale, and implementation details.
use std::{ use std::{
sync::Arc, sync::Arc,
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
...@@ -210,6 +204,18 @@ impl HarmonyResponsesContext { ...@@ -210,6 +204,18 @@ impl HarmonyResponsesContext {
} }
} }
/// Build a HashSet of MCP tool names for O(1) lookup
///
/// Creates a HashSet containing the names of all MCP tools in the request,
/// allowing for efficient O(1) lookups when partitioning tool calls.
fn build_mcp_tool_names_set(request_tools: &[ResponseTool]) -> std::collections::HashSet<&str> {
request_tools
.iter()
.filter(|t| t.r#type == ResponseToolType::Mcp)
.filter_map(|t| t.function.as_ref().map(|f| f.name.as_str()))
.collect()
}
/// Execute Harmony Responses API request with multi-turn MCP tool support /// Execute Harmony Responses API request with multi-turn MCP tool support
/// ///
/// This function orchestrates the multi-turn conversation flow: /// This function orchestrates the multi-turn conversation flow:
...@@ -354,7 +360,20 @@ async fn execute_with_mcp_loop( ...@@ -354,7 +360,20 @@ async fn execute_with_mcp_loop(
tool_call_count = tool_calls.len(), tool_call_count = tool_calls.len(),
has_analysis = analysis.is_some(), has_analysis = analysis.is_some(),
partial_text_len = partial_text.len(), partial_text_len = partial_text.len(),
"Tool calls found - checking limits before executing MCP tools" "Tool calls found - separating MCP and function tools"
);
// Separate MCP and function tool calls based on tool type
let request_tools = current_request.tools.as_deref().unwrap_or(&[]);
let mcp_tool_names = build_mcp_tool_names_set(request_tools);
let (mcp_tool_calls, function_tool_calls): (Vec<_>, Vec<_>) = tool_calls
.into_iter()
.partition(|tc| mcp_tool_names.contains(tc.function.name.as_str()));
debug!(
mcp_calls = mcp_tool_calls.len(),
function_calls = function_tool_calls.len(),
"Tool calls separated by type"
); );
// Check combined limit (user's max_tool_calls vs safety limit) // Check combined limit (user's max_tool_calls vs safety limit)
...@@ -363,21 +382,29 @@ async fn execute_with_mcp_loop( ...@@ -363,21 +382,29 @@ async fn execute_with_mcp_loop(
None => MAX_TOOL_ITERATIONS, None => MAX_TOOL_ITERATIONS,
}; };
// Check if we would exceed the limit with these new tool calls // Check if we would exceed the limit with these new MCP tool calls
let total_calls_after = mcp_tracking.total_calls() + tool_calls.len(); let total_calls_after = mcp_tracking.total_calls() + mcp_tool_calls.len();
if total_calls_after > effective_limit { if total_calls_after > effective_limit {
warn!( warn!(
current_calls = mcp_tracking.total_calls(), current_calls = mcp_tracking.total_calls(),
new_calls = tool_calls.len(), new_calls = mcp_tool_calls.len() + function_tool_calls.len(),
total_after = total_calls_after, total_after = total_calls_after,
effective_limit = effective_limit, effective_limit = effective_limit,
user_max = ?max_tool_calls, user_max = ?max_tool_calls,
"Reached tool call limit - returning incomplete response" "Reached tool call limit - returning incomplete response"
); );
-                // Build response with incomplete status
-                let mut response = build_function_tool_response(
-                    tool_calls,
+                // Combine back for response
+                let all_tool_calls: Vec<_> = mcp_tool_calls
+                    .into_iter()
+                    .chain(function_tool_calls)
+                    .collect();
+
+                // Build response with incomplete status - no tools executed due to limit
+                let mut response = build_tool_response(
+                    vec![],         // No MCP tools executed
+                    vec![],         // No MCP results
+                    all_tool_calls, // All tools returned as function calls (not executed)
                     analysis,
                     partial_text,
                     usage,
...@@ -397,15 +424,50 @@ async fn execute_with_mcp_loop( ...@@ -397,15 +424,50 @@ async fn execute_with_mcp_loop(
return Ok(response); return Ok(response);
} }
-            // Execute MCP tools
-            let tool_results =
-                execute_mcp_tools(&ctx.mcp_manager, &tool_calls, &mut mcp_tracking).await?;
+            // Execute MCP tools (if any)
+            let mcp_results = if !mcp_tool_calls.is_empty() {
+                execute_mcp_tools(&ctx.mcp_manager, &mcp_tool_calls, &mut mcp_tracking).await?
+            } else {
+                Vec::new()
+            };
// If there are function tools, exit MCP loop and return response
if !function_tool_calls.is_empty() {
debug!(
"Function tool calls present - exiting MCP loop and returning to caller"
);
// Build response that includes:
// 1. Reasoning/message from this iteration
// 2. MCP tools as completed (with output) - these were executed
// 3. Function tools as completed (without output) - need caller execution
let mut response = build_tool_response(
mcp_tool_calls,
mcp_results,
function_tool_calls,
analysis,
partial_text,
usage,
request_id,
Arc::new(current_request),
);
// Inject MCP metadata for all executed calls
if mcp_tracking.total_calls() > 0 {
inject_mcp_metadata(&mut response, &mcp_tracking, &ctx.mcp_manager);
}
return Ok(response);
}
// Only MCP tools - continue loop with their results
debug!("Only MCP tools - continuing loop with results");
// Build next request with appended history // Build next request with appended history
current_request = build_next_request_with_tools( current_request = build_next_request_with_tools(
current_request, current_request,
tool_calls, mcp_tool_calls,
tool_results, mcp_results,
analysis, analysis,
partial_text, partial_text,
) )
...@@ -469,7 +531,9 @@ async fn execute_without_mcp_loop( ...@@ -469,7 +531,9 @@ async fn execute_without_mcp_loop(
"Function tool calls found - returning to caller" "Function tool calls found - returning to caller"
); );
Ok(build_function_tool_response( Ok(build_tool_response(
vec![],
vec![],
tool_calls, tool_calls,
analysis, analysis,
partial_text, partial_text,
...@@ -602,6 +666,20 @@ async fn execute_mcp_tool_loop_streaming( ...@@ -602,6 +666,20 @@ async fn execute_mcp_tool_loop_streaming(
); );
} }
// Build HashSet of MCP tool names for O(1) lookup during streaming
// Clone tool names to owned strings to avoid borrowing current_request
let mcp_tool_names: std::collections::HashSet<String> = current_request
.tools
.as_ref()
.map(|tools| {
tools
.iter()
.filter(|t| t.r#type == ResponseToolType::Mcp)
.filter_map(|t| t.function.as_ref().map(|f| f.name.clone()))
.collect()
})
.unwrap_or_default();
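In the streaming loop the MCP tool names are cloned into an owned `HashSet<String>` rather than borrowed as `&str`, because `current_request` is moved and rebuilt on each iteration while the set must stay alive across iterations. A rough, self-contained sketch of that ownership pattern (the `Request`/`rebuild` names here are stand-ins, not the real `ResponsesRequest` API):

```rust
use std::collections::HashSet;

// Stand-in for the request type; the real ResponsesRequest has many more fields.
struct Request {
    tools: Option<Vec<String>>,
}

fn rebuild(request: Request) -> Request {
    // The real loop consumes the request to build the next-iteration request.
    request
}

fn main() {
    let mut request = Request {
        tools: Some(vec!["search_docs".to_string()]),
    };

    // Owned copies: the set does not borrow `request`, so `request` can be
    // moved and replaced on every loop iteration while the set lives on.
    let mcp_tool_names: HashSet<String> = request
        .tools
        .as_ref()
        .map(|tools| tools.iter().cloned().collect())
        .unwrap_or_default();

    for _ in 0..2 {
        request = rebuild(request);
        assert!(mcp_tool_names.contains("search_docs"));
    }
}
```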
// Emit mcp_list_tools on first iteration // Emit mcp_list_tools on first iteration
let (output_index, item_id) = emitter.allocate_output_index(OutputItemType::McpListTools); let (output_index, item_id) = emitter.allocate_output_index(OutputItemType::McpListTools);
...@@ -705,21 +783,21 @@ async fn execute_mcp_tool_loop_streaming( ...@@ -705,21 +783,21 @@ async fn execute_mcp_tool_loop_streaming(
} }
}; };
-        // Process stream with token-level streaming (MCP path - emits mcp_call.* events)
-        let iteration_result =
-            match HarmonyStreamingProcessor::process_responses_iteration_stream_mcp(
-                execution_result,
-                emitter,
-                tx,
-            )
-            .await
-            {
-                Ok(result) => result,
-                Err(err_msg) => {
-                    emitter.emit_error(&err_msg, Some("processing_error"), tx);
-                    return;
-                }
-            };
+        // Process stream with token-level streaming (mixed tools - emits correct events per tool type)
+        let iteration_result = match HarmonyStreamingProcessor::process_responses_iteration_stream(
+            execution_result,
+            emitter,
+            tx,
+            &mcp_tool_names,
+        )
+        .await
+        {
+            Ok(result) => result,
+            Err(err_msg) => {
+                emitter.emit_error(&err_msg, Some("processing_error"), tx);
+                return;
+            }
+        };
// Handle iteration result (tool calls or completion) // Handle iteration result (tool calls or completion)
match iteration_result { match iteration_result {
...@@ -734,7 +812,20 @@ async fn execute_mcp_tool_loop_streaming( ...@@ -734,7 +812,20 @@ async fn execute_mcp_tool_loop_streaming(
tool_call_count = tool_calls.len(), tool_call_count = tool_calls.len(),
has_analysis = analysis.is_some(), has_analysis = analysis.is_some(),
partial_text_len = partial_text.len(), partial_text_len = partial_text.len(),
"MCP tool calls found in commentary channel - checking limits" "Tool calls found - separating MCP and function tools"
);
// Separate MCP and function tool calls based on tool type
let request_tools = current_request.tools.as_deref().unwrap_or(&[]);
let mcp_tool_names = build_mcp_tool_names_set(request_tools);
let (mcp_tool_calls, function_tool_calls): (Vec<_>, Vec<_>) = tool_calls
.into_iter()
.partition(|tc| mcp_tool_names.contains(tc.function.name.as_str()));
debug!(
mcp_calls = mcp_tool_calls.len(),
function_calls = function_tool_calls.len(),
"Tool calls separated by type in streaming"
); );
// Check combined limit (user's max_tool_calls vs safety limit) // Check combined limit (user's max_tool_calls vs safety limit)
...@@ -743,12 +834,12 @@ async fn execute_mcp_tool_loop_streaming( ...@@ -743,12 +834,12 @@ async fn execute_mcp_tool_loop_streaming(
None => MAX_TOOL_ITERATIONS, None => MAX_TOOL_ITERATIONS,
}; };
// Check if we would exceed the limit with these new tool calls // Check if we would exceed the limit with these new MCP tool calls
let total_calls_after = mcp_tracking.total_calls() + tool_calls.len(); let total_calls_after = mcp_tracking.total_calls() + mcp_tool_calls.len();
if total_calls_after > effective_limit { if total_calls_after > effective_limit {
warn!( warn!(
current_calls = mcp_tracking.total_calls(), current_calls = mcp_tracking.total_calls(),
new_calls = tool_calls.len(), new_calls = mcp_tool_calls.len() + function_tool_calls.len(),
total_after = total_calls_after, total_after = total_calls_after,
effective_limit = effective_limit, effective_limit = effective_limit,
user_max = ?max_tool_calls, user_max = ?max_tool_calls,
...@@ -768,9 +859,10 @@ async fn execute_mcp_tool_loop_streaming( ...@@ -768,9 +859,10 @@ async fn execute_mcp_tool_loop_streaming(
return; return;
} }
// Execute MCP tools and continue loop // Execute MCP tools (if any)
let tool_results = let mcp_results = if !mcp_tool_calls.is_empty() {
match execute_mcp_tools(&ctx.mcp_manager, &tool_calls, &mut mcp_tracking).await match execute_mcp_tools(&ctx.mcp_manager, &mcp_tool_calls, &mut mcp_tracking)
.await
{ {
Ok(results) => results, Ok(results) => results,
Err(err_response) => { Err(err_response) => {
...@@ -781,16 +873,42 @@ async fn execute_mcp_tool_loop_streaming( ...@@ -781,16 +873,42 @@ async fn execute_mcp_tool_loop_streaming(
); );
return; return;
} }
}; }
} else {
Vec::new()
};
// Update mcp_call output items with execution results // Update mcp_call output items with execution results (if any MCP tools were executed)
emitter.update_mcp_call_outputs(&tool_results); if !mcp_results.is_empty() {
emitter.update_mcp_call_outputs(&mcp_results);
}
// If there are function tools, exit MCP loop and emit completion
if !function_tool_calls.is_empty() {
debug!(
"Function tool calls present - exiting MCP loop and emitting completion"
);
// Function tool calls were already emitted during streaming processing
// Just emit response.completed with usage
let usage_json = json!({
"input_tokens": usage.prompt_tokens,
"output_tokens": usage.completion_tokens,
"total_tokens": usage.total_tokens,
});
let event = emitter.emit_completed(Some(&usage_json));
emitter.send_event_best_effort(&event, tx);
return;
}
// Only MCP tools - continue loop with their results
debug!("Only MCP tools - continuing loop with results");
// Build next request with appended history // Build next request with appended history
current_request = match build_next_request_with_tools( current_request = match build_next_request_with_tools(
current_request, current_request,
tool_calls, mcp_tool_calls,
tool_results, mcp_results,
analysis, analysis,
partial_text, partial_text,
) { ) {
...@@ -873,20 +991,22 @@ async fn execute_without_mcp_streaming( ...@@ -873,20 +991,22 @@ async fn execute_without_mcp_streaming(
}; };
     // Process stream (emits all output items during streaming - function tool path emits function_call_arguments.* events)
-    let iteration_result =
-        match HarmonyStreamingProcessor::process_responses_iteration_stream_function(
-            execution_result,
-            emitter,
-            tx,
-        )
-        .await
-        {
-            Ok(result) => result,
-            Err(err_msg) => {
-                emitter.emit_error(&err_msg, Some("processing_error"), tx);
-                return;
-            }
-        };
+    // Pass empty HashSet so all tools are treated as function tools (per-tool detection)
+    let empty_mcp_tools = std::collections::HashSet::new();
+    let iteration_result = match HarmonyStreamingProcessor::process_responses_iteration_stream(
+        execution_result,
+        emitter,
+        tx,
+        &empty_mcp_tools,
+    )
+    .await
+    {
+        Ok(result) => result,
+        Err(err_msg) => {
+            emitter.emit_error(&err_msg, Some("processing_error"), tx);
+            return;
+        }
+    };
// Extract usage from iteration result // Extract usage from iteration result
let usage = match iteration_result { let usage = match iteration_result {
...@@ -917,17 +1037,17 @@ async fn execute_without_mcp_streaming( ...@@ -917,17 +1037,17 @@ async fn execute_without_mcp_streaming(
emitter.send_event_best_effort(&event, tx); emitter.send_event_best_effort(&event, tx);
} }
-/// Build ResponsesResponse with function tool calls for caller to execute
-///
-/// When tool calls are found but no MCP client is available (function tools only),
-/// this builds a response with status=Completed and tool calls without output field.
-/// The absence of output signals the caller should execute tools and resume.
+/// Build ResponsesResponse with tool calls (MCP and/or function tools)
 ///
-/// ResponsesResponse with tool calls
 /// TODO: Refactor to use builder pattern
-fn build_function_tool_response(
-    tool_calls: Vec<ToolCall>,
-    analysis: Option<String>,
-    partial_text: String,
+#[allow(clippy::too_many_arguments)]
+fn build_tool_response(
+    mcp_tool_calls: Vec<ToolCall>,
+    mcp_results: Vec<ToolResult>,
+    function_tool_calls: Vec<ToolCall>,
+    analysis: Option<String>,  // Analysis channel content (reasoning)
+    partial_text: String,      // Final channel content (message)
     usage: Usage,
     request_id: String,
     responses_request: Arc<ResponsesRequest>,
...@@ -960,20 +1080,40 @@ fn build_function_tool_response( ...@@ -960,20 +1080,40 @@ fn build_function_tool_response(
}); });
} }
-    // Add function tool calls as completed output items (no output field = needs execution)
-    for tool_call in tool_calls {
+    // Add MCP tool calls WITH output (these were executed)
+    for (tool_call, result) in mcp_tool_calls.iter().zip(mcp_results.iter()) {
+        let output_str = to_string(&result.output).unwrap_or_else(|e| {
+            format!("{{\"error\": \"Failed to serialize tool output: {}\"}}", e)
+        });
+
+        output.push(ResponseOutputItem::FunctionToolCall {
+            id: tool_call.id.clone(),
+            call_id: tool_call.id.clone(),
+            name: tool_call.function.name.clone(),
+            arguments: tool_call.function.arguments.clone().unwrap_or_default(),
+            output: Some(output_str),
+            status: if result.is_error {
+                "failed"
+            } else {
+                "completed"
+            }
+            .to_string(),
+        });
+    }
+
+    // Add function tool calls WITHOUT output (need caller execution)
+    for tool_call in function_tool_calls {
         output.push(ResponseOutputItem::FunctionToolCall {
             id: tool_call.id.clone(),
             call_id: tool_call.id.clone(),
             name: tool_call.function.name.clone(),
             arguments: tool_call.function.arguments.clone().unwrap_or_default(),
-            output: None, // No output = tool needs execution by caller
+            output: None, // No output = needs execution
             status: "completed".to_string(),
         });
     }
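To make the two output-item shapes concrete: an executed MCP call carries its serialized result in `output` (with a `completed` or `failed` status), while a function call the caller still has to run is emitted with `output` absent. A hedged illustration of the two payloads built with `serde_json`; the field names follow the diff above, but the ids, tool names, and argument values are made up, and the exact serialized `type` string may differ in the real response types:

```rust
use serde_json::json;

fn main() {
    // MCP tool call: executed by the router, so the result is inlined.
    let mcp_item = json!({
        "id": "call_mcp_1",
        "call_id": "call_mcp_1",
        "type": "function_tool_call",
        "name": "search_docs",
        "arguments": "{\"q\":\"harmony\"}",
        "output": "{\"hits\":3}",
        "status": "completed"
    });

    // Function tool call: no "output" field, signalling the caller must execute it.
    let function_item = json!({
        "id": "call_fn_1",
        "call_id": "call_fn_1",
        "type": "function_tool_call",
        "name": "get_weather",
        "arguments": "{\"city\":\"SF\"}",
        "status": "completed"
    });

    println!("{mcp_item}\n{function_item}");
}
```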
// Build ResponsesResponse with Completed status // Build ResponsesResponse with Completed status
// The presence of FunctionToolCall items without output signals tool execution needed
let created_at = SystemTime::now() let created_at = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.unwrap() .unwrap()
...@@ -1023,13 +1163,6 @@ fn build_function_tool_response( ...@@ -1023,13 +1163,6 @@ fn build_function_tool_response(
/// Tool execution errors are returned as error results to the model /// Tool execution errors are returned as error results to the model
/// (allows model to handle gracefully). /// (allows model to handle gracefully).
/// ///
/// # Arguments
///
/// * `mcp_manager` - MCP manager for tool execution
/// * `tool_calls` - Tool calls from commentary channel
///
/// # Returns
///
/// Vector of tool results (one per tool call) /// Vector of tool results (one per tool call)
async fn execute_mcp_tools( async fn execute_mcp_tools(
mcp_manager: &Arc<McpManager>, mcp_manager: &Arc<McpManager>,
...@@ -1151,24 +1284,12 @@ async fn execute_mcp_tools( ...@@ -1151,24 +1284,12 @@ async fn execute_mcp_tools(
/// 1. Original input items (preserved) /// 1. Original input items (preserved)
/// 2. Assistant message with analysis (reasoning) + partial_text + tool_calls /// 2. Assistant message with analysis (reasoning) + partial_text + tool_calls
/// 3. Tool result messages for each tool execution /// 3. Tool result messages for each tool execution
///
/// # Arguments
///
/// * `request` - Current request (contains original input)
/// * `tool_calls` - Tool calls from commentary channel
/// * `tool_results` - Results from MCP tool execution
/// * `analysis` - Analysis channel content (becomes reasoning content)
/// * `partial_text` - Final channel content (becomes message content)
///
/// # Returns
///
/// New ResponsesRequest with appended history
fn build_next_request_with_tools( fn build_next_request_with_tools(
mut request: ResponsesRequest, mut request: ResponsesRequest,
tool_calls: Vec<ToolCall>, tool_calls: Vec<ToolCall>,
tool_results: Vec<ToolResult>, tool_results: Vec<ToolResult>,
analysis: Option<String>, analysis: Option<String>, // Analysis channel content (becomes reasoning content)
partial_text: String, partial_text: String, // Final channel content (becomes message content)
) -> Result<ResponsesRequest, Box<Response>> { ) -> Result<ResponsesRequest, Box<Response>> {
// Get current input items (or empty vec if Text variant) // Get current input items (or empty vec if Text variant)
let mut items = match request.input { let mut items = match request.input {
......
...@@ -596,14 +596,84 @@ impl HarmonyStreamingProcessor { ...@@ -596,14 +596,84 @@ impl HarmonyStreamingProcessor {
Ok(()) Ok(())
} }
/// Decode stream processing for tool loops /// Process streaming chunks for Responses API iteration
/// ///
/// Emits tool call events based on the mode (MCP or Function). /// Emits correct event types based on tool names: mcp_call.* for MCP tools, function_call.* for function tools.
async fn process_decode_stream( /// Pass empty HashSet for function-only tools, full MCP tool names for MCP-only, or subset for mixed.
pub async fn process_responses_iteration_stream(
execution_result: context::ExecutionResult,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
mcp_tool_names: &std::collections::HashSet<String>,
) -> Result<ResponsesIterationResult, String> {
match execution_result {
context::ExecutionResult::Single { stream } => {
debug!("Processing Responses API single stream mode");
Self::process_responses_single_stream_mixed(stream, emitter, tx, mcp_tool_names)
.await
}
context::ExecutionResult::Dual { prefill, decode } => {
debug!("Processing Responses API dual stream mode");
Self::process_responses_dual_stream_mixed(
prefill,
*decode,
emitter,
tx,
mcp_tool_names,
)
.await
}
}
}
/// Process streaming chunks from a single stream
async fn process_responses_single_stream_mixed(
grpc_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
mcp_tool_names: &std::collections::HashSet<String>,
) -> Result<ResponsesIterationResult, String> {
Self::process_decode_stream_with_tool_lookup(grpc_stream, emitter, tx, Some(mcp_tool_names))
.await
}
/// Process streaming chunks from dual streams
async fn process_responses_dual_stream_mixed(
mut prefill_stream: AbortOnDropStream,
decode_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
mcp_tool_names: &std::collections::HashSet<String>,
) -> Result<ResponsesIterationResult, String> {
// Phase 1: Process prefill stream (collect metadata, no output)
while let Some(result) = prefill_stream.next().await {
let _response = result.map_err(|e| format!("Prefill stream error: {}", e))?;
}
// Phase 2: Process decode stream with per-tool mode detection
let result = Self::process_decode_stream_with_tool_lookup(
decode_stream,
emitter,
tx,
Some(mcp_tool_names),
)
.await;
// Mark prefill stream as completed AFTER decode completes successfully
// This ensures that if client disconnects during decode, BOTH streams send abort
prefill_stream.mark_completed();
result
}
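The per-tool dispatch that `process_decode_stream_with_tool_lookup` performs can be summarized as: when a set of MCP tool names is supplied, each tool call's mode is chosen by name membership (so the empty set passed by `execute_without_mcp_streaming` makes every call a function tool); when no set is supplied, the legacy all-MCP behavior applies. A small standalone sketch of that decision, with a local copy of the mode enum rather than the router's `ToolCallMode`:

```rust
use std::collections::HashSet;

// Local stand-in for the router's ToolCallMode.
#[derive(Debug, PartialEq)]
enum ToolCallMode {
    Mcp,
    Function,
}

fn mode_for(tool_name: &str, mcp_tool_names: Option<&HashSet<String>>) -> ToolCallMode {
    match mcp_tool_names {
        // Mixed mode: decide per tool by name membership.
        Some(names) if names.contains(tool_name) => ToolCallMode::Mcp,
        Some(_) => ToolCallMode::Function,
        // No lookup set: legacy behavior treats every call as MCP.
        None => ToolCallMode::Mcp,
    }
}

fn main() {
    let mcp: HashSet<String> = ["search_docs".to_string()].into_iter().collect();
    let empty: HashSet<String> = HashSet::new();

    assert_eq!(mode_for("search_docs", Some(&mcp)), ToolCallMode::Mcp);
    assert_eq!(mode_for("get_weather", Some(&mcp)), ToolCallMode::Function);
    // Empty set: all tools fall through to Function mode.
    assert_eq!(mode_for("search_docs", Some(&empty)), ToolCallMode::Function);
    // None: all tools default to MCP (legacy single-mode path).
    assert_eq!(mode_for("search_docs", None), ToolCallMode::Mcp);
}
```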
/// Decode stream processing with optional per-tool mode lookup
///
/// If mcp_tool_names is Some, determines mode per-tool by checking tool name.
/// If mcp_tool_names is None, uses default MCP mode for all tools.
async fn process_decode_stream_with_tool_lookup(
mut decode_stream: AbortOnDropStream, mut decode_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter, emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>, tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
mode: ToolCallMode, mcp_tool_names: Option<&std::collections::HashSet<String>>,
) -> Result<ResponsesIterationResult, String> { ) -> Result<ResponsesIterationResult, String> {
// Initialize Harmony parser for this iteration // Initialize Harmony parser for this iteration
let mut parser = let mut parser =
...@@ -620,8 +690,9 @@ impl HarmonyStreamingProcessor { ...@@ -620,8 +690,9 @@ impl HarmonyStreamingProcessor {
let mut message_item_id: Option<String> = None; let mut message_item_id: Option<String> = None;
let mut has_emitted_content_part_added = false; let mut has_emitted_content_part_added = false;
// Tool call tracking (call_index -> (output_index, item_id)) // Tool call tracking (call_index -> (output_index, item_id, mode))
let mut tool_call_tracking: HashMap<usize, (usize, String)> = HashMap::new(); // Mode is determined per-tool when mcp_tool_names is provided
let mut tool_call_tracking: HashMap<usize, (usize, String, ToolCallMode)> = HashMap::new();
// Metadata from Complete message // Metadata from Complete message
let mut finish_reason = String::from("stop"); let mut finish_reason = String::from("stop");
...@@ -715,15 +786,7 @@ impl HarmonyStreamingProcessor { ...@@ -715,15 +786,7 @@ impl HarmonyStreamingProcessor {
// Check if this is a new tool call (has id and name) // Check if this is a new tool call (has id and name)
if tc_delta.id.is_some() { if tc_delta.id.is_some() {
// NEW TOOL CALL: Allocate output item // Get tool name first to determine mode
let (output_index, item_id) =
emitter.allocate_output_index(mode.output_item_type());
// Store tracking info
tool_call_tracking
.insert(call_index, (output_index, item_id.clone()));
// Get tool name
let tool_name = tc_delta let tool_name = tc_delta
.function .function
.as_ref() .as_ref()
...@@ -731,11 +794,32 @@ impl HarmonyStreamingProcessor { ...@@ -731,11 +794,32 @@ impl HarmonyStreamingProcessor {
.map(|n| n.as_str()) .map(|n| n.as_str())
.unwrap_or(""); .unwrap_or("");
// Determine mode for this specific tool call
let tool_mode = if let Some(mcp_names) = mcp_tool_names {
// Mixed mode: check if tool is MCP
if mcp_names.contains(tool_name) {
ToolCallMode::Mcp
} else {
ToolCallMode::Function
}
} else {
// Single mode: use MCP (legacy behavior)
ToolCallMode::Mcp
};
// NEW TOOL CALL: Allocate output item
let (output_index, item_id) =
emitter.allocate_output_index(tool_mode.output_item_type());
// Store tracking info with mode
tool_call_tracking
.insert(call_index, (output_index, item_id.clone(), tool_mode));
// Emit output_item.added wrapper event // Emit output_item.added wrapper event
let call_id = tc_delta.id.as_ref().unwrap(); let call_id = tc_delta.id.as_ref().unwrap();
let mut item = json!({ let mut item = json!({
"id": item_id, "id": item_id,
"type": mode.type_str(), "type": tool_mode.type_str(),
"name": tool_name, "name": tool_name,
"call_id": call_id, "call_id": call_id,
"arguments": "", "arguments": "",
...@@ -743,7 +827,7 @@ impl HarmonyStreamingProcessor { ...@@ -743,7 +827,7 @@ impl HarmonyStreamingProcessor {
}); });
// Add server_label for MCP calls // Add server_label for MCP calls
if mode.emits_status_events() { if tool_mode.emits_status_events() {
if let Some(ref server_label) = emitter.mcp_server_label { if let Some(ref server_label) = emitter.mcp_server_label {
item["server_label"] = json!(server_label); item["server_label"] = json!(server_label);
} }
...@@ -753,7 +837,7 @@ impl HarmonyStreamingProcessor { ...@@ -753,7 +837,7 @@ impl HarmonyStreamingProcessor {
emitter.send_event_best_effort(&event, tx); emitter.send_event_best_effort(&event, tx);
// Emit status event if mode supports it (MCP only) // Emit status event if mode supports it (MCP only)
if mode.emits_status_events() { if tool_mode.emits_status_events() {
let event = let event =
emitter.emit_mcp_call_in_progress(output_index, &item_id); emitter.emit_mcp_call_in_progress(output_index, &item_id);
emitter.send_event_best_effort(&event, tx); emitter.send_event_best_effort(&event, tx);
...@@ -762,7 +846,7 @@ impl HarmonyStreamingProcessor { ...@@ -762,7 +846,7 @@ impl HarmonyStreamingProcessor {
// If we have function name, emit initial arguments delta // If we have function name, emit initial arguments delta
if let Some(func) = &tc_delta.function { if let Some(func) = &tc_delta.function {
if func.name.is_some() { if func.name.is_some() {
let event = mode.emit_arguments_delta( let event = tool_mode.emit_arguments_delta(
emitter, emitter,
output_index, output_index,
&item_id, &item_id,
...@@ -773,7 +857,7 @@ impl HarmonyStreamingProcessor { ...@@ -773,7 +857,7 @@ impl HarmonyStreamingProcessor {
} }
} else { } else {
// CONTINUING TOOL CALL: Emit arguments delta // CONTINUING TOOL CALL: Emit arguments delta
if let Some((output_index, item_id)) = if let Some((output_index, item_id, tool_mode)) =
tool_call_tracking.get(&call_index) tool_call_tracking.get(&call_index)
{ {
if let Some(args) = tc_delta if let Some(args) = tc_delta
...@@ -782,7 +866,7 @@ impl HarmonyStreamingProcessor { ...@@ -782,7 +866,7 @@ impl HarmonyStreamingProcessor {
.and_then(|f| f.arguments.as_ref()) .and_then(|f| f.arguments.as_ref())
.filter(|a| !a.is_empty()) .filter(|a| !a.is_empty())
{ {
let event = mode.emit_arguments_delta( let event = tool_mode.emit_arguments_delta(
emitter, emitter,
*output_index, *output_index,
item_id, item_id,
...@@ -820,7 +904,8 @@ impl HarmonyStreamingProcessor { ...@@ -820,7 +904,8 @@ impl HarmonyStreamingProcessor {
// Complete all tool calls if we have commentary // Complete all tool calls if we have commentary
if let Some(ref tool_calls) = accumulated_tool_calls { if let Some(ref tool_calls) = accumulated_tool_calls {
for (call_idx, tool_call) in tool_calls.iter().enumerate() { for (call_idx, tool_call) in tool_calls.iter().enumerate() {
if let Some((output_index, item_id)) = tool_call_tracking.get(&call_idx) if let Some((output_index, item_id, tool_mode)) =
tool_call_tracking.get(&call_idx)
{ {
let tool_name = &tool_call.function.name; let tool_name = &tool_call.function.name;
...@@ -828,7 +913,7 @@ impl HarmonyStreamingProcessor { ...@@ -828,7 +913,7 @@ impl HarmonyStreamingProcessor {
let args_str = let args_str =
tool_call.function.arguments.as_deref().unwrap_or(""); tool_call.function.arguments.as_deref().unwrap_or("");
let event = mode.emit_arguments_done( let event = tool_mode.emit_arguments_done(
emitter, emitter,
*output_index, *output_index,
item_id, item_id,
...@@ -837,7 +922,7 @@ impl HarmonyStreamingProcessor { ...@@ -837,7 +922,7 @@ impl HarmonyStreamingProcessor {
emitter.send_event_best_effort(&event, tx); emitter.send_event_best_effort(&event, tx);
// Emit status event if mode supports it (MCP only) // Emit status event if mode supports it (MCP only)
if mode.emits_status_events() { if tool_mode.emits_status_events() {
let event = let event =
emitter.emit_mcp_call_completed(*output_index, item_id); emitter.emit_mcp_call_completed(*output_index, item_id);
emitter.send_event_best_effort(&event, tx); emitter.send_event_best_effort(&event, tx);
...@@ -846,7 +931,7 @@ impl HarmonyStreamingProcessor { ...@@ -846,7 +931,7 @@ impl HarmonyStreamingProcessor {
// Emit output_item.done wrapper event // Emit output_item.done wrapper event
let mut item = json!({ let mut item = json!({
"id": item_id, "id": item_id,
"type": mode.type_str(), "type": tool_mode.type_str(),
"name": tool_name, "name": tool_name,
"call_id": &tool_call.id, "call_id": &tool_call.id,
"arguments": args_str, "arguments": args_str,
...@@ -854,7 +939,7 @@ impl HarmonyStreamingProcessor { ...@@ -854,7 +939,7 @@ impl HarmonyStreamingProcessor {
}); });
// Add server_label for MCP calls // Add server_label for MCP calls
if mode.emits_status_events() { if tool_mode.emits_status_events() {
// MCP mode - include server_label // MCP mode - include server_label
if let Some(ref server_label) = emitter.mcp_server_label { if let Some(ref server_label) = emitter.mcp_server_label {
item["server_label"] = json!(server_label); item["server_label"] = json!(server_label);
...@@ -943,17 +1028,23 @@ impl HarmonyStreamingProcessor { ...@@ -943,17 +1028,23 @@ impl HarmonyStreamingProcessor {
// Complete any pending tool calls with data from completed messages // Complete any pending tool calls with data from completed messages
if let Some(ref tool_calls) = accumulated_tool_calls { if let Some(ref tool_calls) = accumulated_tool_calls {
for (call_idx, tool_call) in tool_calls.iter().enumerate() { for (call_idx, tool_call) in tool_calls.iter().enumerate() {
if let Some((output_index, item_id)) = tool_call_tracking.get(&call_idx) { if let Some((output_index, item_id, tool_mode)) =
tool_call_tracking.get(&call_idx)
{
let tool_name = &tool_call.function.name; let tool_name = &tool_call.function.name;
// Emit arguments done with final arguments // Emit arguments done with final arguments
let args_str = tool_call.function.arguments.as_deref().unwrap_or(""); let args_str = tool_call.function.arguments.as_deref().unwrap_or("");
let event = let event = tool_mode.emit_arguments_done(
mode.emit_arguments_done(emitter, *output_index, item_id, args_str); emitter,
*output_index,
item_id,
args_str,
);
emitter.send_event_best_effort(&event, tx); emitter.send_event_best_effort(&event, tx);
// Emit status event if mode supports it (MCP only) // Emit status event if mode supports it (MCP only)
if mode.emits_status_events() { if tool_mode.emits_status_events() {
let event = emitter.emit_mcp_call_completed(*output_index, item_id); let event = emitter.emit_mcp_call_completed(*output_index, item_id);
emitter.send_event_best_effort(&event, tx); emitter.send_event_best_effort(&event, tx);
} }
...@@ -961,7 +1052,7 @@ impl HarmonyStreamingProcessor { ...@@ -961,7 +1052,7 @@ impl HarmonyStreamingProcessor {
// Emit output_item.done wrapper event // Emit output_item.done wrapper event
let mut item = json!({ let mut item = json!({
"id": item_id, "id": item_id,
"type": mode.type_str(), "type": tool_mode.type_str(),
"name": tool_name, "name": tool_name,
"call_id": &tool_call.id, "call_id": &tool_call.id,
"arguments": args_str, "arguments": args_str,
...@@ -969,7 +1060,7 @@ impl HarmonyStreamingProcessor { ...@@ -969,7 +1060,7 @@ impl HarmonyStreamingProcessor {
}); });
// Add server_label for MCP calls // Add server_label for MCP calls
if mode.emits_status_events() { if tool_mode.emits_status_events() {
if let Some(ref server_label) = emitter.mcp_server_label { if let Some(ref server_label) = emitter.mcp_server_label {
item["server_label"] = json!(server_label); item["server_label"] = json!(server_label);
} }
...@@ -1061,120 +1152,6 @@ impl HarmonyStreamingProcessor { ...@@ -1061,120 +1152,6 @@ impl HarmonyStreamingProcessor {
}) })
} }
/// Process streaming chunks for Responses API iteration - MCP loop
///
/// Emits mcp_call.* events for all tool calls
pub async fn process_responses_iteration_stream_mcp(
execution_result: context::ExecutionResult,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
) -> Result<ResponsesIterationResult, String> {
match execution_result {
context::ExecutionResult::Single { stream } => {
debug!("Processing Responses API single stream mode (MCP)");
Self::process_responses_single_stream_mcp(stream, emitter, tx).await
}
context::ExecutionResult::Dual { prefill, decode } => {
debug!("Processing Responses API dual stream mode (MCP)");
Self::process_responses_dual_stream_mcp(prefill, *decode, emitter, tx).await
}
}
}
/// Process streaming chunks for Responses API iteration - Function tools
///
/// Emits function_call_arguments.* events for all tool calls
pub async fn process_responses_iteration_stream_function(
execution_result: context::ExecutionResult,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
) -> Result<ResponsesIterationResult, String> {
match execution_result {
context::ExecutionResult::Single { stream } => {
debug!("Processing Responses API single stream mode (Function)");
Self::process_responses_single_stream_function(stream, emitter, tx).await
}
context::ExecutionResult::Dual { prefill, decode } => {
debug!("Processing Responses API dual stream mode (Function)");
Self::process_responses_dual_stream_function(prefill, *decode, emitter, tx).await
}
}
}
/// Process streaming chunks from a single stream - MCP loop
async fn process_responses_single_stream_mcp(
grpc_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
) -> Result<ResponsesIterationResult, String> {
Self::process_decode_stream(grpc_stream, emitter, tx, ToolCallMode::Mcp).await
}
/// Process streaming chunks from a single stream - Function tools
async fn process_responses_single_stream_function(
grpc_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
) -> Result<ResponsesIterationResult, String> {
Self::process_decode_stream(grpc_stream, emitter, tx, ToolCallMode::Function).await
}
/// Process streaming chunks from dual streams (common implementation)
async fn process_responses_dual_stream(
mut prefill_stream: AbortOnDropStream,
decode_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
mode: ToolCallMode,
) -> Result<ResponsesIterationResult, String> {
// Phase 1: Process prefill stream (collect metadata, no output)
while let Some(result) = prefill_stream.next().await {
let _response = result.map_err(|e| format!("Prefill stream error: {}", e))?;
}
// Phase 2: Process decode stream using common helper
let result = Self::process_decode_stream(decode_stream, emitter, tx, mode).await;
// Mark prefill stream as completed AFTER decode completes successfully
// This ensures that if client disconnects during decode, BOTH streams send abort
prefill_stream.mark_completed();
result
}
/// Process streaming chunks from dual streams - MCP loop
async fn process_responses_dual_stream_mcp(
prefill_stream: AbortOnDropStream,
decode_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
) -> Result<ResponsesIterationResult, String> {
Self::process_responses_dual_stream(
prefill_stream,
decode_stream,
emitter,
tx,
ToolCallMode::Mcp,
)
.await
}
/// Process streaming chunks from dual streams - Function tools
async fn process_responses_dual_stream_function(
prefill_stream: AbortOnDropStream,
decode_stream: AbortOnDropStream,
emitter: &mut ResponseStreamEventEmitter,
tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
) -> Result<ResponsesIterationResult, String> {
Self::process_responses_dual_stream(
prefill_stream,
decode_stream,
emitter,
tx,
ToolCallMode::Function,
)
.await
}
/// Build SSE response from receiver /// Build SSE response from receiver
fn build_sse_response(rx: mpsc::UnboundedReceiver<Result<Bytes, io::Error>>) -> Response { fn build_sse_response(rx: mpsc::UnboundedReceiver<Result<Bytes, io::Error>>) -> Response {
let stream = UnboundedReceiverStream::new(rx); let stream = UnboundedReceiverStream::new(rx);
......
...@@ -40,33 +40,6 @@ use crate::{ ...@@ -40,33 +40,6 @@ use crate::{
}, },
}; };
/// Extract function call from a chat completion response
/// Returns (call_id, tool_name, arguments_json_str) if found
fn extract_function_call_from_chat(
response: &ChatCompletionResponse,
) -> Option<(String, String, String)> {
// Check if response has choices with tool calls
let choice = response.choices.first()?;
let message = &choice.message;
// Look for tool_calls in the message
if let Some(tool_calls) = &message.tool_calls {
if let Some(tool_call) = tool_calls.first() {
return Some((
tool_call.id.clone(),
tool_call.function.name.clone(),
tool_call
.function
.arguments
.clone()
.unwrap_or_else(|| "{}".to_string()),
));
}
}
None
}
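The removed single-call helper above is superseded by an `extract_all_tool_calls_from_chat` used later in this diff, which returns every tool call from the first choice so that parallel calls can be partitioned. Its body is not part of this hunk; the sketch below is only a plausible reconstruction based on the old helper and on how the new call sites consume `(call_id, tool_name, arguments_json)` tuples, with simplified stand-in types in place of the real chat completion structs.

```rust
// Simplified stand-ins for the chat completion types referenced in this file.
struct FunctionCall {
    name: String,
    arguments: Option<String>,
}
struct ToolCall {
    id: String,
    function: FunctionCall,
}
struct Message {
    tool_calls: Option<Vec<ToolCall>>,
}
struct Choice {
    message: Message,
}
struct ChatCompletionResponse {
    choices: Vec<Choice>,
}

/// Collect (call_id, tool_name, arguments_json) for every tool call in the
/// first choice, defaulting missing arguments to "{}" like the old helper did.
fn extract_all_tool_calls_from_chat(
    response: &ChatCompletionResponse,
) -> Vec<(String, String, String)> {
    response
        .choices
        .first()
        .and_then(|choice| choice.message.tool_calls.as_ref())
        .map(|tool_calls| {
            tool_calls
                .iter()
                .map(|tc| {
                    (
                        tc.id.clone(),
                        tc.function.name.clone(),
                        tc.function.arguments.clone().unwrap_or_else(|| "{}".to_string()),
                    )
                })
                .collect()
        })
        .unwrap_or_default()
}

fn main() {
    let response = ChatCompletionResponse {
        choices: vec![Choice {
            message: Message {
                tool_calls: Some(vec![ToolCall {
                    id: "call_1".to_string(),
                    function: FunctionCall {
                        name: "search_docs".to_string(),
                        arguments: None,
                    },
                }]),
            },
        }],
    };
    println!("{:?}", extract_all_tool_calls_from_chat(&response));
}
```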
/// Merge function tools from request with MCP tools and set tool_choice based on iteration /// Merge function tools from request with MCP tools and set tool_choice based on iteration
fn prepare_chat_tools_and_choice( fn prepare_chat_tools_and_choice(
chat_request: &mut ChatCompletionRequest, chat_request: &mut ChatCompletionRequest,
...@@ -294,27 +267,61 @@ pub(super) async fn execute_tool_loop( ...@@ -294,27 +267,61 @@ pub(super) async fn execute_tool_loop(
) )
.await?; .await?;
-        // Check for function calls
-        if let Some((call_id, tool_name, args_json_str)) =
-            extract_function_call_from_chat(&chat_response)
-        {
+        // Check for function calls (extract all for parallel execution)
+        let tool_calls = extract_all_tool_calls_from_chat(&chat_response);
+
+        if !tool_calls.is_empty() {
             state.iteration += 1;
             debug!(
-                "Tool loop iteration {}: found call to {} (call_id: {})",
-                state.iteration, tool_name, call_id
+                "Tool loop iteration {}: found {} tool call(s)",
+                state.iteration,
+                tool_calls.len()
             );
+
+            // Separate MCP and function tool calls
+            let mcp_tool_names: std::collections::HashSet<&str> =
+                mcp_tools.iter().map(|t| t.name.as_ref()).collect();
+            let (mcp_tool_calls, function_tool_calls): (Vec<_>, Vec<_>) = tool_calls
+                .into_iter()
+                .partition(|(_, tool_name, _)| mcp_tool_names.contains(tool_name.as_str()));
+
+            debug!(
+                "Separated tool calls: {} MCP, {} function",
+                mcp_tool_calls.len(),
+                function_tool_calls.len()
+            );
// Check combined limit BEFORE executing // If ANY tool call is a function tool, return to caller immediately
if !function_tool_calls.is_empty() {
// Convert chat response to responses format (includes all tool calls)
let responses_response = conversions::chat_to_responses(
&chat_response,
original_request,
response_id.clone(),
)
.map_err(|e| {
error::internal_error(format!("Failed to convert to responses format: {}", e))
})?;
// Return response with function tool calls to caller
return Ok(responses_response);
}
// All MCP tools - check combined limit BEFORE executing
let effective_limit = match max_tool_calls { let effective_limit = match max_tool_calls {
Some(user_max) => user_max.min(MAX_ITERATIONS), Some(user_max) => user_max.min(MAX_ITERATIONS),
None => MAX_ITERATIONS, None => MAX_ITERATIONS,
}; };
if state.total_calls >= effective_limit { if state.total_calls + mcp_tool_calls.len() > effective_limit {
warn!( warn!(
"Reached tool call limit: {} (max_tool_calls={:?}, safety_limit={})", "Reached tool call limit: {} + {} > {} (max_tool_calls={:?}, safety_limit={})",
state.total_calls, max_tool_calls, MAX_ITERATIONS state.total_calls,
mcp_tool_calls.len(),
effective_limit,
max_tool_calls,
MAX_ITERATIONS
); );
// Convert chat response to responses format and mark as incomplete // Convert chat response to responses format and mark as incomplete
...@@ -334,46 +341,49 @@ pub(super) async fn execute_tool_loop( ...@@ -334,46 +341,49 @@ pub(super) async fn execute_tool_loop(
return Ok(responses_response); return Ok(responses_response);
} }
// Increment after check // Execute all MCP tools
state.total_calls += 1; for (call_id, tool_name, args_json_str) in mcp_tool_calls {
debug!(
"Calling MCP tool '{}' (call_id: {}) with args: {}",
tool_name, call_id, args_json_str
);
// Execute the MCP tool - manager handles parsing and type coercion let (output_str, success, error) = match ctx
debug!( .mcp_manager
"Calling MCP tool '{}' with args: {}", .call_tool(tool_name.as_str(), args_json_str.as_str())
tool_name, args_json_str .await
); {
let (output_str, success, error) = match ctx Ok(result) => match serde_json::to_string(&result) {
.mcp_manager Ok(output) => (output, true, None),
.call_tool(tool_name.as_str(), args_json_str.as_str()) Err(e) => {
.await let err = format!("Failed to serialize tool result: {}", e);
{ warn!("{}", err);
Ok(result) => match serde_json::to_string(&result) { let error_json = json!({ "error": &err }).to_string();
Ok(output) => (output, true, None), (error_json, false, Some(err))
Err(e) => { }
let err = format!("Failed to serialize tool result: {}", e); },
warn!("{}", err); Err(err) => {
let error_json = json!({ "error": &err }).to_string(); let err_str = format!("tool call failed: {}", err);
(error_json, false, Some(err)) warn!("Tool execution failed: {}", err_str);
// Return error as output, let model decide how to proceed
let error_json = json!({ "error": &err_str }).to_string();
(error_json, false, Some(err_str))
} }
}, };
Err(err) => {
let err_str = format!("tool call failed: {}", err);
warn!("Tool execution failed: {}", err_str);
// Return error as output, let model decide how to proceed
let error_json = json!({ "error": &err_str }).to_string();
(error_json, false, Some(err_str))
}
};
// Record the call in state // Record the call in state
state.record_call( state.record_call(
call_id, call_id,
tool_name, tool_name,
args_json_str, args_json_str,
output_str, output_str,
success, success,
error, error,
); );
// Increment total calls counter
state.total_calls += 1;
}
// Build resume request with conversation history // Build resume request with conversation history
// Start with original input // Start with original input
...@@ -687,17 +697,30 @@ async fn execute_tool_loop_streaming_internal( ...@@ -687,17 +697,30 @@ async fn execute_tool_loop_streaming_internal(
tool_calls.len() tool_calls.len()
); );
// Check combined limit // Separate MCP and function tool calls
let mcp_tool_names: std::collections::HashSet<&str> =
mcp_tools.iter().map(|t| t.name.as_ref()).collect();
let (mcp_tool_calls, function_tool_calls): (Vec<_>, Vec<_>) = tool_calls
.into_iter()
.partition(|(_, tool_name, _)| mcp_tool_names.contains(tool_name.as_str()));
debug!(
"Separated tool calls: {} MCP, {} function",
mcp_tool_calls.len(),
function_tool_calls.len()
);
// Check combined limit (only count MCP tools since function tools will be returned)
let effective_limit = match max_tool_calls { let effective_limit = match max_tool_calls {
Some(user_max) => user_max.min(MAX_ITERATIONS), Some(user_max) => user_max.min(MAX_ITERATIONS),
None => MAX_ITERATIONS, None => MAX_ITERATIONS,
}; };
if state.total_calls + tool_calls.len() > effective_limit { if state.total_calls + mcp_tool_calls.len() > effective_limit {
warn!( warn!(
"Reached tool call limit: {} + {} > {} (max_tool_calls={:?}, safety_limit={})", "Reached tool call limit: {} + {} > {} (max_tool_calls={:?}, safety_limit={})",
state.total_calls, state.total_calls,
tool_calls.len(), mcp_tool_calls.len(),
effective_limit, effective_limit,
max_tool_calls, max_tool_calls,
MAX_ITERATIONS MAX_ITERATIONS
...@@ -705,8 +728,8 @@ async fn execute_tool_loop_streaming_internal( ...@@ -705,8 +728,8 @@ async fn execute_tool_loop_streaming_internal(
break; break;
} }
// Process each tool call // Process each MCP tool call
for (call_id, tool_name, args_json_str) in tool_calls { for (call_id, tool_name, args_json_str) in mcp_tool_calls {
state.total_calls += 1; state.total_calls += 1;
debug!( debug!(
...@@ -846,6 +869,70 @@ async fn execute_tool_loop_streaming_internal( ...@@ -846,6 +869,70 @@ async fn execute_tool_loop_streaming_internal(
); );
} }
// If there are function tool calls, emit events and exit MCP loop
if !function_tool_calls.is_empty() {
debug!(
"Found {} function tool call(s) - emitting events and exiting MCP loop",
function_tool_calls.len()
);
// Emit function_tool_call events for each function tool
for (call_id, tool_name, args_json_str) in function_tool_calls {
// Allocate output_index for this function_tool_call item
let (output_index, item_id) =
emitter.allocate_output_index(OutputItemType::FunctionCall);
// Build initial function_tool_call item
let item = json!({
"id": item_id,
"type": "function_tool_call",
"call_id": call_id,
"name": tool_name,
"status": "in_progress",
"arguments": ""
});
// Emit output_item.added
let event = emitter.emit_output_item_added(output_index, &item);
emitter.send_event(&event, &tx)?;
// Emit function_call_arguments.delta
let event = emitter.emit_function_call_arguments_delta(
output_index,
&item_id,
&args_json_str,
);
emitter.send_event(&event, &tx)?;
// Emit function_call_arguments.done
let event = emitter.emit_function_call_arguments_done(
output_index,
&item_id,
&args_json_str,
);
emitter.send_event(&event, &tx)?;
// Build complete item
let item_complete = json!({
"id": item_id,
"type": "function_tool_call",
"call_id": call_id,
"name": tool_name,
"status": "completed",
"arguments": args_json_str
});
// Emit output_item.done
let event = emitter.emit_output_item_done(output_index, &item_complete);
emitter.send_event(&event, &tx)?;
emitter.complete_output_item(output_index);
}
// Break loop to return response to caller
break;
}
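For each function tool call that ends the streaming loop, the emitter walks the standard Responses API item lifecycle: `response.output_item.added`, then `response.function_call_arguments.delta` and `.done` carrying the full arguments, then `response.output_item.done` with the completed item. A rough sketch of the event ordering; the event names follow the emitter methods above, but the exact JSON field sets are simplified and the ids and tool names are made up:

```rust
use serde_json::json;

fn main() {
    let output_index = 2;
    let item_id = "fc_item_1";
    let args = r#"{"city":"SF"}"#;

    // Approximate order of SSE events for one returned function tool call.
    let events = vec![
        json!({ "type": "response.output_item.added", "output_index": output_index,
                "item": { "id": item_id, "type": "function_tool_call",
                          "name": "get_weather", "status": "in_progress", "arguments": "" } }),
        json!({ "type": "response.function_call_arguments.delta",
                "output_index": output_index, "item_id": item_id, "delta": args }),
        json!({ "type": "response.function_call_arguments.done",
                "output_index": output_index, "item_id": item_id, "arguments": args }),
        json!({ "type": "response.output_item.done", "output_index": output_index,
                "item": { "id": item_id, "type": "function_tool_call",
                          "name": "get_weather", "status": "completed", "arguments": args } }),
    ];

    for event in events {
        println!("{event}");
    }
}
```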
// Build next request with conversation history // Build next request with conversation history
let mut input_items = match &state.original_input { let mut input_items = match &state.original_input {
ResponseInput::Text(text) => vec![ResponseInputOutputItem::Message { ResponseInput::Text(text) => vec![ResponseInputOutputItem::Message {
......