//! Harmony Responses API implementation with multi-turn MCP tool support //! //! This module implements the Harmony Responses API orchestration logic, //! coordinating full pipeline execution with MCP tool support for multi-turn conversations. //! //! ## Architecture //! //! Multi-turn pipeline orchestration (NOT just a tool loop): //! - Serves Harmony Responses API requests end-to-end //! - Each iteration executes FULL pipeline (worker selection + client acquisition + execution + parsing) //! - Handles MCP tool execution and history building between iterations //! - Clean separation: serving orchestration (this file) vs. pipeline stages (stages/) //! //! ## Flow //! //! ```text //! loop { //! // Execute through FULL pipeline //! let result = pipeline.execute_harmony_responses(&request, &ctx).await?; //! //! match result { //! ToolCallsFound { tool_calls, .. } => { //! // Execute MCP tools //! // Build next request with tool results //! // Continue loop //! } //! Completed { response, .. } => { //! return Ok(response); //! } //! } //! } //! ``` //! //! ## Design Reference //! //! See `/Users/simolin/workspace/sglang/.claude/docs/harmony_pipeline/tool_loop_design.md` //! for complete architecture, rationale, and implementation details. use std::{ sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; use axum::response::Response; use bytes::Bytes; use serde_json::{from_str, from_value, json, to_string, to_value, Value}; use tokio::sync::mpsc; use tracing::{debug, warn}; use uuid::Uuid; use crate::{ data_connector::{ResponseId, ResponseStorage}, mcp::{self, McpManager}, protocols::{ common::{Function, ToolCall, Usage}, responses::{ McpToolInfo, ResponseContentPart, ResponseInput, ResponseInputOutputItem, ResponseOutputItem, ResponseReasoningContent, ResponseStatus, ResponseTool, ResponseToolType, ResponseUsage, ResponsesRequest, ResponsesResponse, ResponsesUsage, StringOrContentParts, }, }, routers::grpc::{ common::responses::{ build_sse_response, ensure_mcp_connection, streaming::{OutputItemType, ResponseStreamEventEmitter}, }, context::SharedComponents, error, harmony::{processor::ResponsesIterationResult, streaming::HarmonyStreamingProcessor}, pipeline::RequestPipeline, }, }; /// Maximum number of tool execution iterations to prevent infinite loops const MAX_TOOL_ITERATIONS: usize = 10; /// Record of a single MCP tool call execution /// /// Stores metadata needed to build mcp_call output items for Responses API format #[derive(Debug, Clone)] struct McpCallRecord { /// Tool call ID (stored for potential future use, currently generate new IDs) #[allow(dead_code)] call_id: String, /// Tool name tool_name: String, /// JSON-encoded arguments arguments: String, /// JSON-encoded output/result output: String, /// Whether execution succeeded success: bool, /// Error message if execution failed error: Option, } /// Tracking structure for MCP tool calls across iterations /// /// Accumulates all MCP tool call metadata during multi-turn conversation /// so we can build proper mcp_list_tools and mcp_call output items. #[derive(Debug, Clone)] struct McpCallTracking { /// MCP server label (e.g., "sglang-mcp") server_label: String, /// All tool call records across all iterations tool_calls: Vec, } impl McpCallTracking { pub fn new(server_label: String) -> Self { Self { server_label, tool_calls: Vec::new(), } } fn record_call( &mut self, call_id: String, tool_name: String, arguments: String, output: String, success: bool, error: Option, ) { self.tool_calls.push(McpCallRecord { call_id, tool_name, arguments, output, success, error, }); } fn total_calls(&self) -> usize { self.tool_calls.len() } } /// Context for Harmony Responses execution with MCP tool support /// /// Contains all dependencies needed for multi-turn Responses API execution. /// Cheap to clone (all Arc references). #[derive(Clone)] pub struct HarmonyResponsesContext { /// Pipeline for executing Harmony requests pub pipeline: Arc, /// Shared components (tokenizer, parsers) pub components: Arc, /// MCP manager for tool execution pub mcp_manager: Arc, /// Response storage for loading conversation history pub response_storage: Arc, /// Optional streaming sender (for future streaming support) pub stream_tx: Option>>, } impl HarmonyResponsesContext { /// Create a new Harmony Responses context pub fn new( pipeline: Arc, components: Arc, mcp_manager: Arc, response_storage: Arc, ) -> Self { Self { pipeline, components, mcp_manager, response_storage, stream_tx: None, } } /// Create with streaming support pub fn with_streaming( pipeline: Arc, components: Arc, mcp_manager: Arc, response_storage: Arc, stream_tx: mpsc::UnboundedSender>, ) -> Self { Self { pipeline, components, mcp_manager, response_storage, stream_tx: Some(stream_tx), } } } /// Execute Harmony Responses API request with multi-turn MCP tool support /// /// This function orchestrates the multi-turn conversation flow: /// 1. Execute request through full pipeline /// 2. Check for tool calls in commentary channel /// 3. If tool calls found: /// - Execute MCP tools /// - Build next request with tool results /// - Repeat from step 1 (full pipeline re-execution) /// 4. If no tool calls, return final response /// /// # Architecture /// /// Uses **external loop pattern**: wraps full pipeline execution rather than /// implementing loop inside pipeline. Each iteration goes through: /// - Worker selection (fresh selection based on current context) /// - Client acquisition (new gRPC client if worker changed) /// - Request building (Harmony prefill with complete history) /// - Execution (model generation) /// - Response processing (parse channels, detect tool calls) /// /// # Arguments /// /// * `ctx` - Harmony Responses context with pipeline, components, MCP manager /// * `request` - Initial Responses API request /// /// # Returns /// /// Final ResponsesResponse after all tool iterations complete /// /// # Errors /// /// Returns error if: /// - Max iterations exceeded (10 iterations) /// - Pipeline execution fails /// - MCP tool execution fails /// - Response building fails pub async fn serve_harmony_responses( ctx: &HarmonyResponsesContext, request: ResponsesRequest, ) -> Result { // Load previous conversation history if previous_response_id is set let current_request = load_previous_messages(ctx, request).await?; // Check MCP connection and get whether MCP tools are present let has_mcp_tools = ensure_mcp_connection(&ctx.mcp_manager, current_request.tools.as_deref()).await?; if has_mcp_tools { execute_with_mcp_loop(ctx, current_request).await } else { // No MCP tools - execute pipeline once (may have function tools or no tools) execute_without_mcp_loop(ctx, current_request).await } } /// Execute Harmony Responses with MCP tool loop /// /// Automatically executes MCP tools in a loop until no more tool calls or max iterations async fn execute_with_mcp_loop( ctx: &HarmonyResponsesContext, mut current_request: ResponsesRequest, ) -> Result { let mut iteration_count = 0; let mut mcp_tracking = McpCallTracking::new("sglang-mcp".to_string()); // Extract user's max_tool_calls limit (if set) let max_tool_calls = current_request.max_tool_calls.map(|n| n as usize); // Add static MCP tools from inventory to the request let mcp_tools = ctx.mcp_manager.list_tools(); if !mcp_tools.is_empty() { let mcp_response_tools = convert_mcp_tools_to_response_tools(&mcp_tools); let mut all_tools = current_request.tools.clone().unwrap_or_default(); all_tools.extend(mcp_response_tools); current_request.tools = Some(all_tools); debug!( mcp_tool_count = mcp_tools.len(), total_tool_count = current_request.tools.as_ref().map(|t| t.len()).unwrap_or(0), "MCP client available - added static MCP tools to Harmony Responses request" ); } loop { iteration_count += 1; // Safety check: prevent infinite loops if iteration_count > MAX_TOOL_ITERATIONS { return Err(error::internal_error(format!( "Maximum tool iterations ({}) exceeded", MAX_TOOL_ITERATIONS ))); } debug!( iteration = iteration_count, "Harmony Responses serving iteration" ); // Execute through full pipeline // This includes: // - HarmonyPreparationStage (builder.rs: construct_input_messages_with_harmony) // - WorkerSelectionStage (FRESH selection based on current context) // - ClientAcquisitionStage (NEW gRPC client if needed) // - HarmonyRequestBuildingStage (encode to token_ids) // - RequestExecutionStage (model generation) // - HarmonyResponseProcessingStage (processor.rs: process_responses_iteration) let iteration_result = ctx .pipeline .execute_harmony_responses(¤t_request, ctx) .await?; match iteration_result { ResponsesIterationResult::ToolCallsFound { tool_calls, analysis, partial_text, usage, request_id, } => { debug!( tool_call_count = tool_calls.len(), has_analysis = analysis.is_some(), partial_text_len = partial_text.len(), "Tool calls found - checking limits before executing MCP tools" ); // Check combined limit (user's max_tool_calls vs safety limit) let effective_limit = match max_tool_calls { Some(user_max) => user_max.min(MAX_TOOL_ITERATIONS), None => MAX_TOOL_ITERATIONS, }; // Check if we would exceed the limit with these new tool calls let total_calls_after = mcp_tracking.total_calls() + tool_calls.len(); if total_calls_after > effective_limit { warn!( current_calls = mcp_tracking.total_calls(), new_calls = tool_calls.len(), total_after = total_calls_after, effective_limit = effective_limit, user_max = ?max_tool_calls, "Reached tool call limit - returning incomplete response" ); // Build response with incomplete status let mut response = build_function_tool_response( tool_calls, analysis, partial_text, usage, request_id, Arc::new(current_request), ); // Mark as completed with incomplete_details response.status = ResponseStatus::Completed; response.incomplete_details = Some(json!({ "reason": "max_tool_calls" })); // Inject MCP metadata if any calls were executed if mcp_tracking.total_calls() > 0 { inject_mcp_metadata(&mut response, &mcp_tracking, &ctx.mcp_manager); } return Ok(response); } // Execute MCP tools let tool_results = execute_mcp_tools(&ctx.mcp_manager, &tool_calls, &mut mcp_tracking).await?; // Build next request with appended history current_request = build_next_request_with_tools( current_request, tool_calls, tool_results, analysis, partial_text, ) .map_err(|e| *e)?; // Continue loop - next iteration will select workers and execute } ResponsesIterationResult::Completed { mut response, usage, } => { debug!( output_items = response.output.len(), input_tokens = usage.prompt_tokens, output_tokens = usage.completion_tokens, "MCP loop completed - no more tool calls" ); // Inject MCP metadata into final response inject_mcp_metadata(&mut response, &mcp_tracking, &ctx.mcp_manager); debug!( mcp_calls = mcp_tracking.total_calls(), output_items_after = response.output.len(), "Injected MCP metadata into final response" ); // No tool calls - this is the final response return Ok(*response); } } } } /// Execute Harmony Responses without MCP loop (single execution) /// /// For function tools or no tools - executes pipeline once and returns async fn execute_without_mcp_loop( ctx: &HarmonyResponsesContext, current_request: ResponsesRequest, ) -> Result { debug!("Executing Harmony Responses without MCP loop"); // Execute pipeline once let iteration_result = ctx .pipeline .execute_harmony_responses(¤t_request, ctx) .await?; match iteration_result { ResponsesIterationResult::ToolCallsFound { tool_calls, analysis, partial_text, usage, request_id, } => { // Function tool calls found - return to caller for execution debug!( tool_call_count = tool_calls.len(), "Function tool calls found - returning to caller" ); Ok(build_function_tool_response( tool_calls, analysis, partial_text, usage, request_id, Arc::new(current_request), )) } ResponsesIterationResult::Completed { response, usage: _ } => { // No tool calls - return completed response debug!("No tool calls - returning completed response"); Ok(*response) } } } /// Serve Harmony Responses API with streaming (SSE) /// /// This is the streaming equivalent of `serve_harmony_responses()`. /// Emits SSE events for lifecycle, MCP list_tools, and per-iteration streaming. /// /// # Architecture /// /// - Emits `response.created` and `response.in_progress` at start /// - Emits `mcp_list_tools` events on first iteration (if MCP tools available) /// - Loops through tool execution iterations (max 10) /// - Calls `streaming::process_responses_iteration_stream()` for per-iteration events /// - Emits `response.completed` at end /// - Handles errors with `response.failed` /// /// # Arguments /// /// * `ctx` - Harmony responses context with pipeline and dependencies /// * `request` - Responses API request /// /// # Returns /// /// SSE stream response with proper headers pub async fn serve_harmony_responses_stream( ctx: &HarmonyResponsesContext, request: ResponsesRequest, ) -> Response { // Load previous conversation history if previous_response_id is set let current_request = match load_previous_messages(ctx, request).await { Ok(req) => req, Err(err_response) => return err_response, }; // Check MCP connection BEFORE starting stream and get whether MCP tools are present let has_mcp_tools = match ensure_mcp_connection(&ctx.mcp_manager, current_request.tools.as_deref()).await { Ok(has_mcp) => has_mcp, Err(response) => return response, }; // Create SSE channel let (tx, rx) = mpsc::unbounded_channel(); // Create response event emitter let response_id = format!("resp_{}", Uuid::new_v4()); let model = current_request.model.clone(); let created_at = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); let mut emitter = ResponseStreamEventEmitter::new(response_id.clone(), model, created_at); // Clone context for spawned task let ctx_clone = ctx.clone(); // Spawn async task to handle streaming tokio::spawn(async move { let ctx = &ctx_clone; // Emit initial response.created and response.in_progress events let event = emitter.emit_created(); if emitter.send_event(&event, &tx).is_err() { return; } let event = emitter.emit_in_progress(); if emitter.send_event(&event, &tx).is_err() { return; } if has_mcp_tools { execute_mcp_tool_loop_streaming(ctx, current_request, &mut emitter, &tx).await; } else { execute_without_mcp_streaming(ctx, ¤t_request, &mut emitter, &tx).await; } }); // Return SSE stream response build_sse_response(rx) } // Execute MCP tool loop with streaming /// /// Handles the full MCP workflow: /// - Adds static MCP tools to request /// - Emits mcp_list_tools events /// - Loops through tool execution iterations /// - Emits final response.completed event async fn execute_mcp_tool_loop_streaming( ctx: &HarmonyResponsesContext, mut current_request: ResponsesRequest, emitter: &mut ResponseStreamEventEmitter, tx: &mpsc::UnboundedSender>, ) { // Initialize MCP call tracking let mut mcp_tracking = McpCallTracking::new("sglang-mcp".to_string()); // Extract user's max_tool_calls limit (if set) let max_tool_calls = current_request.max_tool_calls.map(|n| n as usize); // Add static MCP tools from inventory let mcp_tools = ctx.mcp_manager.list_tools(); if !mcp_tools.is_empty() { let mcp_response_tools = convert_mcp_tools_to_response_tools(&mcp_tools); let mut all_tools = current_request.tools.clone().unwrap_or_default(); all_tools.extend(mcp_response_tools); current_request.tools = Some(all_tools); debug!( mcp_tool_count = mcp_tools.len(), total_tool_count = current_request.tools.as_ref().map(|t| t.len()).unwrap_or(0), "MCP client available - added static MCP tools to Harmony Responses streaming request" ); } // Emit mcp_list_tools on first iteration let (output_index, item_id) = emitter.allocate_output_index(OutputItemType::McpListTools); // Build tools list for item structure let tool_items: Vec<_> = mcp_tools .iter() .map(|t| { json!({ "name": t.name, "description": t.description, "input_schema": Value::Object((*t.input_schema).clone()) }) }) .collect(); // Emit output_item.added let item = json!({ "id": item_id, "type": "mcp_list_tools", "server_label": "sglang-mcp", "status": "in_progress", "tools": [] }); let event = emitter.emit_output_item_added(output_index, &item); if emitter.send_event(&event, tx).is_err() { return; } // Emit mcp_list_tools.in_progress let event = emitter.emit_mcp_list_tools_in_progress(output_index); if emitter.send_event(&event, tx).is_err() { return; } // Emit mcp_list_tools.completed let event = emitter.emit_mcp_list_tools_completed(output_index, &mcp_tools); if emitter.send_event(&event, tx).is_err() { return; } // Emit output_item.done let item_done = json!({ "id": item_id, "type": "mcp_list_tools", "server_label": "sglang-mcp", "status": "completed", "tools": tool_items }); let event = emitter.emit_output_item_done(output_index, &item_done); if emitter.send_event(&event, tx).is_err() { return; } emitter.complete_output_item(output_index); debug!( tool_count = mcp_tools.len(), "Emitted mcp_list_tools on first iteration" ); // MCP tool loop (max 10 iterations) let mut iteration_count = 0; loop { iteration_count += 1; // Safety check: prevent infinite loops if iteration_count > MAX_TOOL_ITERATIONS { emitter.emit_error( &format!("Maximum tool iterations ({}) exceeded", MAX_TOOL_ITERATIONS), Some("max_iterations_exceeded"), tx, ); return; } debug!( iteration = iteration_count, "Harmony Responses streaming iteration" ); // Execute pipeline and get stream let execution_result = match ctx .pipeline .execute_harmony_responses_streaming(¤t_request, ctx) .await { Ok(result) => result, Err(err_response) => { emitter.emit_error( &format!("Pipeline execution failed: {:?}", err_response), Some("pipeline_error"), tx, ); return; } }; // Process stream with token-level streaming (MCP path - emits mcp_call.* events) let iteration_result = match HarmonyStreamingProcessor::process_responses_iteration_stream_mcp( execution_result, emitter, tx, ) .await { Ok(result) => result, Err(err_msg) => { emitter.emit_error(&err_msg, Some("processing_error"), tx); return; } }; // Handle iteration result (tool calls or completion) match iteration_result { ResponsesIterationResult::ToolCallsFound { tool_calls, analysis, partial_text, usage, request_id: _, } => { debug!( tool_call_count = tool_calls.len(), has_analysis = analysis.is_some(), partial_text_len = partial_text.len(), "MCP tool calls found in commentary channel - checking limits" ); // Check combined limit (user's max_tool_calls vs safety limit) let effective_limit = match max_tool_calls { Some(user_max) => user_max.min(MAX_TOOL_ITERATIONS), None => MAX_TOOL_ITERATIONS, }; // Check if we would exceed the limit with these new tool calls let total_calls_after = mcp_tracking.total_calls() + tool_calls.len(); if total_calls_after > effective_limit { warn!( current_calls = mcp_tracking.total_calls(), new_calls = tool_calls.len(), total_after = total_calls_after, effective_limit = effective_limit, user_max = ?max_tool_calls, "Reached tool call limit in streaming - emitting completion with incomplete_details" ); // Emit response.completed with incomplete_details and usage let incomplete_details = json!({ "reason": "max_tool_calls" }); let usage_json = json!({ "prompt_tokens": usage.prompt_tokens, "completion_tokens": usage.completion_tokens, "total_tokens": usage.total_tokens, "incomplete_details": incomplete_details, }); let event = emitter.emit_completed(Some(&usage_json)); emitter.send_event_best_effort(&event, tx); return; } // Execute MCP tools and continue loop let tool_results = match execute_mcp_tools(&ctx.mcp_manager, &tool_calls, &mut mcp_tracking).await { Ok(results) => results, Err(err_response) => { emitter.emit_error( &format!("MCP tool execution failed: {:?}", err_response), Some("mcp_tool_error"), tx, ); return; } }; // Build next request with appended history current_request = match build_next_request_with_tools( current_request, tool_calls, tool_results, analysis, partial_text, ) { Ok(req) => req, Err(e) => { emitter.emit_error( &format!("Failed to build next request: {:?}", e), Some("request_building_error"), tx, ); return; } }; // Continue loop } ResponsesIterationResult::Completed { response, usage } => { debug!( output_items = response.output.len(), input_tokens = usage.prompt_tokens, output_tokens = usage.completion_tokens, "Harmony Responses streaming completed - no more tool calls" ); // Emit response.completed with usage let usage_json = json!({ "prompt_tokens": usage.prompt_tokens, "completion_tokens": usage.completion_tokens, "total_tokens": usage.total_tokens, }); let event = emitter.emit_completed(Some(&usage_json)); emitter.send_event_best_effort(&event, tx); return; } } } } /// Execute without MCP tool loop (single execution with streaming) /// /// For function tools or no tools - executes pipeline once and emits completion. /// The streaming processor handles all output items (reasoning, message, function tool calls). async fn execute_without_mcp_streaming( ctx: &HarmonyResponsesContext, current_request: &ResponsesRequest, emitter: &mut ResponseStreamEventEmitter, tx: &mpsc::UnboundedSender>, ) { debug!("No MCP tools - executing single iteration"); // Execute pipeline and get stream let execution_result = match ctx .pipeline .execute_harmony_responses_streaming(current_request, ctx) .await { Ok(result) => result, Err(err_response) => { emitter.emit_error( &format!("Pipeline execution failed: {:?}", err_response), Some("pipeline_error"), tx, ); return; } }; // Process stream (emits all output items during streaming - function tool path emits function_call_arguments.* events) if let Err(err_msg) = HarmonyStreamingProcessor::process_responses_iteration_stream_function( execution_result, emitter, tx, ) .await { emitter.emit_error(&err_msg, Some("processing_error"), tx); return; } // Emit response.completed let event = emitter.emit_completed(None); emitter.send_event_best_effort(&event, tx); } /// Build ResponsesResponse with function tool calls for caller to execute /// /// When tool calls are found but no MCP client is available (function tools only), /// this builds a response with status=Completed and tool calls without output field. /// The absence of output signals the caller should execute tools and resume. /// /// TODO: Refactor to use builder pattern fn build_function_tool_response( tool_calls: Vec, analysis: Option, partial_text: String, usage: Usage, request_id: String, responses_request: Arc, ) -> ResponsesResponse { let mut output: Vec = Vec::new(); // Add reasoning output item if analysis exists if let Some(analysis_text) = analysis { output.push(ResponseOutputItem::Reasoning { id: format!("reasoning_{}", request_id), summary: vec![], content: vec![ResponseReasoningContent::ReasoningText { text: analysis_text, }], status: Some("completed".to_string()), }); } // Add message output item if partial text exists if !partial_text.is_empty() { output.push(ResponseOutputItem::Message { id: format!("msg_{}", request_id), role: "assistant".to_string(), content: vec![ResponseContentPart::OutputText { text: partial_text, annotations: vec![], logprobs: None, }], status: "completed".to_string(), }); } // Add function tool calls as completed output items (no output field = needs execution) for tool_call in tool_calls { output.push(ResponseOutputItem::FunctionToolCall { id: tool_call.id.clone(), call_id: tool_call.id.clone(), name: tool_call.function.name.clone(), arguments: tool_call.function.arguments.clone().unwrap_or_default(), output: None, // No output = tool needs execution by caller status: "completed".to_string(), }); } // Build ResponsesResponse with Completed status // The presence of FunctionToolCall items without output signals tool execution needed let created_at = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() as i64; ResponsesResponse { id: request_id, object: "response".to_string(), created_at, status: ResponseStatus::Completed, error: None, incomplete_details: None, instructions: responses_request.instructions.clone(), max_output_tokens: responses_request.max_output_tokens, model: responses_request.model.clone(), output, parallel_tool_calls: responses_request.parallel_tool_calls.unwrap_or(true), previous_response_id: responses_request.previous_response_id.clone(), reasoning: None, store: responses_request.store.unwrap_or(true), temperature: responses_request.temperature, text: None, tool_choice: responses_request .tool_choice .as_ref() .map(|tc| to_string(tc).unwrap_or_else(|_| "auto".to_string())) .unwrap_or_else(|| "auto".to_string()), tools: responses_request.tools.clone().unwrap_or_default(), top_p: responses_request.top_p, truncation: None, usage: Some(ResponsesUsage::Modern(ResponseUsage { input_tokens: usage.prompt_tokens, output_tokens: usage.completion_tokens, total_tokens: usage.total_tokens, input_tokens_details: None, output_tokens_details: None, })), user: None, safety_identifier: responses_request.user.clone(), metadata: responses_request.metadata.clone().unwrap_or_default(), } } /// Execute MCP tools and collect results /// /// Executes each tool call sequentially via the MCP manager. /// Tool execution errors are returned as error results to the model /// (allows model to handle gracefully). /// /// # Arguments /// /// * `mcp_manager` - MCP manager for tool execution /// * `tool_calls` - Tool calls from commentary channel /// /// # Returns /// /// Vector of tool results (one per tool call) async fn execute_mcp_tools( mcp_manager: &Arc, tool_calls: &[ToolCall], tracking: &mut McpCallTracking, ) -> Result, Response> { let mut results = Vec::new(); for tool_call in tool_calls { debug!( tool_name = %tool_call.function.name, call_id = %tool_call.id, "Executing MCP tool" ); // Parse tool arguments from JSON string let args_str = tool_call.function.arguments.as_deref().unwrap_or("{}"); let args: Value = from_str(args_str).map_err(|e| { error::internal_error(format!( "Invalid tool arguments JSON for tool '{}': {}", tool_call.function.name, e )) })?; // Execute tool via MCP manager let args_map = if let Value::Object(map) = args { Some(map) } else { None }; match mcp_manager .call_tool(&tool_call.function.name, args_map) .await { Ok(mcp_result) => { debug!( tool_name = %tool_call.function.name, call_id = %tool_call.id, "Tool execution succeeded" ); // Extract content from MCP result let output = if let Some(content) = mcp_result.content.first() { // Serialize the entire content item to_value(content) .unwrap_or_else(|_| json!({"error": "Failed to serialize tool result"})) } else { json!({"result": "success"}) }; let is_error = mcp_result.is_error.unwrap_or(false); let output_str = to_string(&output) .unwrap_or_else(|_| r#"{"error": "Failed to serialize output"}"#.to_string()); // Record this call in tracking tracking.record_call( tool_call.id.clone(), tool_call.function.name.clone(), args_str.to_string(), output_str.clone(), !is_error, if is_error { Some(output_str.clone()) } else { None }, ); results.push(ToolResult { call_id: tool_call.id.clone(), tool_name: tool_call.function.name.clone(), output, is_error, }); } Err(e) => { warn!( tool_name = %tool_call.function.name, call_id = %tool_call.id, error = %e, "Tool execution failed" ); let error_msg = format!("Tool execution failed: {}", e); let error_output = json!({ "error": error_msg.clone() }); let error_output_str = to_string(&error_output) .unwrap_or_else(|_| format!(r#"{{"error": "{}"}}"#, error_msg)); // Record failed call in tracking tracking.record_call( tool_call.id.clone(), tool_call.function.name.clone(), args_str.to_string(), error_output_str.clone(), false, Some(error_msg), ); // Return error result to model (let it handle gracefully) results.push(ToolResult { call_id: tool_call.id.clone(), tool_name: tool_call.function.name.clone(), output: error_output, is_error: true, }); } } } Ok(results) } /// Build next request with tool results appended to history /// /// Constructs a new ResponsesRequest with: /// 1. Original input items (preserved) /// 2. Assistant message with analysis (reasoning) + partial_text + tool_calls /// 3. Tool result messages for each tool execution /// /// # Arguments /// /// * `request` - Current request (contains original input) /// * `tool_calls` - Tool calls from commentary channel /// * `tool_results` - Results from MCP tool execution /// * `analysis` - Analysis channel content (becomes reasoning content) /// * `partial_text` - Final channel content (becomes message content) /// /// # Returns /// /// New ResponsesRequest with appended history fn build_next_request_with_tools( mut request: ResponsesRequest, tool_calls: Vec, tool_results: Vec, analysis: Option, partial_text: String, ) -> Result> { // Get current input items (or empty vec if Text variant) let mut items = match request.input { ResponseInput::Items(items) => items, ResponseInput::Text(text) => { // Convert text to items format vec![ResponseInputOutputItem::SimpleInputMessage { content: StringOrContentParts::String(text), role: "user".to_string(), r#type: None, }] } }; // Build assistant response item with reasoning + content + tool calls // This represents what the model generated in this iteration let assistant_id = format!("msg_{}", Uuid::new_v4()); // Add reasoning if present (from analysis channel) if let Some(analysis_text) = analysis { items.push(ResponseInputOutputItem::Reasoning { id: format!("reasoning_{}", assistant_id), summary: vec![], content: vec![ResponseReasoningContent::ReasoningText { text: analysis_text, }], status: Some("completed".to_string()), }); } // Add message content if present (from final channel) if !partial_text.is_empty() { items.push(ResponseInputOutputItem::Message { id: assistant_id.clone(), role: "assistant".to_string(), content: vec![ResponseContentPart::OutputText { text: partial_text, annotations: vec![], logprobs: None, }], status: Some("completed".to_string()), }); } // Add function tool calls (from commentary channel) for tool_call in tool_calls { items.push(ResponseInputOutputItem::FunctionToolCall { id: tool_call.id.clone(), call_id: tool_call.id.clone(), name: tool_call.function.name.clone(), arguments: tool_call .function .arguments .unwrap_or_else(|| "{}".to_string()), output: None, // Output will be added next status: Some("in_progress".to_string()), }); } // Add tool results for tool_result in tool_results { // Serialize tool output to string let output_str = to_string(&tool_result.output).unwrap_or_else(|e| { format!("{{\"error\": \"Failed to serialize tool output: {}\"}}", e) }); // Update the corresponding tool call with output and completed status // Find and update the matching FunctionToolCall if let Some(ResponseInputOutputItem::FunctionToolCall { output, status, .. }) = items .iter_mut() .find(|item| matches!(item, ResponseInputOutputItem::FunctionToolCall { id, .. } if id == &tool_result.call_id)) { *output = Some(output_str); *status = if tool_result.is_error { Some("failed".to_string()) } else { Some("completed".to_string()) }; } } // Update request with new items request.input = ResponseInput::Items(items); Ok(request) } /// Tool execution result /// /// Contains the result of executing a single MCP tool. struct ToolResult { /// Tool call ID (for matching with request) call_id: String, /// Tool name #[allow(dead_code)] // Kept for documentation and future use tool_name: String, /// Tool output (JSON value) output: Value, /// Whether this is an error result is_error: bool, } /// Convert MCP tools to Responses API tool format /// /// Converts MCP Tool entries (from rmcp SDK) to ResponseTool format so the model /// knows about available MCP tools when making tool calls. /// /// # Arguments /// /// * `mcp_tools` - MCP tools from the MCP manager inventory (rmcp::model::Tool) /// /// # Returns /// /// Vector of ResponseTool entries in MCP format pub fn convert_mcp_tools_to_response_tools(mcp_tools: &[mcp::Tool]) -> Vec { mcp_tools .iter() .map(|tool_info| ResponseTool { r#type: ResponseToolType::Mcp, function: Some(Function { name: tool_info.name.to_string(), description: tool_info.description.as_ref().map(|d| d.to_string()), parameters: Value::Object((*tool_info.input_schema).clone()), strict: None, }), server_url: None, // MCP tools from inventory don't have individual server URLs authorization: None, server_label: None, server_description: tool_info.description.as_ref().map(|d| d.to_string()), require_approval: None, allowed_tools: None, }) .collect() } /// Inject MCP metadata into final response /// /// Adds mcp_list_tools and mcp_call output items to the response output array. /// Following non-Harmony pipeline pattern: /// 1. Prepend mcp_list_tools at the beginning /// 2. Append all mcp_call items at the end /// /// # Arguments /// /// * `response` - Final response to modify /// * `tracking` - MCP call tracking data /// * `mcp_manager` - MCP manager for listing tools fn inject_mcp_metadata( response: &mut ResponsesResponse, tracking: &McpCallTracking, mcp_manager: &Arc, ) { // Build mcp_list_tools item let tools = mcp_manager.list_tools(); let tools_info: Vec = tools .iter() .map(|t| McpToolInfo { name: t.name.to_string(), description: t.description.as_ref().map(|d| d.to_string()), input_schema: Value::Object((*t.input_schema).clone()), annotations: Some(json!({ "read_only": false })), }) .collect(); let mcp_list_tools = ResponseOutputItem::McpListTools { id: format!("mcpl_{}", Uuid::new_v4()), server_label: tracking.server_label.clone(), tools: tools_info, }; // Build mcp_call items for each tracked call let mcp_call_items: Vec = tracking .tool_calls .iter() .map(|record| ResponseOutputItem::McpCall { id: format!("mcp_{}", Uuid::new_v4()), status: if record.success { "completed" } else { "failed" } .to_string(), approval_request_id: None, arguments: record.arguments.clone(), error: record.error.clone(), name: record.tool_name.clone(), output: record.output.clone(), server_label: tracking.server_label.clone(), }) .collect(); // Inject into response output: // 1. Prepend mcp_list_tools at the beginning response.output.insert(0, mcp_list_tools); // 2. Append all mcp_call items at the end response.output.extend(mcp_call_items); } /// Load previous conversation messages from storage /// /// If the request has `previous_response_id`, loads the response chain from storage /// and prepends the conversation history to the request input items. /// /// # Arguments /// /// * `ctx` - Harmony Responses context with response_storage /// * `request` - Current request (may have previous_response_id set) /// /// # Returns /// /// Modified request with conversation history prepended to input items async fn load_previous_messages( ctx: &HarmonyResponsesContext, request: ResponsesRequest, ) -> Result { let Some(ref prev_id_str) = request.previous_response_id else { // No previous_response_id, return request as-is return Ok(request); }; let prev_id = ResponseId::from(prev_id_str.as_str()); // Load response chain from storage let chain = ctx .response_storage .get_response_chain(&prev_id, None) .await .map_err(|e| { error::internal_error(format!( "Failed to load previous response chain for {}: {}", prev_id_str, e )) })?; // Build conversation history from stored responses let mut history_items = Vec::new(); // Helper to deserialize and collect items from a JSON array let deserialize_items = |arr: &Value, item_type: &str| -> Vec { arr.as_array() .into_iter() .flat_map(|items| items.iter()) .filter_map(|item| { from_value::(item.clone()) .map_err(|e| { warn!( "Failed to deserialize stored {} item: {}. Item: {}", item_type, e, item ); }) .ok() }) .collect() }; for stored in chain.responses.iter() { history_items.extend(deserialize_items(&stored.input, "input")); history_items.extend(deserialize_items(&stored.output, "output")); } debug!( previous_response_id = %prev_id_str, history_items_count = history_items.len(), "Loaded conversation history from previous response" ); // Build modified request with history prepended let mut modified_request = request; // Convert current input to items format let all_items = match modified_request.input { ResponseInput::Items(items) => { // Prepend history to existing items let mut combined = history_items; combined.extend(items); combined } ResponseInput::Text(text) => { // Convert text to item and prepend history history_items.push(ResponseInputOutputItem::SimpleInputMessage { content: StringOrContentParts::String(text), role: "user".to_string(), r#type: None, }); history_items } }; // Update request with combined items and clear previous_response_id modified_request.input = ResponseInput::Items(all_items); modified_request.previous_response_id = None; Ok(modified_request) }