//! Request context types for gRPC router pipeline
//!
//! This module provides the core context types that flow through the router pipeline,
//! eliminating deep parameter passing chains and providing a single source of truth
//! for request state.

// NOTE(review): almost every generic type argument in this file is missing. `Arc`,
// `Option`, `Vec`, `Box` and `HashMap` appear bare (e.g. `pub components: Arc,`), which
// does not compile. Only arguments starting with `(` or `&` survived, such as
// `Option<(String, String)>` and `Option<&SglangSchedulerClient>`. That looks like a pass
// that stripped every `<ident...>` span as if it were an HTML tag. The per-item notes
// below give the presumable argument where this file makes it recoverable and say
// "unknown" otherwise. TODO: restore the real types from version control.

use std::{collections::HashMap, sync::Arc};

use axum::http::HeaderMap;
use serde_json::Value;

use crate::{
    core::Worker,
    grpc_client::{proto, sglang_scheduler::AbortOnDropStream, SglangSchedulerClient},
    protocols::{
        chat::{ChatCompletionRequest, ChatCompletionResponse},
        generate::{GenerateRequest, GenerateResponse},
        responses::ResponsesRequest,
    },
    reasoning_parser::ParserFactory as ReasoningParserFactory,
    tokenizer::{stop::StopSequenceDecoder, traits::Tokenizer},
    tool_parser::ParserFactory as ToolParserFactory,
};

// NOTE(review): `components` presumably was `Arc<SharedComponents>`.
/// Main request processing context
///
/// This is the single source of truth for all request state as it flows
/// through the pipeline stages. Stage outputs are stored as `Option` fields of
/// [`ProcessingState`]. The type system therefore does not enforce stage ordering
/// here: a stage that reads an earlier stage's output must handle the
/// not-yet-populated (`None`) case at runtime.
pub struct RequestContext {
    /// The incoming request, tagged by API kind.
    pub input: RequestInput,
    /// Shared components handed in by the `for_*` constructors.
    pub components: Arc,
    /// Per-stage outputs; starts as `ProcessingState::default()`.
    pub state: ProcessingState,
}

// NOTE(review): `headers` presumably was `Option<HeaderMap>` (the `HeaderMap` import has
// no other use in this file). The `model_id` argument is unknown (presumably
// `Option<String>`) — TODO confirm.
/// Immutable request input
///
/// Set once by the `RequestContext::for_*` constructors. The fields are `pub`, so
/// immutability is by convention only.
pub struct RequestInput {
    /// Which API the request came from, together with its body.
    pub request_type: RequestType,
    /// Incoming request headers, if the caller supplied them.
    pub headers: Option,
    /// Model identifier, if the caller supplied one.
    pub model_id: Option,
}

// NOTE(review): the payloads presumably were `Arc<ChatCompletionRequest>`,
// `Arc<GenerateRequest>` and `Arc<ResponsesRequest>`. The accessors on `RequestContext`
// turn them into `&ChatCompletionRequest` etc. via `as_ref()`.
/// Request type variants
///
/// Using Arc instead of Box to enable cheap cloning for background tasks
/// (see the `*_request_arc` accessors on `RequestContext`).
pub enum RequestType {
    Chat(Arc),
    Generate(Arc),
    Responses(Arc),
}

// NOTE(review): `tokenizer` presumably was `Arc<dyn Tokenizer>`. `Tokenizer` is imported
// from `tokenizer::traits` and has no other use in this file.
/// Shared components (injected once at creation)
pub struct SharedComponents {
    pub tokenizer: Arc,
    pub tool_parser_factory: ToolParserFactory,
    pub reasoning_parser_factory: ReasoningParserFactory,
}

// NOTE(review): the `Option` arguments presumably were, in order: `PreparationOutput`,
// `WorkerSelection`, `ClientSelection`, some `proto::` request type (exact type
// unknown), and `DispatchMetadata`.
/// Mutable processing state (evolves through pipeline stages)
///
/// `Default` leaves every stage output unset and `response` at
/// `ResponseState::default()`. Each field is populated by the stage named in its
/// comment.
#[derive(Default)]
pub struct ProcessingState {
    // Stage 1: Preparation outputs
    pub preparation: Option,

    // Stage 2: Worker selection outputs
    pub workers: Option,

    // Stage 3: Client acquisition outputs
    pub clients: Option,

    // Stage 4: Request building outputs
    pub proto_request: Option,

    // Stage 5: Dispatch metadata
    pub dispatch: Option,

    // Stage 6: Response processing state
    pub response: ResponseState,
}

// NOTE(review): the type arguments are missing on every `Option`/`Vec` field except
// `tool_constraints`. `harmony_messages` and `harmony_stop_ids` each lost two nesting
// levels (presumably `Option<Vec<_>>`). The element types cannot be recovered from this
// file.
/// Output from preparation stage (Step 1)
pub struct PreparationOutput {
    /// Original text (for chat) or resolved text (for generate)
    pub original_text: Option,

    /// Tokenized input
    pub token_ids: Vec,

    /// Processed messages (chat only)
    pub processed_messages: Option,

    /// Tool call constraints (if applicable)
    pub tool_constraints: Option<(String, String)>,

    /// Filtered request (if tools were filtered)
    pub filtered_request: Option,

    // Harmony-specific fields
    /// Whether this is a Harmony request (default: false)
    pub harmony_mode: bool,

    /// Selection text for worker routing (Harmony only)
    pub selection_text: Option,

    /// Harmony messages for history tracking (Harmony only)
    pub harmony_messages: Option>,

    /// Stop token IDs for Harmony models
    pub harmony_stop_ids: Option>,
}

// NOTE(review): the `Arc` arguments presumably were `Arc<dyn Worker>`. The `Worker`
// import has no other use in this file.
/// Worker selection (Step 2)
///
/// `Single` holds one worker; `Dual` holds separate prefill and decode workers.
pub enum WorkerSelection {
    Single {
        worker: Arc,
    },
    Dual {
        prefill: Arc,
        decode: Arc,
    },
}

/// Client selection (Step 3)
///
/// Same shape as `WorkerSelection`: one client, or separate prefill and decode
/// clients.
pub enum ClientSelection {
    Single {
        client: SglangSchedulerClient,
    },
    Dual {
        prefill: SglangSchedulerClient,
        decode: SglangSchedulerClient,
    },
}

// NOTE(review): the `weight_version` argument is unknown (presumably `Option<String>`).
/// Dispatch metadata (Step 5)
#[derive(Clone)]
pub struct DispatchMetadata {
    pub request_id: String,
    pub model: String,
    pub created: u64,
    pub weight_version: Option,
    pub is_streaming: bool,
}

// NOTE(review): the following arguments are presumable:
// - `stop_decoder`: `Option<StopSequenceDecoder>` (that import has no other use here);
// - `execution_result`: `Option<ExecutionResult>`;
// - `final_response`: `Option<FinalResponse>`.
// The arguments of `collected` (two levels), `responses_iteration_result`,
// `harmony_parser` and `harmony_parser_per_index` (two levels) are unknown.
/// Response processing state (Step 6)
#[derive(Default)]
pub struct ResponseState {
    /// Stop sequence decoder
    pub stop_decoder: Option,

    /// Per-index streaming state (for n>1 support)
    pub streaming: StreamingState,

    /// Collected responses (non-streaming)
    pub collected: Option>,

    /// Execution result (streams from workers)
    pub execution_result: Option,

    /// Final processed response
    pub final_response: Option,

    /// Responses API iteration result (Harmony only, for tool loop orchestration)
    pub responses_iteration_result: Option,

    // Harmony-specific parser state
    /// Harmony parser for non-streaming (single parser for all indices)
    pub harmony_parser: Option,

    /// Harmony parsers for streaming (one per index for n>1 support)
    pub harmony_parser_per_index: Option>,
}

// NOTE(review): every `HashMap` lost its key and value types, and the per-index key
// type is unknown. `matched_stops` lost one extra nesting level (presumably an
// `Option<Value>` value; `serde_json::Value` has no other use in this file).
// `reasoning_parsers` and `tool_parsers` each lost three extra levels (unknown).
/// Streaming state (per-choice tracking)
#[derive(Default)]
pub struct StreamingState {
    pub is_firsts: HashMap,
    pub stream_buffers: HashMap,
    pub finish_reasons: HashMap,
    pub matched_stops: HashMap>,
    pub prompt_tokens: HashMap,
    pub completion_tokens: HashMap,
    pub cached_tokens: HashMap,

    // Parser state (lazy initialization per index)
    pub reasoning_parsers: HashMap>>>,
    pub tool_parsers: HashMap>>>,
    pub has_tool_calls: HashMap,
}

// NOTE(review): the bare `Arc`/`Option` parameter and return types below presumably
// match these types defined above:
// - the `RequestType` payload that each constructor or accessor handles;
// - `RequestInput::headers` and `RequestInput::model_id`;
// - `RequestContext::components`.
impl RequestContext {
    /// Create context for chat completion request
    ///
    /// Wraps `request` as `RequestType::Chat`; processing state starts at
    /// `ProcessingState::default()`.
    pub fn for_chat(
        request: Arc,
        headers: Option,
        model_id: Option,
        components: Arc,
    ) -> Self {
        Self {
            input: RequestInput {
                request_type: RequestType::Chat(request),
                headers,
                model_id,
            },
            components,
            state: ProcessingState::default(),
        }
    }

    /// Create context for generate request
    ///
    /// Wraps `request` as `RequestType::Generate`; processing state starts at
    /// `ProcessingState::default()`.
    pub fn for_generate(
        request: Arc,
        headers: Option,
        model_id: Option,
        components: Arc,
    ) -> Self {
        Self {
            input: RequestInput {
                request_type: RequestType::Generate(request),
                headers,
                model_id,
            },
            components,
            state: ProcessingState::default(),
        }
    }

    /// Create context for Responses API request
    ///
    /// Wraps `request` as `RequestType::Responses`; processing state starts at
    /// `ProcessingState::default()`.
    pub fn for_responses(
        request: Arc,
        headers: Option,
        model_id: Option,
        components: Arc,
    ) -> Self {
        Self {
            input: RequestInput {
                request_type: RequestType::Responses(request),
                headers,
                model_id,
            },
            components,
            state: ProcessingState::default(),
        }
    }

    /// Get reference to original request (type-safe)
    ///
    /// Unlike the `*_request` accessors below, this never panics; callers match on
    /// the returned variant themselves.
    pub fn request(&self) -> &RequestType {
        &self.input.request_type
    }

    /// Get chat request (panics if not chat)
    ///
    /// # Panics
    ///
    /// Panics with "Expected chat request" if the input is not `RequestType::Chat`.
    pub fn chat_request(&self) -> &ChatCompletionRequest {
        match &self.input.request_type {
            RequestType::Chat(req) => req.as_ref(),
            _ => panic!("Expected chat request"),
        }
    }

    /// Get Arc clone of chat request (panics if not chat)
    ///
    /// Only the reference count is bumped; the request body is not copied.
    ///
    /// # Panics
    ///
    /// Panics with "Expected chat request" if the input is not `RequestType::Chat`.
    pub fn chat_request_arc(&self) -> Arc {
        match &self.input.request_type {
            RequestType::Chat(req) => Arc::clone(req),
            _ => panic!("Expected chat request"),
        }
    }

    /// Get generate request (panics if not generate)
    ///
    /// # Panics
    ///
    /// Panics with "Expected generate request" if the input is not
    /// `RequestType::Generate`.
    pub fn generate_request(&self) -> &GenerateRequest {
        match &self.input.request_type {
            RequestType::Generate(req) => req.as_ref(),
            _ => panic!("Expected generate request"),
        }
    }

    /// Get Arc clone of generate request (panics if not generate)
    ///
    /// Only the reference count is bumped; the request body is not copied.
    ///
    /// # Panics
    ///
    /// Panics with "Expected generate request" if the input is not
    /// `RequestType::Generate`.
    pub fn generate_request_arc(&self) -> Arc {
        match &self.input.request_type {
            RequestType::Generate(req) => Arc::clone(req),
            _ => panic!("Expected generate request"),
        }
    }

    /// Get responses request (panics if not responses)
    ///
    /// # Panics
    ///
    /// Panics with "Expected responses request" if the input is not
    /// `RequestType::Responses`.
    pub fn responses_request(&self) -> &ResponsesRequest {
        match &self.input.request_type {
            RequestType::Responses(req) => req.as_ref(),
            _ => panic!("Expected responses request"),
        }
    }

    /// Get Arc clone of responses request (panics if not responses)
    ///
    /// Only the reference count is bumped; the request body is not copied.
    ///
    /// # Panics
    ///
    /// Panics with "Expected responses request" if the input is not
    /// `RequestType::Responses`.
    pub fn responses_request_arc(&self) -> Arc {
        match &self.input.request_type {
            RequestType::Responses(req) => Arc::clone(req),
            _ => panic!("Expected responses request"),
        }
    }

    /// Check if request is streaming
    ///
    /// Chat and generate requests report their `stream` field as-is. For a
    /// Responses request, an unset `stream` counts as non-streaming (`false`).
    pub fn is_streaming(&self) -> bool {
        match &self.input.request_type {
            RequestType::Chat(req) => req.stream,
            RequestType::Generate(req) => req.stream,
            RequestType::Responses(req) => req.stream.unwrap_or(false),
        }
    }
}

// NOTE(review): the bare `&Arc` types below presumably were `&Arc<dyn Worker>`, matching
// the `WorkerSelection` variant fields.
impl WorkerSelection {
    /// Returns `true` for the `Dual` (separate prefill/decode workers) variant.
    pub fn is_dual(&self) -> bool {
        matches!(self, Self::Dual { .. })
    }

    /// The worker of a `Single` selection; `None` for `Dual`.
    pub fn single(&self) -> Option<&Arc> {
        match self {
            Self::Single { worker } => Some(worker),
            _ => None,
        }
    }

    /// `(prefill, decode)` workers of a `Dual` selection; `None` for `Single`.
    // Allows clippy's `type_complexity` lint for this method's nested
    // `Option`/tuple/`Arc` return type.
    #[allow(clippy::type_complexity)]
    pub fn dual(&self) -> Option<(&Arc, &Arc)> {
        match self {
            Self::Dual { prefill, decode } => Some((prefill, decode)),
            _ => None,
        }
    }

    /// Prefill worker of a `Dual` selection; `None` for `Single`.
    pub fn prefill_worker(&self) -> Option<&Arc> {
        match self {
            Self::Dual { prefill, .. } => Some(prefill),
            _ => None,
        }
    }

    /// Decode worker of a `Dual` selection; `None` for `Single`.
    pub fn decode_worker(&self) -> Option<&Arc> {
        match self {
            Self::Dual { decode, .. } => Some(decode),
            _ => None,
        }
    }
}

impl ClientSelection {
    /// Returns `true` for the `Dual` (separate prefill/decode clients) variant.
    pub fn is_dual(&self) -> bool {
        matches!(self, Self::Dual { .. })
    }

    /// The client of a `Single` selection; `None` for `Dual`.
    pub fn single(&self) -> Option<&SglangSchedulerClient> {
        match self {
            Self::Single { client } => Some(client),
            _ => None,
        }
    }

    /// Mutable counterpart of [`ClientSelection::single`].
    pub fn single_mut(&mut self) -> Option<&mut SglangSchedulerClient> {
        match self {
            Self::Single { client } => Some(client),
            _ => None,
        }
    }

    /// `(prefill, decode)` clients of a `Dual` selection; `None` for `Single`.
    pub fn dual(&self) -> Option<(&SglangSchedulerClient, &SglangSchedulerClient)> {
        match self {
            Self::Dual { prefill, decode } => Some((prefill, decode)),
            _ => None,
        }
    }

    /// Mutable counterpart of [`ClientSelection::dual`]; both borrows come from
    /// distinct fields, so they can be held at the same time.
    pub fn dual_mut(&mut self) -> Option<(&mut SglangSchedulerClient, &mut SglangSchedulerClient)> {
        match self {
            Self::Dual { prefill, decode } => Some((prefill, decode)),
            _ => None,
        }
    }

    /// Prefill client of a `Dual` selection; `None` for `Single`.
    pub fn prefill_client(&self) -> Option<&SglangSchedulerClient> {
        match self {
            Self::Dual { prefill, .. } => Some(prefill),
            _ => None,
        }
    }

    /// Mutable counterpart of [`ClientSelection::prefill_client`].
    pub fn prefill_client_mut(&mut self) -> Option<&mut SglangSchedulerClient> {
        match self {
            Self::Dual { prefill, .. } => Some(prefill),
            _ => None,
        }
    }

    /// Decode client of a `Dual` selection; `None` for `Single`.
    pub fn decode_client(&self) -> Option<&SglangSchedulerClient> {
        match self {
            Self::Dual { decode, .. } => Some(decode),
            _ => None,
        }
    }

    /// Mutable counterpart of [`ClientSelection::decode_client`].
    pub fn decode_client_mut(&mut self) -> Option<&mut SglangSchedulerClient> {
        match self {
            Self::Dual { decode, .. } => Some(decode),
            _ => None,
        }
    }
}

// NOTE(review): `decode` presumably was `Box<AbortOnDropStream>`. Boxing it while
// `prefill` is unboxed looks intended to keep the `Dual` variant close in size to
// `Single` (clippy::large_enum_variant) — confirm.
/// Result of request execution (streams from workers)
/// Uses AbortOnDropStream to automatically abort on cancellation
pub enum ExecutionResult {
    Single {
        stream: AbortOnDropStream,
    },
    Dual {
        prefill: AbortOnDropStream,
        decode: Box,
    },
}

// NOTE(review): `Generate` presumably wrapped `Vec<GenerateResponse>`, per the variant's
// own doc comment.
/// Final processed response
pub enum FinalResponse {
    Chat(ChatCompletionResponse),
    /// Generate response is a Vec of GenerateResponse (n=1 returns single item, n>1 returns multiple)
    Generate(Vec),
}