//! Pipeline orchestrator for gRPC router request processing
//!
//! This module defines the RequestPipeline orchestrator that coordinates
//! the execution of pipeline stages from request preparation to response delivery.

use std::sync::Arc;

use axum::response::{IntoResponse, Response};
use tracing::error;

use super::{
    common::stages::*,
    context::*,
    error, harmony,
    regular::{processor, stages::*, streaming},
};
use crate::{
    core::WorkerRegistry,
    policies::PolicyRegistry,
    protocols::{
        chat::{ChatCompletionRequest, ChatCompletionResponse},
        generate::GenerateRequest,
    },
    reasoning_parser::ParserFactory as ReasoningParserFactory,
    tokenizer::traits::Tokenizer,
    tool_parser::ParserFactory as ToolParserFactory,
};

/// Generic request pipeline for all request types
///
/// Orchestrates all stages from request preparation to response delivery.
/// Configured differently for regular vs PD mode.
#[derive(Clone)]
pub struct RequestPipeline {
    stages: Arc<Vec<Box<dyn PipelineStage>>>,
}

impl RequestPipeline {
    /// Create a regular (single-worker) pipeline
    pub fn new_regular(
        worker_registry: Arc<WorkerRegistry>,
        policy_registry: Arc<PolicyRegistry>,
        tokenizer: Arc<dyn Tokenizer>,
        tool_parser_factory: ToolParserFactory,
        reasoning_parser_factory: ReasoningParserFactory,
        configured_tool_parser: Option<String>,
        configured_reasoning_parser: Option<String>,
    ) -> Self {
        let processor = processor::ResponseProcessor::new(
            tokenizer.clone(),
            tool_parser_factory.clone(),
            reasoning_parser_factory.clone(),
            configured_tool_parser.clone(),
            configured_reasoning_parser.clone(),
        );
        let streaming_processor = Arc::new(streaming::StreamingProcessor::new(
            tokenizer,
            tool_parser_factory,
            reasoning_parser_factory,
            configured_tool_parser,
            configured_reasoning_parser,
        ));

        let stages: Vec<Box<dyn PipelineStage>> = vec![
            Box::new(PreparationStage::new()),
            Box::new(WorkerSelectionStage::new(
                worker_registry,
                policy_registry,
                WorkerSelectionMode::Regular,
            )),
            Box::new(ClientAcquisitionStage),
            Box::new(RequestBuildingStage::new(false)), // No PD metadata
            Box::new(DispatchMetadataStage),
            Box::new(RequestExecutionStage::new(ExecutionMode::Single)),
            Box::new(ResponseProcessingStage::new(processor, streaming_processor)),
        ];

        Self {
            stages: Arc::new(stages),
        }
    }

    /// Create a Harmony (single-worker) pipeline for Harmony-capable models
    pub fn new_harmony(
        worker_registry: Arc<WorkerRegistry>,
        policy_registry: Arc<PolicyRegistry>,
        _tokenizer: Arc<dyn Tokenizer>,
        _tool_parser_factory: ToolParserFactory,
        _reasoning_parser_factory: ReasoningParserFactory,
        _configured_tool_parser: Option<String>,
        _configured_reasoning_parser: Option<String>,
    ) -> Self {
        let stages: Vec<Box<dyn PipelineStage>> = vec![
            Box::new(harmony::stages::HarmonyPreparationStage::new()),
            Box::new(WorkerSelectionStage::new(
                worker_registry,
                policy_registry,
                WorkerSelectionMode::Regular,
            )),
            Box::new(ClientAcquisitionStage),
            Box::new(harmony::stages::HarmonyRequestBuildingStage::new(false)),
            Box::new(DispatchMetadataStage),
            Box::new(RequestExecutionStage::new(ExecutionMode::Single)),
            Box::new(harmony::stages::HarmonyResponseProcessingStage::new()),
        ];

        Self {
            stages: Arc::new(stages),
        }
    }

    /// Create a Harmony PD (prefill-decode) pipeline
    pub fn new_harmony_pd(
        worker_registry: Arc<WorkerRegistry>,
        policy_registry: Arc<PolicyRegistry>,
        _tokenizer: Arc<dyn Tokenizer>,
        _tool_parser_factory: ToolParserFactory,
        _reasoning_parser_factory: ReasoningParserFactory,
        _configured_tool_parser: Option<String>,
        _configured_reasoning_parser: Option<String>,
    ) -> Self {
        let stages: Vec<Box<dyn PipelineStage>> = vec![
            Box::new(harmony::stages::HarmonyPreparationStage::new()),
            Box::new(WorkerSelectionStage::new(
                worker_registry,
                policy_registry,
                WorkerSelectionMode::PrefillDecode,
            )),
            Box::new(ClientAcquisitionStage),
            Box::new(harmony::stages::HarmonyRequestBuildingStage::new(true)),
            Box::new(DispatchMetadataStage),
            Box::new(RequestExecutionStage::new(ExecutionMode::DualDispatch)),
            Box::new(harmony::stages::HarmonyResponseProcessingStage::new()),
        ];

        Self {
            stages: Arc::new(stages),
        }
    }

    /// Create a PD (prefill-decode) pipeline
    pub fn new_pd(
        worker_registry: Arc<WorkerRegistry>,
        policy_registry: Arc<PolicyRegistry>,
        tokenizer: Arc<dyn Tokenizer>,
        tool_parser_factory: ToolParserFactory,
        reasoning_parser_factory: ReasoningParserFactory,
        configured_tool_parser: Option<String>,
        configured_reasoning_parser: Option<String>,
    ) -> Self {
        let processor = processor::ResponseProcessor::new(
            tokenizer.clone(),
            tool_parser_factory.clone(),
            reasoning_parser_factory.clone(),
            configured_tool_parser.clone(),
            configured_reasoning_parser.clone(),
        );
        let streaming_processor = Arc::new(streaming::StreamingProcessor::new(
            tokenizer,
            tool_parser_factory,
            reasoning_parser_factory,
            configured_tool_parser,
            configured_reasoning_parser,
        ));

        let stages: Vec<Box<dyn PipelineStage>> = vec![
            Box::new(PreparationStage::new()),
            Box::new(WorkerSelectionStage::new(
                worker_registry,
                policy_registry,
                WorkerSelectionMode::PrefillDecode,
            )),
            Box::new(ClientAcquisitionStage),
            Box::new(RequestBuildingStage::new(true)), // Inject PD metadata
            Box::new(DispatchMetadataStage),
            Box::new(RequestExecutionStage::new(ExecutionMode::DualDispatch)),
            Box::new(ResponseProcessingStage::new(processor, streaming_processor)),
        ];

        Self {
            stages: Arc::new(stages),
        }
    }

    /// Execute the complete pipeline for a chat request
    pub async fn execute_chat(
        &self,
        request: Arc<ChatCompletionRequest>,
        headers: Option<axum::http::HeaderMap>,
        model_id: Option<String>,
        components: Arc<SharedComponents>,
    ) -> Response {
        let mut ctx = RequestContext::for_chat(request, headers, model_id, components);

        for (idx, stage) in self.stages.iter().enumerate() {
            match stage.execute(&mut ctx).await {
                Ok(Some(response)) => {
                    // Stage completed successfully with a response (e.g., streaming)
                    return response;
                }
                Ok(None) => {
                    continue;
                }
                Err(response) => {
                    // Error occurred
                    error!(
                        "Stage {} ({}) failed with status {}",
                        idx + 1,
                        stage.name(),
                        response.status()
                    );
                    return response;
                }
            }
        }

        match ctx.state.response.final_response {
            Some(FinalResponse::Chat(response)) => axum::Json(response).into_response(),
            Some(FinalResponse::Generate(_)) => {
                error::internal_error("Internal error: wrong response type")
            }
            None => error::internal_error("No response produced"),
        }
    }

    /// Execute the complete pipeline for a generate request
    pub async fn execute_generate(
        &self,
        request: Arc<GenerateRequest>,
        headers: Option<axum::http::HeaderMap>,
        model_id: Option<String>,
        components: Arc<SharedComponents>,
    ) -> Response {
        let mut ctx = RequestContext::for_generate(request, headers, model_id, components);

        for (idx, stage) in self.stages.iter().enumerate() {
            match stage.execute(&mut ctx).await {
                Ok(Some(response)) => {
                    // Stage completed successfully with a response (e.g., streaming)
                    return response;
                }
                Ok(None) => {
                    continue;
                }
                Err(response) => {
                    // Error occurred
                    error!(
                        "Stage {} ({}) failed with status {}",
                        idx + 1,
                        stage.name(),
                        response.status()
                    );
                    return response;
                }
            }
        }

        match ctx.state.response.final_response {
            Some(FinalResponse::Generate(response)) => axum::Json(response).into_response(),
            Some(FinalResponse::Chat(_)) => {
                error::internal_error("Internal error: wrong response type")
            }
            None => error::internal_error("No response produced"),
        }
    }

    /// Execute chat pipeline for responses endpoint
    ///
    /// Used by ALL non-streaming /v1/responses requests.
    /// Uses the same 7 pipeline stages as execute_chat(), with two differences:
    /// 1. Returns Result<ChatCompletionResponse, Response> for tool_loop composition
    /// 2. Disallows streaming (responses endpoint uses different SSE format)
    pub async fn execute_chat_for_responses(
        &self,
        request: Arc<ChatCompletionRequest>,
        headers: Option<axum::http::HeaderMap>,
        model_id: Option<String>,
        components: Arc<SharedComponents>,
    ) -> Result<ChatCompletionResponse, Response> {
        let mut ctx = RequestContext::for_chat(request, headers, model_id, components);

        for (idx, stage) in self.stages.iter().enumerate() {
            match stage.execute(&mut ctx).await {
                Ok(Some(_response)) => {
                    // Streaming not supported for responses sync mode
                    return Err(error::bad_request(
                        "Streaming is not supported in this context".to_string(),
                    ));
                }
                Ok(None) => {
                    continue;
                }
                Err(response) => {
                    // Error occurred - return the response as-is to preserve HTTP status codes
                    error!(
                        "Stage {} ({}) failed with status {}",
                        idx + 1,
                        stage.name(),
                        response.status()
                    );
                    return Err(response);
                }
            }
        }

        match ctx.state.response.final_response {
            Some(FinalResponse::Chat(response)) => Ok(response),
            Some(FinalResponse::Generate(_)) => {
                Err(error::internal_error("Internal error: wrong response type"))
            }
            None => Err(error::internal_error("No response produced")),
        }
    }

    /// Execute Harmony Responses API request through all pipeline stages
    ///
    /// This method runs a single iteration of the Responses API request,
    /// returning either ToolCallsFound (continue serving) or Completed (final response).
    ///
    /// Called by harmony::responses::serve_harmony_responses() for each iteration.
    ///
    /// # Arguments
    ///
    /// * `request` - Responses API request
    /// * `harmony_ctx` - Harmony Responses context with MCP manager and components
    ///
    /// # Returns
    ///
    /// ResponsesIterationResult indicating whether to continue iteration or return
    pub async fn execute_harmony_responses(
        &self,
        request: &crate::protocols::responses::ResponsesRequest,
        harmony_ctx: &harmony::responses::HarmonyResponsesContext,
    ) -> Result<ResponsesIterationResult, Response> {
        // Create RequestContext for this Responses request
        let mut ctx = RequestContext::for_responses(
            Arc::new(request.clone()),
            None, // No headers needed for internal pipeline execution
            None, // Model ID already set in request
            harmony_ctx.components.clone(),
        );

        for (idx, stage) in self.stages.iter().enumerate() {
            match stage.execute(&mut ctx).await {
                Ok(Some(response)) => {
                    // Stage returned early response (e.g., streaming) - not expected for Responses iteration
                    error!(
                        "Stage {} ({}) returned unexpected response during Responses iteration",
                        idx + 1,
                        stage.name()
                    );
                    return Err(response);
                }
                Ok(None) => {
                    continue;
                }
                Err(response) => {
                    // Stage failed
                    error!(
                        "Stage {} ({}) failed with status {}",
                        idx + 1,
                        stage.name(),
                        response.status()
                    );
                    return Err(response);
                }
            }
        }

        // Extract ResponsesIterationResult from context
        // This should have been set by HarmonyResponseProcessingStage
        ctx.state
            .response
            .responses_iteration_result
            .take()
            .ok_or_else(|| {
                error::internal_error("No ResponsesIterationResult produced by pipeline")
            })
    }

    /// Execute Harmony Responses pipeline iteration with streaming support
    ///
    /// This version executes the pipeline up to the dispatch stage and returns
    /// the raw ExecutionResult (with stream) for token-level streaming processing.
    pub async fn execute_harmony_responses_streaming(
        &self,
        request: &crate::protocols::responses::ResponsesRequest,
        harmony_ctx: &harmony::responses::HarmonyResponsesContext,
    ) -> Result<ExecutionResult, Response> {
        // Create RequestContext for this Responses request
        let mut ctx = RequestContext::for_responses(
            Arc::new(request.clone()),
            None,
            None,
            harmony_ctx.components.clone(),
        );

        for (idx, stage) in self.stages.iter().enumerate() {
            match stage.execute(&mut ctx).await {
                Ok(Some(response)) => {
                    error!(
                        "Stage {} ({}) returned unexpected response during streaming Responses",
                        idx + 1,
                        stage.name()
                    );
                    return Err(response);
                }
                Ok(None) => continue,
                Err(response) => {
                    error!(
                        "Stage {} ({}) failed with status {}",
                        idx + 1,
                        stage.name(),
                        response.status()
                    );
                    return Err(response);
                }
            }
        }

        // Extract execution_result (the raw stream from workers)
        ctx.state
            .response
            .execution_result
            .take()
            .ok_or_else(|| error::internal_error("No ExecutionResult produced by pipeline"))
    }
}
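
// Usage sketch (illustrative only): how router setup code might construct the
// regular pipeline and drive a chat request through it. Building the
// registries, tokenizer, parser factories, and shared components is assumed to
// happen elsewhere; the variable names below (`chat_request`, `headers`,
// `components`, ...) are placeholders, not items defined in this module.
//
//     let pipeline = RequestPipeline::new_regular(
//         worker_registry,          // Arc<WorkerRegistry>
//         policy_registry,          // Arc<PolicyRegistry>
//         tokenizer,                // Arc<dyn Tokenizer>
//         tool_parser_factory,
//         reasoning_parser_factory,
//         None,                     // no explicitly configured tool parser
//         None,                     // no explicitly configured reasoning parser
//     );
//
//     // RequestPipeline is cheap to clone: the stage list sits behind an Arc,
//     // so each request handler can hold its own handle.
//     let response = pipeline
//         .execute_chat(Arc::new(chat_request), Some(headers), None, components)
//         .await;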