"vscode:/vscode.git/clone" did not exist on "57393715e804387588241fbdb4ec94a7570230b6"
Unverified Commit f12c8605 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix(frontend): reconstruct tool-call arguments split across streaming deltas (#8582)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent dee63d96
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use std::collections::HashMap; use std::collections::{BTreeMap, HashMap};
use dynamo_parsers::tool_calling::try_tool_call_parse_aggregate; use dynamo_parsers::tool_calling::try_tool_call_parse_aggregate;
...@@ -56,7 +56,18 @@ struct DeltaChoice { ...@@ -56,7 +56,18 @@ struct DeltaChoice {
stop_reason: Option<StopReason>, stop_reason: Option<StopReason>,
/// Optional log probabilities for the chat choice. /// Optional log probabilities for the chat choice.
logprobs: Option<dynamo_protocols::types::ChatChoiceLogprobs>, logprobs: Option<dynamo_protocols::types::ChatChoiceLogprobs>,
// Optional tool calls for the chat choice. // Tool-call chunks accumulated in the order they arrived from the stream,
// keyed by `index` so chunks that carry only argument fragments can be
// merged into the entry created by the initial (id + name) chunk.
// BTreeMap preserves deterministic iteration order on the index dimension.
// See [`merge_tool_call_chunk`] for per-field merge semantics.
// #8640: replaces the old `Option<Vec<ChatCompletionMessageToolCall>>`
// which required id/name/arguments to all be set on the same chunk.
tool_call_chunks: BTreeMap<u32, dynamo_protocols::types::ChatCompletionMessageToolCallChunk>,
// Optional tool calls for the chat choice, populated *after* fold either
// by finalizing `tool_call_chunks` above, or by
// `try_tool_call_parse_aggregate` running against `text` for producers
// that put tool calls in content rather than as structured chunks.
tool_calls: Option<Vec<dynamo_protocols::types::ChatCompletionMessageToolCall>>, tool_calls: Option<Vec<dynamo_protocols::types::ChatCompletionMessageToolCall>>,
/// Optional reasoning content for the chat choice. /// Optional reasoning content for the chat choice.
...@@ -73,26 +84,102 @@ impl Default for DeltaAggregator { ...@@ -73,26 +84,102 @@ impl Default for DeltaAggregator {
} }
} }
fn convert_tool_chunk_to_message_tool_call( /// Merge an incoming chunk into the per-index accumulator.
chunk: &dynamo_protocols::types::ChatCompletionMessageToolCallChunk, ///
/// #8640: the prior implementation required `id`, `name`, and `arguments`
/// all on the same chunk, and thus the argument-fragment deltas were dropped
/// and the client saw `arguments: ""`.
///
/// The fix here merges by `index` across deltas: `id`, `type`, `function.name`
/// first-wins; `function.arguments` concatenated across fragments. This matches
/// the OpenAI streaming spec and vLLM/SGLang hermes emission:
///
/// * delta 1: `{index, id, type, function: { name }}`
/// * delta 2..N: `{index, function: { arguments: "<fragment>" }}`
fn merge_tool_call_chunk(
existing: &mut dynamo_protocols::types::ChatCompletionMessageToolCallChunk,
incoming: dynamo_protocols::types::ChatCompletionMessageToolCallChunk,
) {
if existing.id.is_none()
&& let Some(id) = incoming.id
{
existing.id = Some(id);
}
if existing.r#type.is_none()
&& let Some(ty) = incoming.r#type
{
existing.r#type = Some(ty);
}
let Some(incoming_fn) = incoming.function else {
return;
};
match &mut existing.function {
None => existing.function = Some(incoming_fn),
Some(existing_fn) => {
if existing_fn.name.is_none()
&& let Some(name) = incoming_fn.name
{
existing_fn.name = Some(name);
}
if let Some(args_fragment) = incoming_fn.arguments {
existing_fn
.arguments
.get_or_insert_with(String::new)
.push_str(&args_fragment);
}
}
}
}
/// Convert a fully merged chunk (post-merge accumulator state) to a finalized
/// `ChatCompletionMessageToolCall`. Returns `None` only if `id` or
/// `function.name` never arrived across any chunk — those are required by the
/// final OpenAI response schema. Missing `arguments` is legal (empty-args
/// tool calls) and becomes `""`. A warning is logged on drop so a producer
/// bug in upstream (e.g. vLLM / SGLang emitting fragments without ever
/// establishing the id+name opener) doesn't silently eat a tool call the
/// way the pre-fix code did.
fn finalize_merged_tool_chunk(
chunk: dynamo_protocols::types::ChatCompletionMessageToolCallChunk,
) -> Option<dynamo_protocols::types::ChatCompletionMessageToolCall> { ) -> Option<dynamo_protocols::types::ChatCompletionMessageToolCall> {
// Convert ChatCompletionMessageToolCallChunk to ChatCompletionMessageToolCall let index = chunk.index;
if let (Some(id), Some(function)) = (&chunk.id, &chunk.function) { let Some(id) = chunk.id else {
if let (Some(name), Some(arguments)) = (&function.name, &function.arguments) { tracing::warn!(
tool_call_index = index,
"dropping merged tool-call chunk: no `id` arrived across any delta"
);
return None;
};
let Some(function) = chunk.function else {
tracing::warn!(
tool_call_index = index,
tool_call_id = %id,
"dropping merged tool-call chunk: no `function` arrived across any delta"
);
return None;
};
let Some(name) = function.name else {
tracing::warn!(
tool_call_index = index,
tool_call_id = %id,
"dropping merged tool-call chunk: no `function.name` arrived across any delta"
);
return None;
};
Some(dynamo_protocols::types::ChatCompletionMessageToolCall { Some(dynamo_protocols::types::ChatCompletionMessageToolCall {
id: id.clone(), id,
r#type: dynamo_protocols::types::FunctionType::Function, // Use the merged r#type if the stream carried one. Falls back to
// `Function` — today the only variant in the OpenAI schema, but
// threading the merged value keeps us forward-compat if variants
// are added later and avoids dead state in `merge_tool_call_chunk`.
r#type: chunk
.r#type
.unwrap_or(dynamo_protocols::types::FunctionType::Function),
function: dynamo_protocols::types::FunctionCall { function: dynamo_protocols::types::FunctionCall {
name: name.clone(), name,
arguments: arguments.clone(), arguments: function.arguments.unwrap_or_default(),
}, },
}) })
} else {
None
}
} else {
None
}
} }
impl DeltaAggregator { impl DeltaAggregator {
...@@ -169,6 +256,7 @@ impl DeltaAggregator { ...@@ -169,6 +256,7 @@ impl DeltaAggregator {
finish_reason: None, finish_reason: None,
stop_reason: None, stop_reason: None,
logprobs: None, logprobs: None,
tool_call_chunks: BTreeMap::new(),
tool_calls: None, tool_calls: None,
reasoning_content: None, reasoning_content: None,
content_parts: Vec::new(), content_parts: Vec::new(),
...@@ -192,27 +280,25 @@ impl DeltaAggregator { ...@@ -192,27 +280,25 @@ impl DeltaAggregator {
.push_str(reasoning_content); .push_str(reasoning_content);
} }
// Since one tool call is one chunk, we don't need to aggregate them // #8640: streaming producers split a single tool call across
// We just need to convert the ChatCompletionMessageToolCallChunk to ChatCompletionMessageToolCall and append to the state_choice.tool_calls // multiple deltas (delta 1 = id + name; delta 2..N = argument
if let Some(tool_calls) = &choice.delta.tool_calls // fragments), so we merge chunks into a per-index accumulator
&& !tool_calls.is_empty() // here instead of treating each chunk as a complete tool call.
{ // Finalization to `tool_calls` happens after the fold.
// Convert ChatCompletionMessageToolCallChunk to ChatCompletionMessageToolCall if let Some(incoming_chunks) = choice.delta.tool_calls {
let converted_tool_calls: Vec< for chunk in incoming_chunks {
dynamo_protocols::types::ChatCompletionMessageToolCall, let entry = state_choice
> = tool_calls .tool_call_chunks
.iter() .entry(chunk.index)
.filter_map(convert_tool_chunk_to_message_tool_call) .or_insert_with(|| {
.collect(); dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: chunk.index,
// Initialize and push the converted tool calls to state_choice.tool_calls id: None,
// Only set tool_calls to Some if there are actual tool calls r#type: None,
if !converted_tool_calls.is_empty() { function: None,
if let Some(existing_tool_calls) = &mut state_choice.tool_calls {
existing_tool_calls.extend(converted_tool_calls);
} else {
state_choice.tool_calls = Some(converted_tool_calls);
} }
});
merge_tool_call_chunk(entry, chunk);
} }
} }
...@@ -258,6 +344,28 @@ impl DeltaAggregator { ...@@ -258,6 +344,28 @@ impl DeltaAggregator {
return Err(error); return Err(error);
} }
// #8640: finalize the per-index tool-call chunk accumulator into the
// choice's `tool_calls` vector. Chunks missing id or name across the
// whole stream are dropped here (they're not a valid tool call in the
// final schema), but chunks missing only `arguments` get defaulted to
// "" — the old code dropped those entirely.
for choice in aggregator.choices.values_mut() {
if choice.tool_call_chunks.is_empty() {
continue;
}
let finalized: Vec<_> = std::mem::take(&mut choice.tool_call_chunks)
.into_values()
.filter_map(finalize_merged_tool_chunk)
.collect();
// choice.tool_calls is always None at this point: or_insert
// initializes it to None, try_tool_call_parse_aggregate runs
// strictly after this loop. Unconditional assign is the only
// reachable path; no merge-with-existing needed.
if !finalized.is_empty() {
choice.tool_calls = Some(finalized);
}
}
if let Some(parser) = parsing_options.tool_call_parser.as_deref() { if let Some(parser) = parsing_options.tool_call_parser.as_deref() {
for choice in aggregator.choices.values_mut() { for choice in aggregator.choices.values_mut() {
if choice if choice
...@@ -500,6 +608,257 @@ mod tests { ...@@ -500,6 +608,257 @@ mod tests {
} }
} }
/// Build a stream delta carrying a raw list of tool-call chunks (no content).
/// Used by multi-chunk tests that mimic vLLM hermes' streaming emission:
/// the first chunk carries `id` + `function.name` only, subsequent chunks
/// carry `function.arguments` fragments with neither `id` nor `name`.
fn create_test_delta_with_tool_chunks(
index: u32,
tool_chunks: Vec<dynamo_protocols::types::ChatCompletionMessageToolCallChunk>,
finish_reason: Option<dynamo_protocols::types::FinishReason>,
role: Option<dynamo_protocols::types::Role>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
#[allow(deprecated)]
let delta = dynamo_protocols::types::ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(tool_chunks),
role,
refusal: None,
reasoning_content: None,
};
let choice = dynamo_protocols::types::ChatChoiceStream {
index,
delta,
finish_reason,
stop_reason: None,
logprobs: None,
};
let data = NvCreateChatCompletionStreamResponse {
inner: dynamo_protocols::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
model: "meta/llama-3.1-8b-instruct".to_string(),
created: 1234567890,
service_tier: None,
usage: None,
system_fingerprint: None,
choices: vec![choice],
object: "chat.completion".to_string(),
},
nvext: None,
};
Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
}
}
/// Repro for [#8640](https://github.com/ai-dynamo/dynamo/issues/8640):
/// vLLM hermes (and any spec-compliant OpenAI tool-call streaming producer)
/// splits a single tool call across multiple deltas:
/// delta 1: `{index: 0, id: "tc1", type: function, function: {name: "get_weather"}}`
/// delta 2: `{index: 0, function: {arguments: "{\"city\":"}}`
/// delta 3: `{index: 0, function: {arguments: "\"Tokyo\"}"}}`
/// The aggregated non-stream response must reconstruct
/// `arguments = "{\"city\":\"Tokyo\"}"`. Before the fix,
/// `convert_tool_chunk_to_message_tool_call` requires id/name/arguments all
/// set on the *same* chunk and drops the argument-fragment chunks — so the
/// client sees `arguments: ""`.
#[tokio::test]
async fn test_issue_8640_split_tool_call_arguments_reconstructed() {
let name_chunk = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: Some("tc1".to_string()),
r#type: Some(dynamo_protocols::types::FunctionType::Function),
function: Some(dynamo_protocols::types::FunctionCallStream {
name: Some("get_weather".to_string()),
arguments: None,
}),
};
let args_chunk_1 = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some("{\"city\":".to_string()),
}),
};
let args_chunk_2 = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some("\"Tokyo\"}".to_string()),
}),
};
let deltas = vec![
create_test_delta_with_tool_chunks(
0,
vec![name_chunk],
None,
Some(dynamo_protocols::types::Role::Assistant),
),
create_test_delta_with_tool_chunks(0, vec![args_chunk_1], None, None),
create_test_delta_with_tool_chunks(
0,
vec![args_chunk_2],
Some(dynamo_protocols::types::FinishReason::ToolCalls),
None,
),
];
let stream = Box::pin(stream::iter(deltas));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok(), "aggregation should not error");
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
let tool_calls = choice
.message
.tool_calls
.as_ref()
.expect("tool_calls should be Some after aggregation");
assert_eq!(
tool_calls.len(),
1,
"must produce exactly one aggregated tool_call, got {}",
tool_calls.len()
);
let tc = &tool_calls[0];
assert_eq!(tc.id, "tc1");
assert_eq!(tc.function.name, "get_weather");
assert_eq!(
tc.function.arguments, "{\"city\":\"Tokyo\"}",
"#8640: arguments must be reconstructed from split fragments, \
not dropped (got {:?})",
tc.function.arguments
);
}
/// Two parallel tool calls (index=0 and index=1), their chunks interleaved
/// in emission order. Exercises that the per-index accumulator correctly
/// keeps the two calls separate — not just that split args get merged
/// within one call (which [`test_issue_8640_split_tool_call_arguments_reconstructed`]
/// already covers). Related: [#8636](https://github.com/ai-dynamo/dynamo/issues/8636)
/// is about the streaming path dropping the second call; the non-stream
/// aggregator now handles the parallel case too, and this test pins it.
#[tokio::test]
async fn test_parallel_tool_calls_interleaved_chunks_aggregate_independently() {
let make_name = |idx: u32, id: &str, name: &str| {
dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: idx,
id: Some(id.to_string()),
r#type: Some(dynamo_protocols::types::FunctionType::Function),
function: Some(dynamo_protocols::types::FunctionCallStream {
name: Some(name.to_string()),
arguments: None,
}),
}
};
let make_args = |idx: u32, fragment: &str| {
dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: idx,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some(fragment.to_string()),
}),
}
};
// Emission order mimics a hermes-style parser:
// open call 0 → open call 1 → args-frag-0a → args-frag-1a →
// args-frag-0b → args-frag-1b → finish
let deltas = vec![
create_test_delta_with_tool_chunks(
0,
vec![make_name(0, "tc0", "get_weather")],
None,
Some(dynamo_protocols::types::Role::Assistant),
),
create_test_delta_with_tool_chunks(
0,
vec![make_name(1, "tc1", "get_time")],
None,
None,
),
create_test_delta_with_tool_chunks(0, vec![make_args(0, "{\"city\":")], None, None),
create_test_delta_with_tool_chunks(0, vec![make_args(1, "{\"tz\":")], None, None),
create_test_delta_with_tool_chunks(0, vec![make_args(0, "\"Tokyo\"}")], None, None),
create_test_delta_with_tool_chunks(
0,
vec![make_args(1, "\"JST\"}")],
Some(dynamo_protocols::types::FinishReason::ToolCalls),
None,
),
];
let stream = Box::pin(stream::iter(deltas));
let response = DeltaAggregator::apply(stream, ParsingOptions::default())
.await
.expect("aggregation should succeed");
assert_eq!(response.inner.choices.len(), 1);
let tool_calls = response.inner.choices[0]
.message
.tool_calls
.as_ref()
.expect("tool_calls should be Some");
assert_eq!(tool_calls.len(), 2, "must produce both parallel tool calls");
// BTreeMap iteration is index-ordered, so [0] is tc0, [1] is tc1.
assert_eq!(tool_calls[0].id, "tc0");
assert_eq!(tool_calls[0].function.name, "get_weather");
assert_eq!(tool_calls[0].function.arguments, "{\"city\":\"Tokyo\"}");
assert_eq!(tool_calls[1].id, "tc1");
assert_eq!(tool_calls[1].function.name, "get_time");
assert_eq!(tool_calls[1].function.arguments, "{\"tz\":\"JST\"}");
}
/// When fragment-only chunks arrive but no id/name ever establishes the
/// call opener (producer bug), `finalize_merged_tool_chunk` drops the
/// chunk with a warn! log instead of emitting a malformed tool call.
/// This test just pins the "no panic, no phantom tool call" half — the
/// warn! is observable via tracing subscriber in prod, not asserted here.
#[tokio::test]
async fn test_fragment_only_chunks_without_opener_drop_cleanly() {
let args_only = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some("{\"orphaned\":true}".to_string()),
}),
};
let deltas = vec![create_test_delta_with_tool_chunks(
0,
vec![args_only],
Some(dynamo_protocols::types::FinishReason::Stop),
Some(dynamo_protocols::types::Role::Assistant),
)];
let stream = Box::pin(stream::iter(deltas));
let response = DeltaAggregator::apply(stream, ParsingOptions::default())
.await
.expect("aggregation should succeed even with dropped chunk");
// Finalization only assigns `tool_calls` when the finalized vec is
// non-empty, so the strict post-condition here is `None`. Tight
// assertion catches a regression that flips to `Some(vec![])`.
assert!(
response.inner.choices[0].message.tool_calls.is_none(),
"orphaned fragment must not produce a tool call (got {:?})",
response.inner.choices[0].message.tool_calls,
);
}
#[tokio::test] #[tokio::test]
async fn test_empty_stream() { async fn test_empty_stream() {
// Create an empty stream // Create an empty stream
...@@ -1200,6 +1559,7 @@ mod tests { ...@@ -1200,6 +1559,7 @@ mod tests {
finish_reason: Some(dynamo_protocols::types::FinishReason::Stop), finish_reason: Some(dynamo_protocols::types::FinishReason::Stop),
stop_reason: None, stop_reason: None,
logprobs: None, logprobs: None,
tool_call_chunks: BTreeMap::new(),
tool_calls: None, tool_calls: None,
reasoning_content: Some("Analyzing the question.".to_string()), reasoning_content: Some("Analyzing the question.".to_string()),
content_parts: vec![], content_parts: vec![],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment