Unverified commit b07c9c76, authored by Chang Su, committed by GitHub
Browse files

[router][grpc] Refine streaming processes (#11277)

parent 748f86f3
...@@ -187,6 +187,10 @@ impl ReasoningParser for BaseReasoningParser { ...@@ -187,6 +187,10 @@ impl ReasoningParser for BaseReasoningParser {
fn model_type(&self) -> &str { fn model_type(&self) -> &str {
&self.model_type &self.model_type
} }
fn is_in_reasoning(&self) -> bool {
self.in_reasoning
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -55,6 +55,10 @@ impl ReasoningParser for DeepSeekR1Parser { ...@@ -55,6 +55,10 @@ impl ReasoningParser for DeepSeekR1Parser {
fn model_type(&self) -> &str { fn model_type(&self) -> &str {
self.base.model_type() self.base.model_type()
} }
fn is_in_reasoning(&self) -> bool {
self.base.is_in_reasoning()
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -54,6 +54,10 @@ impl ReasoningParser for Glm45Parser { ...@@ -54,6 +54,10 @@ impl ReasoningParser for Glm45Parser {
fn model_type(&self) -> &str { fn model_type(&self) -> &str {
self.base.model_type() self.base.model_type()
} }
fn is_in_reasoning(&self) -> bool {
self.base.is_in_reasoning()
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -54,6 +54,10 @@ impl ReasoningParser for KimiParser { ...@@ -54,6 +54,10 @@ impl ReasoningParser for KimiParser {
fn model_type(&self) -> &str { fn model_type(&self) -> &str {
self.base.model_type() self.base.model_type()
} }
fn is_in_reasoning(&self) -> bool {
self.base.is_in_reasoning()
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -55,6 +55,10 @@ impl ReasoningParser for Qwen3Parser { ...@@ -55,6 +55,10 @@ impl ReasoningParser for Qwen3Parser {
fn model_type(&self) -> &str { fn model_type(&self) -> &str {
self.base.model_type() self.base.model_type()
} }
fn is_in_reasoning(&self) -> bool {
self.base.is_in_reasoning()
}
} }
/// QwenThinking parser - variant that assumes reasoning from start. /// QwenThinking parser - variant that assumes reasoning from start.
...@@ -106,6 +110,10 @@ impl ReasoningParser for QwenThinkingParser { ...@@ -106,6 +110,10 @@ impl ReasoningParser for QwenThinkingParser {
fn model_type(&self) -> &str { fn model_type(&self) -> &str {
self.base.model_type() self.base.model_type()
} }
fn is_in_reasoning(&self) -> bool {
self.base.is_in_reasoning()
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -54,6 +54,10 @@ impl ReasoningParser for Step3Parser { ...@@ -54,6 +54,10 @@ impl ReasoningParser for Step3Parser {
fn model_type(&self) -> &str { fn model_type(&self) -> &str {
self.base.model_type() self.base.model_type()
} }
fn is_in_reasoning(&self) -> bool {
self.base.is_in_reasoning()
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -69,6 +69,11 @@ pub trait ReasoningParser: Send + Sync { ...@@ -69,6 +69,11 @@ pub trait ReasoningParser: Send + Sync {
/// Get the model type this parser is designed for. /// Get the model type this parser is designed for.
fn model_type(&self) -> &str; fn model_type(&self) -> &str;
/// Check if the parser is currently in reasoning mode.
///
/// Returns true if the parser is currently parsing reasoning content.
fn is_in_reasoning(&self) -> bool;
} }
/// Error types for reasoning parsing operations. /// Error types for reasoning parsing operations.
......
...@@ -922,8 +922,9 @@ impl GrpcPDRouter { ...@@ -922,8 +922,9 @@ impl GrpcPDRouter {
stream_buffer.push_str(&delta); stream_buffer.push_str(&delta);
// Reasoning content handling // Reasoning content handling
if separate_reasoning { let in_reasoning = if separate_reasoning {
let (normal_text, reasoning_chunk) = router.process_reasoning_stream( let (normal_text, reasoning_chunk, in_reasoning) = router
.process_reasoning_stream(
&delta, &delta,
index, index,
&mut reasoning_parsers, &mut reasoning_parsers,
...@@ -936,13 +937,16 @@ impl GrpcPDRouter { ...@@ -936,13 +937,16 @@ impl GrpcPDRouter {
.map_err(|_| "Failed to send reasoning chunk".to_string())?; .map_err(|_| "Failed to send reasoning chunk".to_string())?;
} }
delta = normal_text; delta = normal_text;
} in_reasoning
} else {
false
};
// Tool call handling // Tool call handling
let tool_choice_enabled = let tool_choice_enabled =
!matches!(tool_choice, Some(ToolChoice::Value(ToolChoiceValue::None))); !matches!(tool_choice, Some(ToolChoice::Value(ToolChoiceValue::None)));
if tool_choice_enabled && tools.is_some() { if !in_reasoning && tool_choice_enabled && tools.is_some() {
let (should_skip, tool_chunks) = router let (should_skip, tool_chunks) = router
.process_tool_calls_stream( .process_tool_calls_stream(
&delta, &delta,
...@@ -1173,16 +1177,18 @@ impl GrpcPDRouter { ...@@ -1173,16 +1177,18 @@ impl GrpcPDRouter {
request_id: &str, request_id: &str,
model: &str, model: &str,
created: u64, created: u64,
) -> (String, Option<ChatCompletionStreamResponse>) { ) -> (String, Option<ChatCompletionStreamResponse>, bool) {
// Get or create parser for this index // Get or create parser for this index
reasoning_parsers reasoning_parsers
.entry(index) .entry(index)
.or_insert_with(|| self.reasoning_parser_factory.get_pooled(model)); .or_insert_with(|| self.reasoning_parser_factory.get_pooled(model));
if let Some(pooled_parser) = reasoning_parsers.get(&index) { if let Some(pooled_parser) = reasoning_parsers.get(&index) {
let parse_result = { let (parse_result, in_reasoning) = {
let mut parser = pooled_parser.lock().unwrap(); let mut parser = pooled_parser.lock().unwrap();
parser.parse_reasoning_streaming_incremental(delta) let result = parser.parse_reasoning_streaming_incremental(delta);
let in_reasoning = parser.is_in_reasoning();
(result, in_reasoning)
}; };
match parse_result { match parse_result {
...@@ -1214,7 +1220,7 @@ impl GrpcPDRouter { ...@@ -1214,7 +1220,7 @@ impl GrpcPDRouter {
} else { } else {
None None
}; };
return (normal_text, chunk); return (normal_text, chunk, in_reasoning);
} }
Err(e) => { Err(e) => {
warn!("Reasoning parsing error: {}", e); warn!("Reasoning parsing error: {}", e);
...@@ -1222,7 +1228,7 @@ impl GrpcPDRouter { ...@@ -1222,7 +1228,7 @@ impl GrpcPDRouter {
} }
} }
(delta.to_string(), None) (delta.to_string(), None, false)
} }
/// Helper: Process tool calls in streaming mode /// Helper: Process tool calls in streaming mode
......
...@@ -494,16 +494,18 @@ impl GrpcRouter { ...@@ -494,16 +494,18 @@ impl GrpcRouter {
request_id: &str, request_id: &str,
model: &str, model: &str,
created: u64, created: u64,
) -> (String, Option<ChatCompletionStreamResponse>) { ) -> (String, Option<ChatCompletionStreamResponse>, bool) {
// Get or create parser for this index // Get or create parser for this index
reasoning_parsers reasoning_parsers
.entry(index) .entry(index)
.or_insert_with(|| self.reasoning_parser_factory.get_pooled(model)); .or_insert_with(|| self.reasoning_parser_factory.get_pooled(model));
if let Some(pooled_parser) = reasoning_parsers.get(&index) { if let Some(pooled_parser) = reasoning_parsers.get(&index) {
let parse_result = { let (parse_result, in_reasoning) = {
let mut parser = pooled_parser.lock().unwrap(); let mut parser = pooled_parser.lock().unwrap();
parser.parse_reasoning_streaming_incremental(delta) let result = parser.parse_reasoning_streaming_incremental(delta);
let in_reasoning = parser.is_in_reasoning();
(result, in_reasoning)
}; };
match parse_result { match parse_result {
...@@ -535,7 +537,7 @@ impl GrpcRouter { ...@@ -535,7 +537,7 @@ impl GrpcRouter {
} else { } else {
None None
}; };
return (normal_text, chunk); return (normal_text, chunk, in_reasoning);
} }
Err(e) => { Err(e) => {
warn!("Reasoning parsing error: {}", e); warn!("Reasoning parsing error: {}", e);
...@@ -543,7 +545,7 @@ impl GrpcRouter { ...@@ -543,7 +545,7 @@ impl GrpcRouter {
} }
} }
(delta.to_string(), None) (delta.to_string(), None, false)
} }
/// Helper: Process tool calls in streaming mode /// Helper: Process tool calls in streaming mode
...@@ -901,8 +903,9 @@ impl GrpcRouter { ...@@ -901,8 +903,9 @@ impl GrpcRouter {
stream_buffer.push_str(&delta); stream_buffer.push_str(&delta);
// Reasoning content handling // Reasoning content handling
if separate_reasoning { let in_reasoning = if separate_reasoning {
let (normal_text, reasoning_chunk) = router.process_reasoning_stream( let (normal_text, reasoning_chunk, in_reasoning) = router
.process_reasoning_stream(
&delta, &delta,
index, index,
&mut reasoning_parsers, &mut reasoning_parsers,
...@@ -915,13 +918,16 @@ impl GrpcRouter { ...@@ -915,13 +918,16 @@ impl GrpcRouter {
.map_err(|_| "Failed to send reasoning chunk".to_string())?; .map_err(|_| "Failed to send reasoning chunk".to_string())?;
} }
delta = normal_text; delta = normal_text;
} in_reasoning
} else {
false
};
// Tool call handling // Tool call handling
let tool_choice_enabled = let tool_choice_enabled =
!matches!(tool_choice, Some(ToolChoice::Value(ToolChoiceValue::None))); !matches!(tool_choice, Some(ToolChoice::Value(ToolChoiceValue::None)));
if tool_choice_enabled && tools.is_some() { if !in_reasoning && tool_choice_enabled && tools.is_some() {
let (should_skip, tool_chunks) = router let (should_skip, tool_chunks) = router
.process_tool_calls_stream( .process_tool_calls_stream(
&delta, &delta,
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment