Unverified Commit 57e6a79f authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix(llm): preserve reasoning content when tool-call starts mid-chunk (#6902)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 869e7336
...@@ -608,6 +608,79 @@ impl OpenAIPreprocessor { ...@@ -608,6 +608,79 @@ impl OpenAIPreprocessor {
Ok((builder.build()?, annotations)) Ok((builder.build()?, annotations))
} }
pub fn postprocessor_parsing_stream<S>(
&self,
stream: S,
request: &NvCreateChatCompletionRequest,
) -> anyhow::Result<
impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
>
where
S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
{
// Try to parse reasoning content only if parser is configured
let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some()
&& !Self::is_reasoning_disabled_by_request(
self.runtime_config.reasoning_parser.as_deref(),
request.chat_template_args.as_ref(),
);
// Reasoning Content Parsing Transformation Step
// Current Solution:
// This step operates on Deltas created by the transform_postprocessor_stream function
// Only access to text and not token_ids - so can not support parsing based on token_ids for now
// Future Solution:
// To address the limitation if needed in future: move this step before transform_postprocessor_stream and add new field of reasoning_content to the backend output
// Use backend_output.reasoning_content field to fill out the deltas.
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if should_parse_reasoning {
Box::pin(Self::parse_reasoning_content_from_stream(
stream,
self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg
))
} else {
Box::pin(stream)
};
// Check if tools are present and if we should apply jail
let has_tools = request
.inner
.tools
.as_ref()
.is_some_and(|tools| !tools.is_empty());
// Determine if we should apply jail (do this before moving request)
let should_jail = Self::should_apply_tool_jail(
self.tool_call_parser.as_ref(),
request.inner.tool_choice.as_ref(),
has_tools,
)?;
// Convert OpenAI tools to parser ToolDefinition format before applying jail
let tool_definitions = request.inner.tools.as_ref().map(|tools| {
tools
.iter()
.map(|tool| dynamo_parsers::tool_calling::ToolDefinition {
name: tool.function.name.clone(),
parameters: tool.function.parameters.clone(),
})
.collect()
});
// Apply jail conditionally
let transformed_stream: Pin<Box<dyn Stream<Item = _> + Send>> = if should_jail {
Box::pin(Self::apply_tool_calling_jail(
self.tool_call_parser.clone(),
request.inner.tool_choice.clone(),
tool_definitions,
stream,
))
} else {
Box::pin(stream)
};
Ok(transformed_stream)
}
pub fn transform_postprocessor_stream<S, Resp>( pub fn transform_postprocessor_stream<S, Resp>(
stream: S, stream: S,
generator: Box<dyn DeltaGeneratorExt<Resp>>, generator: Box<dyn DeltaGeneratorExt<Resp>>,
...@@ -1121,71 +1194,18 @@ impl ...@@ -1121,71 +1194,18 @@ impl
// Extract context once // Extract context once
let context = response_stream.context(); let context = response_stream.context();
// transform the postprocessor stream (no boxing yet) // transform the postprocessor stream (no boxing yet) - detokenize
let stream = Self::transform_postprocessor_stream( let stream = Self::transform_postprocessor_stream(
response_stream, response_stream,
response_generator, response_generator,
context.clone(), context.clone(),
); );
// Try to parse reasoning content only if parser is configured let transformed_stream = self.postprocessor_parsing_stream(stream, &request)?;
let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some()
&& !Self::is_reasoning_disabled_by_request(
self.runtime_config.reasoning_parser.as_deref(),
request.chat_template_args.as_ref(),
);
// Reasoning Content Parsing Transformation Step
// Current Solution:
// This step operates on Deltas created by the transform_postprocessor_stream function
// Only access to text and not token_ids - so can not support parsing based on token_ids for now
// Future Solution:
// To address the limitation if needed in future: move this step before transform_postprocessor_stream and add new field of reasoning_content to the backend output
// Use backend_output.reasoning_content field to fill out the deltas.
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if should_parse_reasoning {
Box::pin(Self::parse_reasoning_content_from_stream(
stream,
self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg
))
} else {
Box::pin(stream)
};
// Check if tools are present and if we should apply jail
let has_tools =
request.inner.tools.is_some() && !request.inner.tools.as_ref().unwrap().is_empty();
// Determine if we should apply jail (do this before moving request) // Apply audit aggregation strategy.
let should_jail = Self::should_apply_tool_jail( // The audit branch already returns Pin<Box<...>> from scan/fold_aggregate_with_future,
self.tool_call_parser.as_ref(), // while the non-audit branch boxes the impl Stream from postprocessor_parsing_stream.
request.inner.tool_choice.as_ref(),
has_tools,
)?;
// Convert OpenAI tools to parser ToolDefinition format before applying jail
let tool_definitions = request.inner.tools.as_ref().map(|tools| {
tools
.iter()
.map(|tool| dynamo_parsers::tool_calling::ToolDefinition {
name: tool.function.name.clone(),
parameters: tool.function.parameters.clone(),
})
.collect()
});
// Apply jail conditionally
let transformed_stream: Pin<Box<dyn Stream<Item = _> + Send>> = if should_jail {
Box::pin(Self::apply_tool_calling_jail(
self.tool_call_parser.clone(),
request.inner.tool_choice.clone(),
tool_definitions,
stream,
))
} else {
Box::pin(stream)
};
// Step 4: Apply audit aggregation strategy
let final_stream = if let Some(mut audit) = audit_handle { let final_stream = if let Some(mut audit) = audit_handle {
let (stream, agg_fut) = if audit.streaming() { let (stream, agg_fut) = if audit.streaming() {
// Streaming: apply scan (pass-through + parallel aggregation) // Streaming: apply scan (pass-through + parallel aggregation)
...@@ -1202,9 +1222,9 @@ impl ...@@ -1202,9 +1222,9 @@ impl
audit.emit(); audit.emit();
}); });
Box::pin(stream) stream
} else { } else {
transformed_stream Box::pin(transformed_stream)
}; };
// Step 5: Speculative next-turn prefill // Step 5: Speculative next-turn prefill
......
...@@ -53,6 +53,16 @@ impl ChoiceEmission { ...@@ -53,6 +53,16 @@ impl ChoiceEmission {
ChoiceEmission::Trailing(choice) => choice.index, ChoiceEmission::Trailing(choice) => choice.index,
} }
} }
/// Get mutable access to the underlying choice.
fn choice_mut(&mut self) -> &mut ChatChoiceStream {
match self {
ChoiceEmission::PassThrough(choice) => choice,
ChoiceEmission::ToolCall(choice) => choice,
ChoiceEmission::Content(choice) => choice,
ChoiceEmission::Trailing(choice) => choice,
}
}
} }
/// Configuration for jail detection and parsing /// Configuration for jail detection and parsing
...@@ -96,6 +106,8 @@ struct ChoiceJailState { ...@@ -96,6 +106,8 @@ struct ChoiceJailState {
stream_finish_reason: Option<FinishReason>, stream_finish_reason: Option<FinishReason>,
/// Number of tool calls already emitted for this choice /// Number of tool calls already emitted for this choice
emitted_tool_calls_count: usize, emitted_tool_calls_count: usize,
/// Reasoning content collected while waiting for a suitable emission.
pending_reasoning_content: Option<String>,
} }
fn create_choice_stream( fn create_choice_stream(
...@@ -136,6 +148,7 @@ impl ChoiceJailState { ...@@ -136,6 +148,7 @@ impl ChoiceJailState {
partial_match_buffer: String::new(), partial_match_buffer: String::new(),
stream_finish_reason: None, stream_finish_reason: None,
emitted_tool_calls_count: 0, emitted_tool_calls_count: 0,
pending_reasoning_content: None,
} }
} }
...@@ -372,7 +385,7 @@ impl ChoiceJailState { ...@@ -372,7 +385,7 @@ impl ChoiceJailState {
None, None,
); );
let final_choice = jail_stream let mut final_choice = jail_stream
.create_tool_call_choice( .create_tool_call_choice(
self.index, self.index,
&self.accumulated_content, &self.accumulated_content,
...@@ -381,6 +394,15 @@ impl ChoiceJailState { ...@@ -381,6 +394,15 @@ impl ChoiceJailState {
) )
.await; .await;
// Preserve any pending reasoning content collected while jailed.
if let Some(pending_reasoning) = self.pending_reasoning_content.take() {
if let Some(existing_reasoning) = final_choice.delta.reasoning_content.as_mut() {
existing_reasoning.push_str(&pending_reasoning);
} else {
final_choice.delta.reasoning_content = Some(pending_reasoning);
}
}
if let Some(ref tool_calls) = final_choice.delta.tool_calls { if let Some(ref tool_calls) = final_choice.delta.tool_calls {
self.emitted_tool_calls_count += tool_calls.len(); self.emitted_tool_calls_count += tool_calls.len();
} }
...@@ -521,6 +543,13 @@ impl JailedStream { ...@@ -521,6 +543,13 @@ impl JailedStream {
let starts_jailed = matches!(self.jail_mode, JailMode::Immediate { .. }); let starts_jailed = matches!(self.jail_mode, JailMode::Immediate { .. });
let choice_state = choice_states.get_or_create_state(choice.index, starts_jailed); let choice_state = choice_states.get_or_create_state(choice.index, starts_jailed);
if let Some(reasoning_content) = &choice.delta.reasoning_content {
let pending = choice_state
.pending_reasoning_content
.get_or_insert_with(String::new);
pending.push_str(reasoning_content);
}
// Store metadata when any choice becomes jailed (first time only) // Store metadata when any choice becomes jailed (first time only)
if !choice_state.is_jailed && self.should_start_jail(text) if !choice_state.is_jailed && self.should_start_jail(text)
&& last_annotated_id.is_none() { && last_annotated_id.is_none() {
...@@ -533,7 +562,13 @@ impl JailedStream { ...@@ -533,7 +562,13 @@ impl JailedStream {
choice_state.stream_finish_reason = choice.finish_reason; choice_state.stream_finish_reason = choice.finish_reason;
// Process this choice and get emissions // Process this choice and get emissions
let emissions = choice_state.process_content(choice, text, &self).await; let mut emissions = choice_state.process_content(choice, text, &self).await;
if !emissions.is_empty()
&& let Some(reasoning) = choice_state.pending_reasoning_content.take()
&& let Some(first) = emissions.first_mut()
{
first.choice_mut().delta.reasoning_content = Some(reasoning);
}
all_emissions.extend(emissions); all_emissions.extend(emissions);
} }
// For multimodal content, pass through unchanged (no jailing) // For multimodal content, pass through unchanged (no jailing)
......
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":"<think>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":"\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":"Okay","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":".","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":"</think>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":"\n\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":"The","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":" capital","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":" of","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":" Tu","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":"valu","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":".","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[{"index":0,"delta":{"content":null,"function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null},"finish_reason":"stop"}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null,"nvext":{"timing":{"request_received_ms":1772656349456,"total_time_ms":465.26718700000004}}}
{"id":"chatcmpl-369a1572-4253-4632-bfc6-39631d9c98e9","choices":[],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":{"prompt_tokens":16,"completion_tokens":206,"total_tokens":222}}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":"<think>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":"\nOkay, the user is asking for the titles of some James Joyce books and wants me to use","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":" the provided tool. Let me check the available functions. There's a search_gutenberg_books function that","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":" takes an array of search terms. The user mentioned \"James Joyce books,\" so I need to use","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":" the search terms related to that. I should make sure to list the relevant terms. Let me think","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":"... \"James Joyce\" and \"Project Gutenberg\" might be the keywords here. So I'll structure","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":" the search terms as [\"James Joyce\", \"Project Gutenberg\"] to find the books. That should cover","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":" the user's request.\n</think>\n\n<tool_call>\n{\"name\": \"search_gutenberg_books\", \"arguments","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
{"id":"chatcmpl-interval20","choices":[{"index":0,"delta":{"content":"\": {\"search_terms\": [\"James Joyce\", \"Project Gutenberg\"]}}\n</tool_call>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null},"finish_reason":"stop"}],"created":1772656349,"model":"Qwen/Qwen3-0.6B","service_tier":null,"system_fingerprint":null,"object":"chat.completion.chunk","usage":null}
// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use dynamo_async_openai::types::{
ChatCompletionMessageContent, ChatCompletionToolChoiceOption, FinishReason,
};
use dynamo_llm::model_card::ModelDeploymentCard;
use dynamo_llm::preprocessor::OpenAIPreprocessor;
use dynamo_llm::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
};
use dynamo_runtime::protocols::annotated::Annotated;
use futures::{StreamExt, stream};
use serde_json::Value;
const REQUEST_JSON: &str = r#"{"messages":[{"role":"user","content":"What is the capital of Tuvalu?"}],"model":"Qwen/Qwen3-0.6B","max_completion_tokens":3000,"stream":true,"stream_options":{"include_usage":true,"continuous_usage_stats":false},"temperature":1.0,"top_p":1.0}"#;
fn build_preprocessor(
reasoning_parser: Option<&str>,
tool_call_parser: Option<&str>,
) -> Arc<OpenAIPreprocessor> {
let model_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests/data/sample-models/mock-llama-3.1-8b-instruct");
let mut mdc = ModelDeploymentCard::load_from_disk(model_path, None).unwrap();
mdc.runtime_config.reasoning_parser = reasoning_parser.map(ToString::to_string);
mdc.runtime_config.tool_call_parser = tool_call_parser.map(ToString::to_string);
OpenAIPreprocessor::new(mdc).unwrap()
}
fn fixture_path(name: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests/data/replays")
.join(name)
}
fn parse_fixture(
jsonl_path: &Path,
) -> (
NvCreateChatCompletionRequest,
Vec<Value>,
Vec<NvCreateChatCompletionStreamResponse>,
) {
let content = fs::read_to_string(jsonl_path)
.unwrap_or_else(|e| panic!("failed to read fixture {}: {e}", jsonl_path.display()));
let mut expected_stream_json = Vec::new();
let mut input_chunks = Vec::new();
for line in content.lines().filter(|l| !l.is_empty()) {
let value: Value = serde_json::from_str(line).unwrap();
let chunk: NvCreateChatCompletionStreamResponse =
serde_json::from_value(value.clone()).unwrap();
expected_stream_json.push(value);
input_chunks.push(chunk);
}
let request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
assert!(
!input_chunks.is_empty(),
"missing stream chunks in fixture {}",
jsonl_path.display()
);
(request, expected_stream_json, input_chunks)
}
fn get_text(content: &ChatCompletionMessageContent) -> &str {
match content {
ChatCompletionMessageContent::Text(text) => text.as_str(),
ChatCompletionMessageContent::Parts(_) => "",
}
}
/// Accumulates streamed tool call deltas into complete tool calls for assertion.
#[derive(Default, Clone)]
struct MergedToolCall {
id: Option<String>,
r#type: Option<String>,
name: Option<String>,
arguments: String,
}
impl MergedToolCall {
fn merge_from(
&mut self,
tool_call: &dynamo_async_openai::types::ChatCompletionMessageToolCallChunk,
) {
if self.id.is_none() {
self.id = tool_call.id.clone();
}
if self.r#type.is_none() {
self.r#type = tool_call.r#type.as_ref().map(|t| {
serde_json::to_string(t)
.unwrap()
.trim_matches('"')
.to_string()
});
}
if let Some(function) = &tool_call.function {
if self.name.is_none() {
self.name = function.name.clone();
}
if let Some(arguments) = &function.arguments {
self.arguments.push_str(arguments);
}
}
}
}
#[tokio::test]
async fn postprocessor_parsing_stream_replays_unit_test_fixture() {
let preprocessor = build_preprocessor(None, None);
let (request, expected_stream_json, input_chunks) =
parse_fixture(&fixture_path("stream_interval_1.jsonl"));
let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data));
let output_stream = preprocessor
.postprocessor_parsing_stream(input_stream, &request)
.expect("postprocessor_parsing_stream should build");
let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> =
output_stream.collect().await;
assert_eq!(output_chunks.len(), expected_stream_json.len());
for (idx, (output, expected)) in output_chunks
.iter()
.zip(expected_stream_json.iter())
.enumerate()
{
let output_data = output
.data
.as_ref()
.expect("output stream chunk should include data");
let output_json = serde_json::to_value(output_data).unwrap();
assert_eq!(output_json, *expected, "chunk {idx} did not match fixture");
}
}
#[tokio::test]
async fn postprocessor_parsing_stream_replays_interval_20_fixture() {
let preprocessor = build_preprocessor(Some("qwen"), Some("hermes"));
let (mut request, _expected_stream_json, input_chunks) =
parse_fixture(&fixture_path("stream_interval_20.jsonl"));
// Mirror tests/frontend/test_prepost.py::request_for_sampling
let tools: Vec<dynamo_async_openai::types::ChatCompletionTool> =
serde_json::from_value(serde_json::json!([
{
"type": "function",
"function": {
"name": "search_gutenberg_books",
"description": "Search for books in the Project Gutenberg library",
"parameters": {
"type": "object",
"properties": {
"search_terms": {
"type": "array",
"items": {"type": "string"},
"description": "List of search terms to find books"
}
},
"required": ["search_terms"]
}
}
}
]))
.unwrap();
request.inner.tools = Some(tools);
request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Auto);
let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data));
let output_stream = preprocessor
.postprocessor_parsing_stream(input_stream, &request)
.expect("postprocessor_parsing_stream should build");
let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> =
output_stream.collect().await;
let mut reasoning = String::new();
let mut all_content = String::new();
let mut finish_reasons = Vec::new();
let mut merged_tool_calls: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
for output in &output_chunks {
let Some(output_data) = output.data.as_ref() else {
continue;
};
for choice in &output_data.choices {
if let Some(reasoning_content) = &choice.delta.reasoning_content {
reasoning.push_str(reasoning_content);
}
if let Some(content) = &choice.delta.content {
all_content.push_str(get_text(content));
}
if let Some(reason) = choice.finish_reason {
finish_reasons.push(reason);
}
if let Some(tool_calls) = &choice.delta.tool_calls {
for tool_call in tool_calls {
merged_tool_calls
.entry(tool_call.index)
.or_default()
.merge_from(tool_call);
}
}
}
}
let tool_calls: Vec<MergedToolCall> = merged_tool_calls.values().cloned().collect();
// Port of tests/frontend/test_prepost.py::test_stream_interval_20
assert!(
reasoning.contains("the user is asking for the titles of some James Joyce books"),
"reasoning did not contain expected phrase: {reasoning}"
);
assert!(
reasoning.contains("the user's request.\n"),
"reasoning did not contain expected ending: {reasoning}"
);
assert_eq!(
tool_calls.len(),
1,
"Expected 1 tool call but got {}. Tool-call markup was likely emitted as plain content instead.",
tool_calls.len()
);
let tc = &tool_calls[0];
assert_eq!(tc.name.as_deref(), Some("search_gutenberg_books"));
let arguments_json: Value = serde_json::from_str(&tc.arguments).unwrap();
assert_eq!(
arguments_json,
serde_json::json!({
"search_terms": ["James Joyce", "Project Gutenberg"]
})
);
assert!(
tc.id
.as_ref()
.is_some_and(|id| id.starts_with("call-") || id.starts_with("chatcmpl-tool-")),
"tool call id did not match expected prefix: {:?}",
tc.id
);
assert_eq!(tc.r#type.as_deref(), Some("function"));
assert!(
!all_content.contains("<tool_call>"),
"Raw <tool_call> markup leaked into content: {all_content:?}"
);
assert!(!all_content.contains("</tool_call>"));
if !finish_reasons.is_empty() {
assert!(
finish_reasons.contains(&FinishReason::Stop)
|| finish_reasons.contains(&FinishReason::ToolCalls),
"expected terminal finish reason (stop/tool_calls), got: {:?}",
finish_reasons
);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment