"vllm/vscode:/vscode.git/clone" did not exist on "62de37a38ed4a3877f3b1607b7163135f7ab9e36"
Unverified Commit 5671d8e3 authored by Richard Huo's avatar Richard Huo Committed by GitHub
Browse files

fix(frontend): fix the rust frontend tool calling with guided decoding and...

fix(frontend): fix the rust frontend tool calling with guided decoding and some model parsers fixes (#8442)
parent b2aea8f3
...@@ -755,12 +755,38 @@ impl OpenAIPreprocessor { ...@@ -755,12 +755,38 @@ impl OpenAIPreprocessor {
where where
S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static, S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
{ {
// Try to parse reasoning content only if parser is configured // Tool-continuation turns (last message role=tool) gate the force-
// reasoning flag off: the model produces the final user-facing answer
// directly from the tool result and typically does not re-enter
// reasoning, so leaving the parser in forced-reasoning mode would
// mislabel the final answer as reasoning_content. Matches SGLang's
// observed behavior for Kimi K2.5 tool-result follow-ups.
let last_is_tool = matches!(
request.inner.messages.last(),
Some(ChatCompletionRequestMessage::Tool(_))
);
let prompt_injected_reasoning = prompt_injected_reasoning && !last_is_tool;
// tool_choice=required/named forces the backend into guided decoding,
// which constrains output to a bare JSON shape with no reasoning
// wrapper. Running the reasoning parser on that output is both
// pointless (nothing to extract) and actively harmful for parsers
// that inject a `<think>` prefix unconditionally (e.g. MiniMax
// append-think), because the prefix would contaminate the
// tool-call JSON fed into the jail.
let tool_choice_forces_guided_json = matches!(
request.inner.tool_choice,
Some(ChatCompletionToolChoiceOption::Required)
| Some(ChatCompletionToolChoiceOption::Named(_))
);
// Try to parse reasoning content only if parser is configured.
let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some() let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some()
&& !Self::is_reasoning_disabled_by_request( && !Self::is_reasoning_disabled_by_request(
self.runtime_config.reasoning_parser.as_deref(), self.runtime_config.reasoning_parser.as_deref(),
request.chat_template_args.as_ref(), request.chat_template_args.as_ref(),
); )
&& !tool_choice_forces_guided_json;
// Reasoning Content Parsing Transformation Step // Reasoning Content Parsing Transformation Step
// Current Solution: // Current Solution:
...@@ -1160,33 +1186,27 @@ impl OpenAIPreprocessor { ...@@ -1160,33 +1186,27 @@ impl OpenAIPreprocessor {
// Configure jail based on tool_choice // Configure jail based on tool_choice
// //
// When a tool_call_parser is configured, always use marker-based mode // For tool_choice=required or named we mirror SGLang / vLLM: assume the
// so that format-specific parsers (e.g. qwen3_coder XML) are invoked. // backend applied guided decoding and emit a bare JSON shape, so parse
// Immediate JSON mode is only a fallback for required/named when no // via the JSON array parser (base_json_parser) rather than the model's
// parser exists (the model is expected to emit raw JSON in that case). // native-format parser. If a parser is also configured we still carry
// it so the Immediate branch can fall back to marker-based parsing for
// backends that do not honor guided decoding (e.g. XML-native models
// like qwen3_coder — see regression test_tool_choice_required_with_
// qwen3_coder_parser).
match tool_choice { match tool_choice {
Some(ChatCompletionToolChoiceOption::Named(named)) => { Some(ChatCompletionToolChoiceOption::Named(named)) => {
if let Some(parser) = tool_call_parser {
// Parser-aware path: use marker-based jail so the parser
// handles format-specific output (XML, pythonic, etc.).
// Also install a named-tool filter so that if the model emits
// the wrong tool, the parsed call is rejected before emission.
builder = builder builder = builder
.tool_call_parser(parser) .tool_choice_named(named.function.name.clone())
.named_tool_filter(named.function.name.clone()); .named_tool_filter(named.function.name.clone());
} else { if let Some(parser) = tool_call_parser {
// No parser: fall back to Immediate JSON jail mode. builder = builder.tool_call_parser(parser);
builder = builder.tool_choice_named(named.function.name.clone());
} }
} }
Some(ChatCompletionToolChoiceOption::Required) => { Some(ChatCompletionToolChoiceOption::Required) => {
builder = builder.tool_choice_required();
if let Some(parser) = tool_call_parser { if let Some(parser) = tool_call_parser {
// Parser-aware path: use marker-based jail so the parser
// handles format-specific output (XML, pythonic, etc.).
builder = builder.tool_call_parser(parser); builder = builder.tool_call_parser(parser);
} else {
// No parser: fall back to Immediate JSON jail mode.
builder = builder.tool_choice_required();
} }
} }
Some(ChatCompletionToolChoiceOption::Auto) Some(ChatCompletionToolChoiceOption::Auto)
......
...@@ -7,6 +7,8 @@ use dynamo_protocols::types::{ ...@@ -7,6 +7,8 @@ use dynamo_protocols::types::{
ChatCompletionStreamResponseDelta, FinishReason, FunctionCallStream, FunctionType, Role, ChatCompletionStreamResponseDelta, FinishReason, FunctionCallStream, FunctionType, Role,
}; };
use dynamo_parsers::tool_calling::config::JsonParserConfig;
use dynamo_parsers::tool_calling::json::try_tool_call_parse_basic_json;
use dynamo_parsers::tool_calling::parsers::get_tool_parser_map; use dynamo_parsers::tool_calling::parsers::get_tool_parser_map;
use dynamo_parsers::tool_calling::{ use dynamo_parsers::tool_calling::{
detect_tool_call_start, find_tool_call_end_position, try_tool_call_parse_aggregate, detect_tool_call_start, find_tool_call_end_position, try_tool_call_parse_aggregate,
...@@ -643,6 +645,12 @@ impl JailedStream { ...@@ -643,6 +645,12 @@ impl JailedStream {
// Only filter out if this choice was ever jailed and lacks role // Only filter out if this choice was ever jailed and lacks role
// (to avoid aggregator issues with deltas missing role after unjail) // (to avoid aggregator issues with deltas missing role after unjail)
let choice_state = choice_states.get_or_create_state(choice.index, false); let choice_state = choice_states.get_or_create_state(choice.index, false);
// Also track stream finish reason from content-less final chunks
// (e.g. finish_reason=Stop arriving in a chunk with content=None) so
// the Immediate-mode finalize path can emit the correct finish_reason.
if choice.finish_reason.is_some() {
choice_state.stream_finish_reason = choice.finish_reason;
}
let was_ever_jailed = !choice_state.accumulated_content.is_empty() || choice_state.is_jailed; let was_ever_jailed = !choice_state.accumulated_content.is_empty() || choice_state.is_jailed;
let should_emit = choice.delta.role.is_some() let should_emit = choice.delta.role.is_some()
...@@ -997,9 +1005,113 @@ impl JailedStream { ...@@ -997,9 +1005,113 @@ impl JailedStream {
) )
} }
JailMode::Immediate { format } => { JailMode::Immediate { format } => {
// tool_choice mode: parse JSON and convert to tool calls // tool_choice=required/named path (SGLang/vLLM-style).
match self.parse_tool_choice_json(accumulated_content, format) { //
Ok(tool_call_chunks) if !tool_call_chunks.is_empty() => create_choice_stream( // Primary parser is try_tool_call_parse_basic_json (the
// base_json_parser) since guided decoding constrains output
// to a bare JSON shape. Fallbacks cover two edge cases:
//
// * Named tool_choice when the schema produces just the
// parameters object (no {name, parameters} wrapper) —
// handled by parse_tool_choice_json, which knows the
// target tool_name from ToolChoiceFormat::SingleObject.
//
// * Backends that do not honor guided decoding and emit
// the model's native format instead (e.g. qwen3_coder
// XML). In that case try_tool_call_parse_aggregate with
// the configured tool_call_parser recovers the call.
let mut tool_call_chunks: Vec<ChatCompletionMessageToolCallChunk> = Vec::new();
// 1. Primary: bare-JSON extraction — handles
// `[{name,parameters}, ...]`, `{name,parameters}`,
// `{name,arguments}`, and arrays of either.
let basic_json_cfg = JsonParserConfig {
bare_json_mode: true,
..Default::default()
};
// Per-path indices are placeholders — final indices are assigned
// below after the named filter so dropped entries don't leave
// gaps and multi-emission streams don't collide.
if let Ok((parsed, _)) = try_tool_call_parse_basic_json(
accumulated_content,
&basic_json_cfg,
self.tool_definitions.as_deref(),
) && !parsed.is_empty()
{
tool_call_chunks.extend(parsed.into_iter().map(|tc| {
ChatCompletionMessageToolCallChunk {
index: 0,
id: Some(tc.id),
r#type: Some(FunctionType::Function),
function: Some(FunctionCallStream {
name: Some(tc.function.name),
arguments: Some(tc.function.arguments),
}),
}
}));
}
// 2. Named-only fallback: output is just the parameters object
// (tool_name is supplied by SingleObject format).
if tool_call_chunks.is_empty()
&& let Ok(chunks) = self.parse_tool_choice_json(accumulated_content, format)
{
tool_call_chunks = chunks;
}
// 3. Marker-based fallback for backends that did not enforce
// guided decoding and emitted the model's native format.
if tool_call_chunks.is_empty()
&& self.tool_call_parser.is_some()
&& let Ok((tool_calls, _)) = try_tool_call_parse_aggregate(
accumulated_content,
self.tool_call_parser.as_deref(),
self.tool_definitions.as_deref(),
)
.await
{
tool_call_chunks.extend(tool_calls.into_iter().map(|tc| {
ChatCompletionMessageToolCallChunk {
index: 0,
id: Some(tc.id),
r#type: Some(FunctionType::Function),
function: Some(FunctionCallStream {
name: Some(tc.function.name),
arguments: Some(tc.function.arguments),
}),
}
}));
}
// Named filter: drop any parsed calls whose name doesn't match.
// Track whether the filter drained a non-empty list so we can
// suppress the content fallback below — otherwise the raw
// wrong-tool JSON would leak to the client as assistant text.
let mut filter_dropped_all = false;
if let Some(ref required_name) = self.named_tool_name {
let pre_filter_len = tool_call_chunks.len();
tool_call_chunks.retain(|tc| {
tc.function.as_ref().and_then(|f| f.name.as_deref())
== Some(required_name.as_str())
});
if pre_filter_len > 0 && tool_call_chunks.is_empty() {
filter_dropped_all = true;
tracing::warn!(
required = %required_name,
"tool_choice=named: parsers emitted no matching tool calls; dropping jail output"
);
}
}
// Assign final indices: renumber survivors 0..n (no gaps from
// the filter) then add the cumulative offset for consistency
// with the MarkerBased branch across multi-emission streams.
for (new_idx, chunk) in tool_call_chunks.iter_mut().enumerate() {
chunk.index = (tool_call_offset + new_idx) as u32;
}
if !tool_call_chunks.is_empty() {
create_choice_stream(
choice_index, choice_index,
Some(Role::Assistant), Some(Role::Assistant),
"", "",
...@@ -1007,9 +1119,21 @@ impl JailedStream { ...@@ -1007,9 +1119,21 @@ impl JailedStream {
base_choice.finish_reason, base_choice.finish_reason,
None, None,
base_choice.logprobs.clone(), base_choice.logprobs.clone(),
), )
Ok(_) | Err(_) => { } else if filter_dropped_all {
// Parsing failed, return as content // Named filter rejected every parsed call — do not leak
// the wrong-tool JSON back as content.
create_choice_stream(
choice_index,
Some(Role::Assistant),
"",
None,
base_choice.finish_reason,
base_choice.stop_reason.clone(),
base_choice.logprobs.clone(),
)
} else {
// All parsing paths failed — return accumulated content as text.
create_choice_stream( create_choice_stream(
choice_index, choice_index,
Some(Role::Assistant), Some(Role::Assistant),
...@@ -1023,7 +1147,6 @@ impl JailedStream { ...@@ -1023,7 +1147,6 @@ impl JailedStream {
} }
} }
} }
}
/// Helper to create a ChatCompletionMessageToolCallChunk /// Helper to create a ChatCompletionMessageToolCallChunk
fn create_tool_call_chunk( fn create_tool_call_chunk(
...@@ -1133,30 +1256,24 @@ impl JailedStream { ...@@ -1133,30 +1256,24 @@ impl JailedStream {
if finish == FinishReason::Stop { if finish == FinishReason::Stop {
let has_tool_calls = has_tool_calls_per_choice.get(&choice.index).copied().unwrap_or(false); let has_tool_calls = has_tool_calls_per_choice.get(&choice.index).copied().unwrap_or(false);
// OpenAI spec: whenever tool_calls were emitted on this
// choice, finish_reason MUST be "tool_calls" — regardless of
// whether tool_choice was "auto", "required", or a named
// function.
let _ = named_tool_active;
match &jail_mode { match &jail_mode {
JailMode::MarkerBased => { JailMode::MarkerBased => {
if has_tool_calls && !named_tool_active { if has_tool_calls {
choice.finish_reason = Some(FinishReason::ToolCalls); choice.finish_reason = Some(FinishReason::ToolCalls);
} }
// When named_tool_active, keep Stop (OpenAI spec for tool_choice=named)
} }
JailMode::Immediate { format } => { JailMode::Immediate { format: _ } => {
// tool_choice mode: apply specific finish_reason logic
match format {
ToolChoiceFormat::SingleObject { .. } => {
// Named tool choice: keep Stop
// (already Stop, no change needed)
}
ToolChoiceFormat::ArrayOfTools => {
// Required tool choice: change to ToolCalls
if has_tool_calls { if has_tool_calls {
choice.finish_reason = Some(FinishReason::ToolCalls); choice.finish_reason = Some(FinishReason::ToolCalls);
} }
} }
} }
} }
}
}
// Length and ContentFilter are preserved as-is // Length and ContentFilter are preserved as-is
} }
} }
......
...@@ -270,3 +270,337 @@ async fn postprocessor_parsing_stream_replays_interval_20_fixture() { ...@@ -270,3 +270,337 @@ async fn postprocessor_parsing_stream_replays_interval_20_fixture() {
); );
} }
} }
/// Build a single-choice streaming chunk whose delta carries `content` as
/// assistant text.
///
/// The choice has index 0, role `Assistant`, and no finish reason; every other
/// optional field is left as `None`.
fn mock_content_chunk(content: &str) -> NvCreateChatCompletionStreamResponse {
    use dynamo_protocols::types::{
        ChatChoiceStream, ChatCompletionStreamResponseDelta, CreateChatCompletionStreamResponse,
        Role,
    };

    // The original silences a deprecation lint on this construction —
    // presumably for the `function_call` field; confirm against the type.
    #[allow(deprecated)]
    let text_delta = ChatCompletionStreamResponseDelta {
        role: Some(Role::Assistant),
        content: Some(ChatCompletionMessageContent::Text(content.to_owned())),
        tool_calls: None,
        function_call: None,
        refusal: None,
        reasoning_content: None,
    };
    #[allow(deprecated)]
    let only_choice = ChatChoiceStream {
        index: 0,
        delta: text_delta,
        finish_reason: None,
        stop_reason: None,
        logprobs: None,
    };
    let inner = CreateChatCompletionStreamResponse {
        id: String::from("test-id"),
        choices: vec![only_choice],
        created: 0,
        model: String::from("test-model"),
        system_fingerprint: None,
        object: String::from("chat.completion.chunk"),
        usage: None,
        service_tier: None,
    };
    NvCreateChatCompletionStreamResponse { inner, nvext: None }
}
/// Build the terminal chunk of a mock stream: a single choice at index 0 with
/// `finish_reason = Stop` and an entirely empty delta (no role, no content).
fn mock_final_chunk() -> NvCreateChatCompletionStreamResponse {
    use dynamo_protocols::types::{
        ChatChoiceStream, ChatCompletionStreamResponseDelta, CreateChatCompletionStreamResponse,
    };

    // The original silences a deprecation lint on this construction —
    // presumably for the `function_call` field; confirm against the type.
    #[allow(deprecated)]
    let empty_delta = ChatCompletionStreamResponseDelta {
        role: None,
        content: None,
        tool_calls: None,
        function_call: None,
        refusal: None,
        reasoning_content: None,
    };
    #[allow(deprecated)]
    let closing_choice = ChatChoiceStream {
        index: 0,
        delta: empty_delta,
        finish_reason: Some(FinishReason::Stop),
        stop_reason: None,
        logprobs: None,
    };
    let inner = CreateChatCompletionStreamResponse {
        id: String::from("test-id"),
        choices: vec![closing_choice],
        created: 0,
        model: String::from("test-model"),
        system_fingerprint: None,
        object: String::from("chat.completion.chunk"),
        usage: None,
        service_tier: None,
    };
    NvCreateChatCompletionStreamResponse { inner, nvext: None }
}
/// Regression test: MiniMax with `tool_choice=required` under SGLang-style
/// guided decoding.
///
/// The `minimax_append_think` reasoning parser synthesizes a `<think>` opener
/// on the first chunk. Constrained output never emits `</think>`, so if the
/// parser ran, the whole JSON tool-call payload would be reported as
/// `reasoning_content`. This test feeds a bare JSON array through
/// `postprocessor_parsing_stream` and checks that it comes out as a structured
/// tool call, with nothing in `reasoning_content`, no tool JSON in `content`,
/// and a `ToolCalls` finish reason.
#[tokio::test]
async fn postprocessor_parsing_stream_minimax_required_bypasses_reasoning() {
    let preprocessor = build_preprocessor(Some("minimax_append_think"), Some("minimax_m2"));

    // Start from the shared baseline request, attach one tool, and force
    // tool_choice=required.
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let tool_spec = serde_json::json!([{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location.",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"]
            }
        }
    }]);
    let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
        serde_json::from_value(tool_spec).unwrap();
    request.inner.tools = Some(tools);
    request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Required);

    // Guided-decoding style output: a bare JSON array with no tool-call markers.
    let bare_json = r#"[{"name": "get_weather", "parameters": {"location": "San Francisco"}}]"#;
    let input_stream = stream::iter(
        vec![mock_content_chunk(bare_json), mock_final_chunk()]
            .into_iter()
            .map(Annotated::from_data),
    );
    let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(input_stream, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    // Fold every emitted choice into per-field accumulators.
    let mut reasoning = String::new();
    let mut content = String::new();
    let mut merged_tool_calls: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
    let mut finish_reasons = Vec::new();
    let emitted_choices = output_chunks
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in emitted_choices {
        if let Some(piece) = choice.delta.reasoning_content.as_ref() {
            reasoning.push_str(piece);
        }
        if let Some(piece) = choice.delta.content.as_ref() {
            content.push_str(get_text(piece));
        }
        for tc in choice.delta.tool_calls.iter().flatten() {
            merged_tool_calls.entry(tc.index).or_default().merge_from(tc);
        }
        if let Some(reason) = choice.finish_reason {
            finish_reasons.push(reason);
        }
    }

    // The bare-JSON tool call must land in tool_calls, not in reasoning_content.
    assert!(
        reasoning.is_empty(),
        "reasoning_content must be empty when tool_choice=required forces bare JSON, got: {reasoning:?}"
    );
    assert!(
        !content.contains("get_weather"),
        "tool call JSON must not leak into content, got: {content:?}"
    );

    let tool_calls: Vec<&MergedToolCall> = merged_tool_calls.values().collect();
    assert_eq!(tool_calls.len(), 1, "expected one tool call");
    assert_eq!(tool_calls[0].name.as_deref(), Some("get_weather"));
    let args: Value = serde_json::from_str(&tool_calls[0].arguments).unwrap();
    assert_eq!(args, serde_json::json!({"location": "San Francisco"}));

    // Emitted tool calls must turn the final Stop into ToolCalls.
    assert!(
        finish_reasons.contains(&FinishReason::ToolCalls),
        "expected ToolCalls finish_reason, got: {finish_reasons:?}"
    );
}
/// Regression test: MiniMax with a named `tool_choice` under SGLang-style
/// guided decoding.
///
/// Same constraint as the `required` variant: the reasoning parser must be
/// bypassed so the bare JSON array becomes a structured tool call. The finish
/// reason is expected to be `ToolCalls`, exactly as for `required` — emitting
/// tool calls rewrites `Stop` regardless of how the tool was selected.
#[tokio::test]
async fn postprocessor_parsing_stream_minimax_named_bypasses_reasoning() {
    let preprocessor = build_preprocessor(Some("minimax_append_think"), Some("minimax_m2"));

    // Baseline request plus one tool, pinned by name.
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let tool_spec = serde_json::json!([{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location.",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"]
            }
        }
    }]);
    let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
        serde_json::from_value(tool_spec).unwrap();
    request.inner.tools = Some(tools);
    request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Named(
        String::from("get_weather").into(),
    ));

    let bare_json = r#"[{"name": "get_weather", "parameters": {"location": "Tokyo"}}]"#;
    let input_stream = stream::iter(
        vec![mock_content_chunk(bare_json), mock_final_chunk()]
            .into_iter()
            .map(Annotated::from_data),
    );
    let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(input_stream, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    // Fold every emitted choice into per-field accumulators.
    let mut reasoning = String::new();
    let mut merged_tool_calls: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
    let mut finish_reasons = Vec::new();
    let emitted_choices = output_chunks
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in emitted_choices {
        if let Some(piece) = choice.delta.reasoning_content.as_ref() {
            reasoning.push_str(piece);
        }
        for tc in choice.delta.tool_calls.iter().flatten() {
            merged_tool_calls.entry(tc.index).or_default().merge_from(tc);
        }
        if let Some(reason) = choice.finish_reason {
            finish_reasons.push(reason);
        }
    }

    assert!(
        reasoning.is_empty(),
        "reasoning_content must be empty for tool_choice=named, got: {reasoning:?}"
    );

    let tool_calls: Vec<&MergedToolCall> = merged_tool_calls.values().collect();
    assert_eq!(tool_calls.len(), 1);
    assert_eq!(tool_calls[0].name.as_deref(), Some("get_weather"));

    // Emitting tool_calls rewrites finish_reason to ToolCalls whether
    // tool_choice was auto, required, or named.
    assert!(
        finish_reasons.contains(&FinishReason::ToolCalls),
        "named tool_choice with emitted tool_calls should finish as ToolCalls, got: {finish_reasons:?}"
    );
}
/// Regression test: MiniMax with a named `tool_choice` where guided decoding
/// uses the SingleObject schema — the output is only the parameters object,
/// with no `{name, parameters}` wrapper.
///
/// This exercises the `parse_tool_choice_json` fallback. If the reasoning
/// parser were not gated off, the `<think>` prefix it prepends would make the
/// bare JSON unparseable by that fallback and the tool call would leak as
/// content.
#[tokio::test]
async fn postprocessor_parsing_stream_minimax_named_bare_parameters() {
    let preprocessor = build_preprocessor(Some("minimax_append_think"), Some("minimax_m2"));

    // Baseline request plus one tool, pinned by name.
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let tool_spec = serde_json::json!([{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location.",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"]
            }
        }
    }]);
    let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
        serde_json::from_value(tool_spec).unwrap();
    request.inner.tools = Some(tools);
    request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Named(
        String::from("get_weather").into(),
    ));

    // SingleObject schema: just the parameters, no wrapper.
    let bare_params = r#"{"location": "Paris", "unit": "celsius"}"#;
    let input_stream = stream::iter(
        vec![mock_content_chunk(bare_params), mock_final_chunk()]
            .into_iter()
            .map(Annotated::from_data),
    );
    let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(input_stream, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    // Fold every emitted choice into per-field accumulators.
    let mut reasoning = String::new();
    let mut content = String::new();
    let mut merged_tool_calls: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
    let emitted_choices = output_chunks
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in emitted_choices {
        if let Some(piece) = choice.delta.reasoning_content.as_ref() {
            reasoning.push_str(piece);
        }
        if let Some(piece) = choice.delta.content.as_ref() {
            content.push_str(get_text(piece));
        }
        for tc in choice.delta.tool_calls.iter().flatten() {
            merged_tool_calls.entry(tc.index).or_default().merge_from(tc);
        }
    }

    assert!(
        reasoning.is_empty(),
        "reasoning_content must be empty (parser must be gated off), got: {reasoning:?}"
    );
    assert!(
        !content.contains("<think>"),
        "no <think> prefix should reach the client, got: {content:?}"
    );

    // The tool name comes from the named tool_choice; the arguments are the
    // bare object exactly as emitted.
    let tool_calls: Vec<&MergedToolCall> = merged_tool_calls.values().collect();
    assert_eq!(tool_calls.len(), 1, "expected one tool call");
    assert_eq!(tool_calls[0].name.as_deref(), Some("get_weather"));
    let args: Value = serde_json::from_str(&tool_calls[0].arguments).unwrap();
    assert_eq!(
        args,
        serde_json::json!({"location": "Paris", "unit": "celsius"})
    );
}
...@@ -3343,7 +3343,8 @@ fahrenheit ...@@ -3343,7 +3343,8 @@ fahrenheit
} }
} }
// Verify finish_reason is Stop (not ToolCalls) for named tool choice // OpenAI spec: whenever tool_calls are emitted, finish_reason must be
// ToolCalls — regardless of whether tool_choice was auto, required, or named.
let finish_reasons: Vec<_> = results let finish_reasons: Vec<_> = results
.iter() .iter()
.filter_map(|r| { .filter_map(|r| {
...@@ -3353,10 +3354,302 @@ fahrenheit ...@@ -3353,10 +3354,302 @@ fahrenheit
}) })
.collect(); .collect();
// For tool_choice=named, finish_reason should be Stop (OpenAI spec)
assert!( assert!(
finish_reasons.contains(&FinishReason::Stop), finish_reasons.contains(&FinishReason::ToolCalls),
"tool_choice=named should have Stop finish reason" "tool_choice=named with emitted tool_calls should have ToolCalls finish reason, got {:?}",
finish_reasons
);
}
/// `tool_choice=required` with a tool-call parser configured, where the
/// backend applied guided decoding and the model emitted a bare JSON array.
///
/// This is the minimax/SGLang-after-PR-#6620 regression: the marker-based
/// parser (which looks for `<minimax:tool_call>`-style markers) cannot parse
/// unmarked JSON, so tool_calls came back empty and the JSON leaked into
/// content/reasoning_content. The Immediate branch now tries the bare-JSON
/// parser first, so the call must be extracted even though a parser is set.
#[tokio::test]
async fn test_tool_choice_required_with_parser_bare_json() {
    let bare_json = r#"[{"name":"get_weather","parameters":{"location":"San Francisco"}}]"#;
    let upstream = stream::iter(vec![
        test_utils::create_mock_response_chunk(bare_json.to_string(), 0),
        test_utils::create_final_response_chunk(0),
    ]);

    let results: Vec<_> = OpenAIPreprocessor::apply_tool_calling_jail(
        Some("minimax_m2".to_string()),
        Some(ChatCompletionToolChoiceOption::Required),
        None,
        upstream,
    )
    .collect()
    .await;

    // Gather every tool-call chunk emitted on any choice of any response.
    let mut tool_calls = Vec::new();
    for data in results.iter().filter_map(|r| r.data.as_ref()) {
        for choice in &data.inner.choices {
            if let Some(chunks) = choice.delta.tool_calls.as_ref() {
                for chunk in chunks {
                    tool_calls.push(chunk.clone());
                }
            }
        }
    }
    assert_eq!(
        tool_calls.len(),
        1,
        "bare JSON array must be parsed by base_json_parser even when parser is set"
    );
    assert_eq!(
        tool_calls[0].function.as_ref().unwrap().name.as_deref(),
        Some("get_weather")
    );

    // Finish reason of the first choice of each response, where present.
    let mut finish_reasons = Vec::new();
    for r in &results {
        let first_choice = r.data.as_ref().and_then(|d| d.inner.choices.first());
        if let Some(reason) = first_choice.and_then(|c| c.finish_reason) {
            finish_reasons.push(reason);
        }
    }
    assert!(
        finish_reasons.contains(&FinishReason::ToolCalls),
        "tool_choice=required with tool_calls emitted should have ToolCalls finish_reason"
    );

    // The raw JSON must not also be surfaced as assistant content.
    let emitted_contents = results
        .iter()
        .filter_map(|r| r.data.as_ref())
        .flat_map(|d| d.inner.choices.iter())
        .filter_map(|c| c.delta.content.as_ref());
    for content in emitted_contents {
        let text = test_utils::extract_text(content);
        assert!(
            !text.contains("get_weather"),
            "tool call JSON should not leak as content, got: {}",
            text
        );
    }
}
/// `tool_choice=required` where the bare JSON uses the alternate `arguments`
/// key instead of `parameters` (per the original note, SGLang's
/// JsonArrayParser and some vLLM paths emit this variant). The bare-JSON
/// parser is expected to accept either key.
#[tokio::test]
async fn test_tool_choice_required_bare_json_with_arguments_key() {
    let bare_json = r#"[{"name":"get_weather","arguments":{"location":"Paris"}}]"#;
    let upstream = stream::iter(vec![
        test_utils::create_mock_response_chunk(bare_json.to_string(), 0),
        test_utils::create_final_response_chunk(0),
    ]);

    let results: Vec<_> = OpenAIPreprocessor::apply_tool_calling_jail(
        Some("hermes".to_string()),
        Some(ChatCompletionToolChoiceOption::Required),
        None,
        upstream,
    )
    .collect()
    .await;

    // Gather every tool-call chunk emitted on any choice of any response.
    let mut tool_calls = Vec::new();
    for data in results.iter().filter_map(|r| r.data.as_ref()) {
        for choice in &data.inner.choices {
            if let Some(chunks) = choice.delta.tool_calls.as_ref() {
                for chunk in chunks {
                    tool_calls.push(chunk.clone());
                }
            }
        }
    }
    assert_eq!(
        tool_calls.len(),
        1,
        "base_json_parser must accept the `arguments` key variant"
    );

    // A missing function or arguments string degrades to "" so the assertion
    // below reports the payload instead of panicking on unwrap.
    let first_function = tool_calls[0].function.as_ref();
    let args = first_function
        .and_then(|f| f.arguments.as_deref())
        .unwrap_or_default();
    assert!(
        args.contains("Paris"),
        "arguments should carry the parameters payload, got: {}",
        args
    );
}
/// Named `tool_choice` with a tool-call parser configured, where guided
/// decoding produced a bare JSON array.
///
/// The call must be parsed by the bare-JSON parser and the named-tool filter
/// must accept the matching name. Because a tool call is emitted, the finish
/// reason is expected to be rewritten to `ToolCalls`, the same as for
/// `auto` and `required`.
#[tokio::test]
async fn test_tool_choice_named_with_parser_bare_json() {
    let bare_json = r#"[{"name":"get_weather","parameters":{"location":"Tokyo"}}]"#;
    let upstream = stream::iter(vec![
        test_utils::create_mock_response_chunk(bare_json.to_string(), 0),
        test_utils::create_final_response_chunk(0),
    ]);

    let results: Vec<_> = OpenAIPreprocessor::apply_tool_calling_jail(
        Some("minimax_m2".to_string()),
        Some(ChatCompletionToolChoiceOption::Named(
            "get_weather".to_string().into(),
        )),
        None,
        upstream,
    )
    .collect()
    .await;

    // Gather every tool-call chunk emitted on any choice of any response.
    let mut tool_calls = Vec::new();
    for data in results.iter().filter_map(|r| r.data.as_ref()) {
        for choice in &data.inner.choices {
            if let Some(chunks) = choice.delta.tool_calls.as_ref() {
                for chunk in chunks {
                    tool_calls.push(chunk.clone());
                }
            }
        }
    }
    assert_eq!(tool_calls.len(), 1);
    assert_eq!(
        tool_calls[0].function.as_ref().unwrap().name.as_deref(),
        Some("get_weather")
    );

    // Finish reason of the first choice of each response, where present.
    // Emitted tool_calls must yield ToolCalls whether tool_choice was auto,
    // required, or named.
    let mut finish_reasons = Vec::new();
    for r in &results {
        let first_choice = r.data.as_ref().and_then(|d| d.inner.choices.first());
        if let Some(reason) = first_choice.and_then(|c| c.finish_reason) {
            finish_reasons.push(reason);
        }
    }
    assert!(
        finish_reasons.contains(&FinishReason::ToolCalls),
        "tool_choice=named with emitted tool_calls should be rewritten to ToolCalls, got {:?}",
        finish_reasons
    );
}
/// tool_choice=named + parser + bare JSON where the model emits a
/// different tool than requested. The named_tool_filter must drop
/// the mismatched call so nothing is emitted as a tool call.
#[tokio::test]
async fn test_tool_choice_named_bare_json_wrong_tool_filtered() {
let bare_json = r#"[{"name":"search","parameters":{"q":"foo"}}]"#;
let input_chunks = vec![
test_utils::create_mock_response_chunk(bare_json.to_string(), 0),
test_utils::create_final_response_chunk(0),
];
let results: Vec<_> = OpenAIPreprocessor::apply_tool_calling_jail(
Some("minimax_m2".to_string()),
Some(ChatCompletionToolChoiceOption::Named(
"get_weather".to_string().into(),
)),
None,
stream::iter(input_chunks),
)
.collect()
.await;
let tool_call_count: usize = results
.iter()
.map(|r| {
r.data.as_ref().map_or(0, |d| {
d.inner
.choices
.iter()
.map(|c: &ChatChoiceStream| {
c.delta.tool_calls.as_ref().map_or(0, |tc| tc.len())
})
.sum::<usize>()
})
})
.sum();
assert_eq!(
tool_call_count, 0,
"named_tool_filter must drop tool calls whose name doesn't match the requested tool"
);
}
#[tokio::test]
async fn test_tool_choice_named_no_parser_bare_json_wrong_tool_filtered() {
// Regression: with tool_choice=Named and NO parser, the base-JSON
// parser (step 1 in create_tool_call_choice) can now parse arbitrary
// JSON arrays. The named filter must still apply or a mismatched
// tool name would leak to the client.
let bare_json = r#"[{"name":"search","parameters":{"q":"foo"}}]"#;
let input_chunks = vec![
test_utils::create_mock_response_chunk(bare_json.to_string(), 0),
test_utils::create_final_response_chunk(0),
];
let results: Vec<_> = OpenAIPreprocessor::apply_tool_calling_jail(
None,
Some(ChatCompletionToolChoiceOption::Named(
"get_weather".to_string().into(),
)),
None,
stream::iter(input_chunks),
)
.collect()
.await;
let tool_call_count: usize = results
.iter()
.map(|r| {
r.data.as_ref().map_or(0, |d| {
d.inner
.choices
.iter()
.map(|c: &ChatChoiceStream| {
c.delta.tool_calls.as_ref().map_or(0, |tc| tc.len())
})
.sum::<usize>()
})
})
.sum();
assert_eq!(
tool_call_count, 0,
"Named + no-parser: wrong-name tool call must be filtered"
);
// The filtered-out tool JSON must not leak as assistant content.
let emitted_text: String = results
.iter()
.flat_map(|r| r.data.as_ref().map(|d| &d.inner.choices).into_iter())
.flatten()
.filter_map(|c| match c.delta.content.as_ref()? {
ChatCompletionMessageContent::Text(t) => Some(t.as_str()),
_ => None,
})
.collect();
assert!(
!emitted_text.contains("search"),
"wrong-tool JSON leaked to content: {emitted_text:?}"
); );
} }
} }
...@@ -162,7 +162,7 @@ async fn test_named_tool_choice_parses_json() { ...@@ -162,7 +162,7 @@ async fn test_named_tool_choice_parses_json() {
assert_eq!( assert_eq!(
choice.finish_reason, choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop) Some(dynamo_protocols::types::FinishReason::ToolCalls)
); );
let delta = &choice.delta; let delta = &choice.delta;
assert!(delta.content.is_none() || delta.content.as_ref().map(get_text) == Some("")); assert!(delta.content.is_none() || delta.content.as_ref().map(get_text) == Some(""));
...@@ -320,7 +320,7 @@ async fn test_streaming_named_tool_buffers_until_finish() { ...@@ -320,7 +320,7 @@ async fn test_streaming_named_tool_buffers_until_finish() {
let response = &all_responses[0]; let response = &all_responses[0];
assert_eq!( assert_eq!(
response.inner.choices[0].finish_reason, response.inner.choices[0].finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop) Some(dynamo_protocols::types::FinishReason::ToolCalls)
); );
let tool_calls = response.inner.choices[0].delta.tool_calls.as_ref().unwrap(); let tool_calls = response.inner.choices[0].delta.tool_calls.as_ref().unwrap();
......
...@@ -199,10 +199,10 @@ fn test_required_tool_choice_preserves_content_filter() { ...@@ -199,10 +199,10 @@ fn test_required_tool_choice_preserves_content_filter() {
); );
} }
#[test] #[tokio::test]
fn test_named_tool_choice_normal_stop_becomes_stop() { async fn test_named_tool_choice_normal_stop_becomes_tool_calls() {
let mut request = create_test_request(); let mut request = create_test_request();
request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Named( let tool_choice = Some(ChatCompletionToolChoiceOption::Named(
ChatCompletionNamedToolChoice { ChatCompletionNamedToolChoice {
r#type: ChatCompletionToolType::Function, r#type: ChatCompletionToolType::Function,
function: FunctionName { function: FunctionName {
...@@ -210,6 +210,7 @@ fn test_named_tool_choice_normal_stop_becomes_stop() { ...@@ -210,6 +210,7 @@ fn test_named_tool_choice_normal_stop_becomes_stop() {
}, },
}, },
)); ));
request.inner.tool_choice = tool_choice.clone();
let mut generator = request.response_generator("req-stop-1".to_string()); let mut generator = request.response_generator("req-stop-1".to_string());
let backend_output = build_backend_output_with_finish( let backend_output = build_backend_output_with_finish(
...@@ -217,14 +218,17 @@ fn test_named_tool_choice_normal_stop_becomes_stop() { ...@@ -217,14 +218,17 @@ fn test_named_tool_choice_normal_stop_becomes_stop() {
common::FinishReason::Stop, common::FinishReason::Stop,
); );
let response = generator let raw_response = generator
.choice_from_postprocessor(backend_output) .choice_from_postprocessor(backend_output)
.expect("choice generation"); .expect("choice generation");
// Normal completion: Stop should remain Stop for named tool choice let response = apply_jail_transformation(raw_response, tool_choice).await;
// OpenAI spec: when tool_calls are emitted, finish_reason must be ToolCalls
// regardless of whether tool_choice was auto, required, or a named function.
assert_eq!( assert_eq!(
response.inner.choices[0].finish_reason, response.inner.choices[0].finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop), Some(dynamo_protocols::types::FinishReason::ToolCalls),
); );
} }
......
...@@ -3,40 +3,30 @@ ...@@ -3,40 +3,30 @@
use crate::{ParserResult, ReasoningParser}; use crate::{ParserResult, ReasoningParser};
use super::base_parser::BasicReasoningParser;
/// MiniMax Append-Think Reasoning Parser. /// MiniMax Append-Think Reasoning Parser.
/// ///
/// The MiniMax model starts generating reasoning content immediately WITHOUT /// The MiniMax model starts generating reasoning content immediately WITHOUT
/// a `<think>` prefix. The model output looks like: /// emitting a `<think>` opener in its output. SGLang's `MiniMaxAppendThinkDetector`
/// `reasoning content here...</think>actual response` /// and vLLM's `MiniMaxM2AppendThinkReasoningParser` both handle this by simply
/// /// prepending `<think>` to the emitted text and classifying the whole stream
/// This parser prepends `<think>` to the first chunk, transforming the stream into: /// as `normal_text`/content — neither extracts reasoning based on a `</think>`
/// `<think>reasoning content here...</think>actual response` /// marker. The tag is left inline for downstream consumers that want to render
/// or post-process it.
/// ///
/// It then delegates to `BasicReasoningParser` for standard `<think>...</think>` /// This parser matches those upstream implementations verbatim: a pass-through
/// extraction, splitting output into `reasoning_text` and `normal_text`. /// with a one-time `<think>` prefix on the first streamed chunk. Reasoning
/// content is never populated.
/// ///
/// Reference: SGLang MiniMaxAppendThinkDetector /// References:
/// - SGLang MiniMaxAppendThinkDetector:
/// https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/parser/reasoning_parser.py /// https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/parser/reasoning_parser.py
#[derive(Debug)] /// - vLLM MiniMaxM2AppendThinkReasoningParser:
/// https://github.com/vllm-project/vllm/blob/main/vllm/reasoning/minimax_m2_reasoning_parser.py
#[derive(Debug, Default)]
pub struct MiniMaxAppendThinkParser { pub struct MiniMaxAppendThinkParser {
inner: BasicReasoningParser, /// Flips to true after the first streamed chunk has received the `<think>`
is_first_chunk: bool, /// prefix so subsequent chunks pass through unchanged.
} prefix_emitted: bool,
impl Default for MiniMaxAppendThinkParser {
fn default() -> Self {
Self {
inner: BasicReasoningParser::new(
"<think>".into(),
"</think>".into(),
false, // force_reasoning=false; we synthesize <think> ourselves
true, // stream_reasoning=true
),
is_first_chunk: true,
}
}
} }
impl MiniMaxAppendThinkParser { impl MiniMaxAppendThinkParser {
...@@ -45,26 +35,32 @@ impl MiniMaxAppendThinkParser { ...@@ -45,26 +35,32 @@ impl MiniMaxAppendThinkParser {
} }
} }
const THINK_START_TOKEN: &str = "<think>";
impl ReasoningParser for MiniMaxAppendThinkParser { impl ReasoningParser for MiniMaxAppendThinkParser {
fn detect_and_parse_reasoning(&mut self, text: &str, token_ids: &[u32]) -> ParserResult { fn detect_and_parse_reasoning(&mut self, text: &str, _token_ids: &[u32]) -> ParserResult {
// Prepend <think> and delegate to the inner parser // Non-streaming: return the full text with a single `<think>` prefix,
let augmented = format!("<think>{}", text); // all as normal_text. Reasoning extraction is intentionally a no-op.
self.inner.detect_and_parse_reasoning(&augmented, token_ids) ParserResult {
normal_text: format!("{THINK_START_TOKEN}{text}"),
reasoning_text: String::new(),
}
} }
fn parse_reasoning_streaming_incremental( fn parse_reasoning_streaming_incremental(
&mut self, &mut self,
text: &str, text: &str,
token_ids: &[u32], _token_ids: &[u32],
) -> ParserResult { ) -> ParserResult {
if self.is_first_chunk { let normal_text = if !self.prefix_emitted {
self.is_first_chunk = false; self.prefix_emitted = true;
let augmented = format!("<think>{}", text); format!("{THINK_START_TOKEN}{text}")
self.inner
.parse_reasoning_streaming_incremental(&augmented, token_ids)
} else { } else {
self.inner text.to_string()
.parse_reasoning_streaming_incremental(text, token_ids) };
ParserResult {
normal_text,
reasoning_text: String::new(),
} }
} }
} }
...@@ -74,123 +70,80 @@ mod tests { ...@@ -74,123 +70,80 @@ mod tests {
use super::*; use super::*;
#[test] #[test]
fn test_detect_and_parse_no_end_token() { fn test_detect_and_parse_prepends_think_all_as_normal_text() {
let mut parser = MiniMaxAppendThinkParser::new(); let mut parser = MiniMaxAppendThinkParser::new();
let result = parser.detect_and_parse_reasoning("reasoning content here", &[]); let result = parser.detect_and_parse_reasoning("reasoning content here", &[]);
assert_eq!(result.reasoning_text, "reasoning content here"); // Matches SGLang: everything is normal_text with a `<think>` prefix.
assert_eq!(result.normal_text, ""); assert_eq!(result.normal_text, "<think>reasoning content here");
assert_eq!(result.reasoning_text, "");
} }
#[test] #[test]
fn test_detect_and_parse_with_end_token() { fn test_detect_and_parse_with_end_token_is_still_normal_text() {
let mut parser = MiniMaxAppendThinkParser::new(); let mut parser = MiniMaxAppendThinkParser::new();
let result = let result =
parser.detect_and_parse_reasoning("reasoning content</think>normal response", &[]); parser.detect_and_parse_reasoning("reasoning content</think>normal response", &[]);
assert_eq!(result.reasoning_text, "reasoning content"); // SGLang does not split on `</think>` — the whole string (with the
assert_eq!(result.normal_text, "normal response"); // prepended `<think>`) flows through as normal_text.
assert_eq!(
result.normal_text,
"<think>reasoning content</think>normal response"
);
assert_eq!(result.reasoning_text, "");
} }
#[test] #[test]
fn test_streaming_basic_flow() { fn test_streaming_first_chunk_gets_prefix_rest_pass_through() {
let mut parser = MiniMaxAppendThinkParser::new(); let mut parser = MiniMaxAppendThinkParser::new();
// First chunk: model starts reasoning without <think>
let r1 = parser.parse_reasoning_streaming_incremental("I need to ", &[]); let r1 = parser.parse_reasoning_streaming_incremental("I need to ", &[]);
assert_eq!(r1.reasoning_text, "I need to "); assert_eq!(r1.normal_text, "<think>I need to ");
assert_eq!(r1.normal_text, ""); assert_eq!(r1.reasoning_text, "");
// Middle chunk: still reasoning
let r2 = parser.parse_reasoning_streaming_incremental("check the weather", &[]); let r2 = parser.parse_reasoning_streaming_incremental("check the weather", &[]);
assert_eq!(r2.reasoning_text, "check the weather"); assert_eq!(r2.normal_text, "check the weather");
assert_eq!(r2.normal_text, "");
// End of reasoning
let r3 = parser.parse_reasoning_streaming_incremental("</think>The weather is sunny.", &[]);
assert_eq!(r3.reasoning_text, "");
assert_eq!(r3.normal_text, "The weather is sunny.");
}
#[test]
fn test_streaming_end_token_split_across_chunks() {
let mut parser = MiniMaxAppendThinkParser::new();
// With stream_reasoning=true, reasoning is emitted immediately
let r1 = parser.parse_reasoning_streaming_incremental("reasoning", &[]);
assert_eq!(r1.reasoning_text, "reasoning");
assert_eq!(r1.normal_text, "");
// </think> split across chunks - partial match should buffer
let r2 = parser.parse_reasoning_streaming_incremental("</thi", &[]);
assert_eq!(r2.reasoning_text, ""); assert_eq!(r2.reasoning_text, "");
assert_eq!(r2.normal_text, "");
// Complete the end token - reasoning already streamed in r1, let r3 = parser.parse_reasoning_streaming_incremental("</think>The weather is sunny.", &[]);
// so r3 only contains the normal text after </think> // No split — `</think>` passes through verbatim in normal_text.
let r3 = parser.parse_reasoning_streaming_incremental("nk>normal text", &[]); assert_eq!(r3.normal_text, "</think>The weather is sunny.");
assert_eq!(r3.reasoning_text, ""); assert_eq!(r3.reasoning_text, "");
assert_eq!(r3.normal_text, "normal text");
} }
#[test] #[test]
fn test_streaming_only_reasoning_no_end() { fn test_streaming_bare_json_tool_call_is_normal_text() {
// Regression: under SGLang guided decoding the model emits a bare
// JSON array with no `</think>`. The parser must not capture it as
// reasoning — it must pass through so the tool-call jail can extract
// it into structured tool_calls.
let mut parser = MiniMaxAppendThinkParser::new(); let mut parser = MiniMaxAppendThinkParser::new();
let r = parser.parse_reasoning_streaming_incremental(
let r1 = parser.parse_reasoning_streaming_incremental("still thinking", &[]); r#"[{"name":"get_weather","parameters":{"location":"San Francisco"}}]"#,
assert_eq!(r1.reasoning_text, "still thinking"); &[],
assert_eq!(r1.normal_text, ""); );
assert_eq!(
let r2 = parser.parse_reasoning_streaming_incremental(" more thought", &[]); r.normal_text,
assert_eq!(r2.reasoning_text, " more thought"); r#"<think>[{"name":"get_weather","parameters":{"location":"San Francisco"}}]"#
assert_eq!(r2.normal_text, ""); );
assert_eq!(r.reasoning_text, "");
} }
#[test] #[test]
fn test_streaming_with_tool_call_after_reasoning() { fn test_streaming_tool_call_after_reasoning_is_all_normal_text() {
let mut parser = MiniMaxAppendThinkParser::new(); let mut parser = MiniMaxAppendThinkParser::new();
let r1 = parser.parse_reasoning_streaming_incremental("let me call a tool", &[]); let r1 = parser.parse_reasoning_streaming_incremental("let me call a tool", &[]);
assert_eq!(r1.reasoning_text, "let me call a tool"); assert_eq!(r1.normal_text, "<think>let me call a tool");
let r2 = parser.parse_reasoning_streaming_incremental( let r2 = parser.parse_reasoning_streaming_incremental(
"</think><minimax:tool_call><invoke name=\"get_weather\">", "</think><minimax:tool_call><invoke name=\"get_weather\">",
&[], &[],
); );
assert_eq!(r2.reasoning_text, ""); // Entire chunk is normal_text — `</think>` is not consumed.
assert!( assert_eq!(
r2.normal_text r2.normal_text,
.contains("<minimax:tool_call><invoke name=\"get_weather\">") "</think><minimax:tool_call><invoke name=\"get_weather\">"
); );
}
#[test]
fn test_streaming_tool_call_angle_bracket_split_tokens() {
// Reproduces the bug where `<` before `<invoke` is consumed by the
// reasoning parser's prefix matching after reasoning ends.
let mut parser = MiniMaxAppendThinkParser::new();
// Reasoning phase
let r1 = parser.parse_reasoning_streaming_incremental("let me check the weather", &[]);
assert_eq!(r1.reasoning_text, "let me check the weather");
// End reasoning
let r2 = parser.parse_reasoning_streaming_incremental("</think>", &[]);
assert_eq!(r2.reasoning_text, ""); assert_eq!(r2.reasoning_text, "");
assert_eq!(r2.normal_text, "");
// Tool call start marker
let r3 = parser.parse_reasoning_streaming_incremental("<minimax:tool_call>", &[]);
assert_eq!(r3.normal_text, "<minimax:tool_call>");
// Newline
let r4 = parser.parse_reasoning_streaming_incremental("\n", &[]);
assert_eq!(r4.normal_text, "\n");
// `<` as a separate token must NOT be buffered after reasoning ends
let r5 = parser.parse_reasoning_streaming_incremental("<", &[]);
assert_eq!(r5.normal_text, "<");
// Rest of the invoke tag
let r6 = parser.parse_reasoning_streaming_incremental("invoke name=\"get_weather\">", &[]);
assert_eq!(r6.normal_text, "invoke name=\"get_weather\">");
} }
} }
...@@ -25,6 +25,13 @@ pub struct JsonParserConfig { ...@@ -25,6 +25,13 @@ pub struct JsonParserConfig {
/// The type of JSON parser to use /// The type of JSON parser to use
#[serde(default)] #[serde(default)]
pub parser_type: JsonParserType, pub parser_type: JsonParserType,
/// Parse input as bare JSON (a `{...}` object or `[...]` array) with no
/// wrapping markers. Intended for guided-decoding paths where the backend
/// emits a raw JSON shape. When true, `tool_call_start_tokens` /
/// `tool_call_end_tokens` are ignored.
#[serde(default)]
pub bare_json_mode: bool,
} }
impl Default for JsonParserConfig { impl Default for JsonParserConfig {
...@@ -36,6 +43,7 @@ impl Default for JsonParserConfig { ...@@ -36,6 +43,7 @@ impl Default for JsonParserConfig {
function_name_keys: vec!["name".to_string()], function_name_keys: vec!["name".to_string()],
arguments_keys: vec!["arguments".to_string(), "parameters".to_string()], arguments_keys: vec!["arguments".to_string(), "parameters".to_string()],
parser_type: JsonParserType::Basic, parser_type: JsonParserType::Basic,
bare_json_mode: false,
} }
} }
} }
......
...@@ -180,8 +180,9 @@ pub fn try_tool_call_parse_basic_json( ...@@ -180,8 +180,9 @@ pub fn try_tool_call_parse_basic_json(
let tool_call_start_tokens = &config.tool_call_start_tokens; let tool_call_start_tokens = &config.tool_call_start_tokens;
let tool_call_end_tokens = &config.tool_call_end_tokens; let tool_call_end_tokens = &config.tool_call_end_tokens;
// Early exit if no tokens configured // Early exit if no tokens configured (unless bare_json_mode forces the
if tool_call_start_tokens.is_empty() { // no-marker extraction path).
if tool_call_start_tokens.is_empty() && !config.bare_json_mode {
return Ok((vec![], Some(trimmed.to_string()))); return Ok((vec![], Some(trimmed.to_string())));
} }
...@@ -191,8 +192,10 @@ pub fn try_tool_call_parse_basic_json( ...@@ -191,8 +192,10 @@ pub fn try_tool_call_parse_basic_json(
let mut normal_text = trimmed.to_string(); let mut normal_text = trimmed.to_string();
let mut found_start_token_with_no_valid_json = false; let mut found_start_token_with_no_valid_json = false;
// First, check if ANY start token exists in the input // First, check if ANY start token exists in the input. `bare_json_mode`
let has_start_token = tool_call_start_tokens // short-circuits this to false so we always take the no-marker branch.
let has_start_token = !config.bare_json_mode
&& tool_call_start_tokens
.iter() .iter()
.any(|token| !token.is_empty() && normal_text.contains(token)); .any(|token| !token.is_empty() && normal_text.contains(token));
......
...@@ -50,6 +50,7 @@ pub fn get_tool_parser_map() -> &'static HashMap<&'static str, ToolCallConfig> { ...@@ -50,6 +50,7 @@ pub fn get_tool_parser_map() -> &'static HashMap<&'static str, ToolCallConfig> {
map.insert("kimi_k2", ToolCallConfig::kimi_k2()); map.insert("kimi_k2", ToolCallConfig::kimi_k2());
map.insert("default", ToolCallConfig::default()); map.insert("default", ToolCallConfig::default());
map.insert("nemotron_nano", ToolCallConfig::qwen3_coder()); // nemotron nano follows qwen3_coder format map.insert("nemotron_nano", ToolCallConfig::qwen3_coder()); // nemotron nano follows qwen3_coder format
map.insert("qwen25", ToolCallConfig::hermes()); // qwen2.5 uses the same <tool_call>...</tool_call> format as hermes
map map
}) })
} }
...@@ -246,6 +247,7 @@ mod tests { ...@@ -246,6 +247,7 @@ mod tests {
"minimax_m2", "minimax_m2",
"glm47", "glm47",
"kimi_k2", "kimi_k2",
"qwen25",
]; ];
for parser in available_parsers { for parser in available_parsers {
assert!(parsers.contains(&parser)); assert!(parsers.contains(&parser));
...@@ -1714,6 +1716,72 @@ Remember, San Francisco weather can be quite unpredictable, particularly with it ...@@ -1714,6 +1716,72 @@ Remember, San Francisco weather can be quite unpredictable, particularly with it
assert_eq!(args["location"], "San Francisco, CA"); assert_eq!(args["location"], "San Francisco, CA");
assert_eq!(args["unit"], "celsius"); assert_eq!(args["unit"], "celsius");
} }
/// A single Qwen2.5 tool call with no surrounding text parses into exactly
/// one call and leaves empty normal content.
#[tokio::test]
async fn test_qwen25_simple() {
    // Qwen2.5 tool call layout (matches sglang qwen25_detector):
    //   <tool_call>\n{"name": ..., "arguments": {...}}\n</tool_call>
    let input = r#"<tool_call>
{"name": "get_weather", "arguments": {"location": "San Francisco, CA", "unit": "fahrenheit"}}
</tool_call>"#;

    let (calls, normal_text) = detect_and_parse_tool_call(input, Some("qwen25"), None)
        .await
        .unwrap();

    assert_eq!(normal_text.as_deref(), Some(""));
    assert_eq!(calls.len(), 1);

    let (tool_name, tool_args) = extract_name_and_args(calls[0].clone());
    assert_eq!(tool_name, "get_weather");
    assert_eq!(tool_args["location"], "San Francisco, CA");
    assert_eq!(tool_args["unit"], "fahrenheit");
}
/// Text preceding the `<tool_call>` block is returned as normal content
/// (trimmed), while the block itself is extracted as a tool call.
#[tokio::test]
async fn test_qwen25_with_normal_text() {
    let input = r#"I'll check the weather for you. <tool_call>
{"name": "get_weather", "arguments": {"location": "San Francisco, CA", "unit": "fahrenheit"}}
</tool_call>"#;

    let (calls, normal_text) = detect_and_parse_tool_call(input, Some("qwen25"), None)
        .await
        .unwrap();

    assert_eq!(
        normal_text.as_deref(),
        Some("I'll check the weather for you.")
    );
    assert_eq!(calls.len(), 1);

    // Only the function name matters here; the arguments are covered by
    // the simple-case test.
    let (tool_name, _) = extract_name_and_args(calls[0].clone());
    assert_eq!(tool_name, "get_weather");
}
/// Two consecutive `<tool_call>` blocks yield two calls, in input order,
/// with no leftover normal content.
#[tokio::test]
async fn test_qwen25_multiple_tool_calls() {
    let input = r#"<tool_call>
{"name": "get_weather", "arguments": {"location": "San Francisco, CA", "unit": "fahrenheit"}}
</tool_call>
<tool_call>
{"name": "get_weather", "arguments": {"location": "New York, NY", "unit": "fahrenheit"}}
</tool_call>"#;

    let (calls, normal_text) = detect_and_parse_tool_call(input, Some("qwen25"), None)
        .await
        .unwrap();

    assert_eq!(normal_text.as_deref(), Some(""));
    assert_eq!(calls.len(), 2);

    // Both calls target the same function; only the location differs.
    let expected_locations = ["San Francisco, CA", "New York, NY"];
    for (call, expected_location) in calls.iter().zip(expected_locations) {
        let (tool_name, tool_args) = extract_name_and_args(call.clone());
        assert_eq!(tool_name, "get_weather");
        assert_eq!(tool_args["location"], expected_location);
    }
}
/// Input without any `<tool_call>` marker produces no tool calls and is
/// handed back unchanged as normal content.
#[tokio::test]
async fn test_qwen25_plain_text_only() {
    let plain = "Hello, how can I help you today?";

    let (calls, normal_text) = detect_and_parse_tool_call(plain, Some("qwen25"), None)
        .await
        .unwrap();

    assert!(calls.is_empty());
    assert_eq!(normal_text.as_deref(), Some(plain));
}
} }
// Comprehensive parallel tool calling tests based on the examples provided // Comprehensive parallel tool calling tests based on the examples provided
...@@ -2545,6 +2613,15 @@ mod detect_parser_tests { ...@@ -2545,6 +2613,15 @@ mod detect_parser_tests {
assert!(result); assert!(result);
} }
/// The `qwen25` parser reports a tool-call start for text that opens with
/// a `<tool_call>` block.
#[test]
fn test_e2e_detect_tool_call_start_qwen25() {
    let qwen25_output = r#"<tool_call>
{"name": "get_current_weather", "arguments": {"location": "Tokyo"}}
</tool_call>"#;

    let started = detect_tool_call_start(qwen25_output, Some("qwen25")).unwrap();
    assert!(started);
}
#[test] #[test]
fn test_e2e_detect_tool_call_start_pythonic() { fn test_e2e_detect_tool_call_start_pythonic() {
let text = r#"foo(a=1, b=2), bar(x=3)]"#; let text = r#"foo(a=1, b=2), bar(x=3)]"#;
......
...@@ -19,10 +19,12 @@ import json ...@@ -19,10 +19,12 @@ import json
import logging import logging
import os import os
import shutil import shutil
import signal
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Generator from typing import Any, Generator
import psutil
import pytest import pytest
from tests.conftest import EtcdServer, NatsServer from tests.conftest import EtcdServer, NatsServer
...@@ -68,17 +70,89 @@ def _prepare_log_dir(request, suffix: str) -> str: ...@@ -68,17 +70,89 @@ def _prepare_log_dir(request, suffix: str) -> str:
return log_dir return log_dir
# Substrings that `_cleanup_sglang_stragglers` uses to recognise leftover
# processes from a previous topology. Each pattern is tested against both the
# space-joined command line and the process name, so a plain substring hit on
# either one selects the process for SIGKILL.
#
# The two "-m ..." entries correspond to the `python3 -m <module>` launches of
# the worker and the frontend; the other two are presumably process titles set
# by SGLang's own child processes (TODO confirm against the SGLang version in
# use). The patterns are intended to be distinctive enough that the sweep does
# not hit unrelated processes, but nothing enforces that: matching is not
# restricted to descendants of this test session.
_SGLANG_PROCESS_PATTERNS: tuple[str, ...] = (
"-m dynamo.sglang",
"-m dynamo.frontend",
"SGLANG:EngineCore",
"sglang::scheduler",
)
def _cleanup_sglang_stragglers(timeout: float = 10.0) -> None:
    """Force-kill any surviving SGLang / dynamo-frontend processes.

    ``ManagedProcess`` is configured with ``terminate_all_matching_process_names
    =False`` so its built-in straggler sweep is disabled (it does a system-wide
    pkill, which is not xdist-safe). But SGLang's ``EngineCore`` child
    processes can outlive the parent's SIGTERM and keep GPU memory pinned,
    which would prevent the next topology's worker from initializing.

    This helper walks every process visible to ``psutil`` and selects those
    whose command line or name contains one of ``_SGLANG_PROCESS_PATTERNS``.
    Matching is by substring only; it is *not* limited to descendants of this
    test session, so the patterns must stay specific. The current process is
    never selected. Selected processes are sent SIGKILL and then awaited.

    Args:
        timeout: Maximum number of seconds to wait for the killed processes
            to exit. Processes still alive afterwards are logged at WARNING
            level; no exception is raised for them.
    """
    own_pid = os.getpid()
    procs: list[psutil.Process] = []
    for proc in psutil.process_iter(["pid", "name", "cmdline"]):
        # Never target the test runner itself, whatever its command line is.
        if proc.pid == own_pid:
            continue
        try:
            cmdline = " ".join(proc.info.get("cmdline") or [])
            name = proc.info.get("name") or ""
            if any(pat in cmdline or pat in name for pat in _SGLANG_PROCESS_PATTERNS):
                procs.append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue

    if not procs:
        return

    logger.info(
        "Post-teardown straggler sweep: sending SIGKILL to %d process(es): %s",
        len(procs),
        [(p.pid, (p.info.get("name") or "")[:40]) for p in procs],
    )
    for proc in procs:
        try:
            proc.send_signal(signal.SIGKILL)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Already gone, or not ours to kill: nothing more to do here.
            continue

    # Wait for processes to actually exit before the next topology boots.
    _gone, alive = psutil.wait_procs(procs, timeout=timeout)
    if alive:
        # Report the name cached by process_iter() rather than calling
        # p.name(): a straggler may exit between the timeout and this log
        # call, and a live lookup would then raise NoSuchProcess out of the
        # fixture teardown.
        logger.warning(
            "Straggler(s) still alive after %ss: %s",
            timeout,
            [(p.pid, p.info.get("name") or "") for p in alive],
        )
# Topology identifiers. Both drive the exact same test bodies; they differ in
# where the SGLang chat processor / tool-call parser / reasoning parser live:
#
# * ``chat_processor_frontend`` — Python SGLang chat processor on the
# frontend (``--dyn-chat-processor sglang``) with parsers declared as
# frontend flags. Worker is a plain ``dynamo.sglang`` engine.
#
# * ``rust_parsers`` — plain Rust frontend (``dynamo.frontend``) with no
# chat processor or parser flags. Parsers are declared on the worker
# (``--dyn-reasoning-parser`` / ``--dyn-tool-call-parser``) and
# propagated to the frontend via the model runtime config registered at
# discovery time.
#
# The tuple is passed as ``params`` to the ``tool_calling_services`` fixture,
# and the worker / frontend process classes compare their ``topology``
# argument against these exact strings, so renaming an entry requires
# updating those comparisons as well.
TOPOLOGIES = ("chat_processor_frontend", "rust_parsers")
class WorkerProcess(ManagedProcess): class WorkerProcess(ManagedProcess):
"""backend worker for the tool-calling tests.""" """backend worker for the tool-calling tests."""
def __init__(self, request, *, system_port: int): def __init__(self, request, *, system_port: int, topology: str):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "info" env["DYN_LOG"] = "info"
env["DYN_SYSTEM_PORT"] = str(system_port) env["DYN_SYSTEM_PORT"] = str(system_port)
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
super().__init__( command = [
command=[
"python3", "python3",
"-m", "-m",
"dynamo.sglang", "dynamo.sglang",
...@@ -87,7 +161,17 @@ class WorkerProcess(ManagedProcess): ...@@ -87,7 +161,17 @@ class WorkerProcess(ManagedProcess):
"--served-model-name", "--served-model-name",
MODEL_NAME, MODEL_NAME,
"--trust-remote-code", "--trust-remote-code",
], ]
if topology == "rust_parsers":
command += [
"--dyn-reasoning-parser",
"qwen3",
"--dyn-tool-call-parser",
"qwen25",
]
super().__init__(
command=command,
env=env, env=env,
health_check_urls=[ health_check_urls=[
(f"http://localhost:{system_port}/health", _check_ready), (f"http://localhost:{system_port}/health", _check_ready),
...@@ -97,19 +181,20 @@ class WorkerProcess(ManagedProcess): ...@@ -97,19 +181,20 @@ class WorkerProcess(ManagedProcess):
terminate_all_matching_process_names=False, terminate_all_matching_process_names=False,
stragglers=["SGLANG:EngineCore"], stragglers=["SGLANG:EngineCore"],
straggler_commands=["-m dynamo.sglang"], straggler_commands=["-m dynamo.sglang"],
log_dir=_prepare_log_dir(request, "sglang-worker"), log_dir=_prepare_log_dir(request, f"sglang-worker-{topology}"),
) )
class ToolCallingFrontendProcess(ManagedProcess): class ToolCallingFrontendProcess(ManagedProcess):
"""Frontend HTTP ingress. """Frontend HTTP ingress.
SGLang-specific chat processor, tool-call parser, and reasoning parser The chat processor + parser flags are attached only for the
flags are only attached when ``sglang`` is importable in the current ``chat_processor_frontend`` topology. The ``rust_parsers`` topology
environment (otherwise the frontend would fail to load them). uses a plain Rust frontend and relies on parsers declared on the worker
side.
""" """
def __init__(self, request, *, frontend_port: int): def __init__(self, request, *, frontend_port: int, topology: str):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "info" env["DYN_LOG"] = "info"
env.pop("DYN_SYSTEM_PORT", None) env.pop("DYN_SYSTEM_PORT", None)
...@@ -122,6 +207,9 @@ class ToolCallingFrontendProcess(ManagedProcess): ...@@ -122,6 +207,9 @@ class ToolCallingFrontendProcess(ManagedProcess):
str(frontend_port), str(frontend_port),
"--router-mode", "--router-mode",
"round-robin", "round-robin",
]
if topology == "chat_processor_frontend":
command += [
"--dyn-chat-processor", "--dyn-chat-processor",
"sglang", "sglang",
"--tool-call-parser", "--tool-call-parser",
...@@ -141,7 +229,7 @@ class ToolCallingFrontendProcess(ManagedProcess): ...@@ -141,7 +229,7 @@ class ToolCallingFrontendProcess(ManagedProcess):
display_output=True, display_output=True,
terminate_all_matching_process_names=False, terminate_all_matching_process_names=False,
straggler_commands=["-m dynamo.frontend"], straggler_commands=["-m dynamo.frontend"],
log_dir=_prepare_log_dir(request, "frontend"), log_dir=_prepare_log_dir(request, f"frontend-{topology}"),
) )
...@@ -171,26 +259,44 @@ def runtime_services(request) -> Generator[None, None, None]: ...@@ -171,26 +259,44 @@ def runtime_services(request) -> Generator[None, None, None]:
os.environ.pop("ETCD_ENDPOINTS", None) os.environ.pop("ETCD_ENDPOINTS", None)
@pytest.fixture(scope="module") @pytest.fixture(scope="module", params=TOPOLOGIES)
def tool_calling_services( def tool_calling_services(
request, runtime_services, predownload_models request, runtime_services, predownload_models
) -> Generator[int, None, None]: ) -> Generator[int, None, None]:
"""Start the SGLang worker + tool-calling-aware frontend. """Start the SGLang worker + frontend for the selected topology.
Parameterized so every test runs once per entry in :data:`TOPOLOGIES`,
giving side-by-side coverage of the Python chat-processor frontend and
the plain Rust frontend with worker-declared parsers.
Yields the frontend HTTP port. Yields the frontend HTTP port.
""" """
topology: str = request.param
frontend_port, system_port = allocate_ports(count=2, start_port=10000) frontend_port, system_port = allocate_ports(count=2, start_port=10000)
with WorkerProcess(request, system_port=system_port): try:
with WorkerProcess(request, system_port=system_port, topology=topology):
# Allow worker to register with discovery. # Allow worker to register with discovery.
time.sleep(2) time.sleep(2)
with ToolCallingFrontendProcess(request, frontend_port=frontend_port): with ToolCallingFrontendProcess(
request, frontend_port=frontend_port, topology=topology
):
logger.info( logger.info(
"Tool calling stack ready (frontend=%d worker_system=%d)", "Tool calling stack ready (topology=%s frontend=%d worker_system=%d)",
topology,
frontend_port, frontend_port,
system_port, system_port,
) )
yield frontend_port yield frontend_port
finally:
# ManagedProcess.__exit__ has run for both context managers. Do a
# narrowly-scoped straggler sweep so any surviving EngineCore / worker
# / frontend process is gone before the next topology boots — otherwise
# the next worker would race against pinned GPU memory or a stale
# discovery registration. Followed by a brief settle delay so the OS
# reclaims bound ports and the GPU frees its VRAM.
_cleanup_sglang_stragglers()
time.sleep(3)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment