Unverified Commit 5178a4a4 authored by MatejKosec's avatar MatejKosec Committed by GitHub
Browse files

feat: streaming tool call and reasoning dispatch SSE events (#7114)


Signed-off-by: default avatarMatej Kosec <mkosec@nvidia.com>
parent d16862ad
...@@ -73,6 +73,8 @@ class FrontendConfig(KvRouterConfigBase): ...@@ -73,6 +73,8 @@ class FrontendConfig(KvRouterConfigBase):
enable_anthropic_api: bool enable_anthropic_api: bool
strip_anthropic_preamble: bool strip_anthropic_preamble: bool
debug_perf: bool debug_perf: bool
enable_streaming_tool_dispatch: bool
enable_streaming_reasoning_dispatch: bool
preprocess_workers: int preprocess_workers: int
def validate(self) -> None: def validate(self) -> None:
...@@ -355,6 +357,30 @@ class FrontendArgGroup(ArgGroup): ...@@ -355,6 +357,30 @@ class FrontendArgGroup(ArgGroup):
"from the system prompt. Saves tokens and improves prompt caching." "from the system prompt. Saves tokens and improves prompt caching."
), ),
) )
add_negatable_bool_argument(
g,
flag_name="--enable-streaming-tool-dispatch",
env_var="DYN_ENABLE_STREAMING_TOOL_DISPATCH",
default=False,
help=(
"[EXPERIMENTAL] Enable streaming tool call dispatch. Emits "
"'event: tool_call_dispatch' SSE events on /v1/chat/completions "
"for each complete tool call before finish_reason arrives. "
"Can be combined with --enable-streaming-reasoning-dispatch."
),
)
add_negatable_bool_argument(
g,
flag_name="--enable-streaming-reasoning-dispatch",
env_var="DYN_ENABLE_STREAMING_REASONING_DISPATCH",
default=False,
help=(
"[EXPERIMENTAL] Enable streaming reasoning dispatch. Emits a "
"single 'event: reasoning_dispatch' SSE event on /v1/chat/completions "
"with the complete reasoning block once thinking ends. "
"Can be combined with --enable-streaming-tool-dispatch."
),
)
add_argument( add_argument(
g, g,
flag_name="--dyn-chat-processor", flag_name="--dyn-chat-processor",
......
...@@ -264,6 +264,16 @@ async def async_main(): ...@@ -264,6 +264,16 @@ async def async_main():
else: else:
os.environ.pop("DYN_STRIP_ANTHROPIC_PREAMBLE", None) os.environ.pop("DYN_STRIP_ANTHROPIC_PREAMBLE", None)
if config.enable_streaming_tool_dispatch:
os.environ["DYN_ENABLE_STREAMING_TOOL_DISPATCH"] = "1"
else:
os.environ.pop("DYN_ENABLE_STREAMING_TOOL_DISPATCH", None)
if config.enable_streaming_reasoning_dispatch:
os.environ["DYN_ENABLE_STREAMING_REASONING_DISPATCH"] = "1"
else:
os.environ.pop("DYN_ENABLE_STREAMING_REASONING_DISPATCH", None)
if config.chat_processor == "vllm": if config.chat_processor == "vllm":
assert ( assert (
vllm_flags is not None vllm_flags is not None
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::{ use std::{
collections::HashSet, collections::{HashMap, HashSet},
fmt::Display, fmt::Display,
sync::Arc, sync::Arc,
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
...@@ -17,7 +17,7 @@ use axum::{ ...@@ -17,7 +17,7 @@ use axum::{
middleware::{self, Next}, middleware::{self, Next},
response::{ response::{
IntoResponse, Response, IntoResponse, Response,
sse::{KeepAlive, Sse}, sse::{Event, KeepAlive, Sse},
}, },
routing::{get, post}, routing::{get, post},
}; };
...@@ -940,6 +940,104 @@ pub(super) async fn check_for_backend_error( ...@@ -940,6 +940,104 @@ pub(super) async fn check_for_backend_error(
} }
} }
/// Serialize `payload` and wrap it as an SSE event with the given name.
fn make_dispatch_event(
event_name: &str,
payload: &impl serde::Serialize,
) -> Option<Result<Event, axum::Error>> {
match serde_json::to_string(payload) {
Ok(json) => Some(Ok(Event::default().event(event_name).data(json))),
Err(e) => {
tracing::warn!("streaming_{event_name}: failed to serialize: {e}");
None
}
}
}
/// Emits early `event: tool_call_dispatch` SSE events for any complete tool calls found in a
/// streaming response chunk, when `DYN_ENABLE_STREAMING_TOOL_DISPATCH` is enabled.
///
/// Dynamo backends emit each tool call as a single complete chunk (id + name + arguments
/// all present), so we can dispatch immediately upon seeing the chunk rather than waiting
/// for `finish_reason="tool_calls"` to arrive. Each event payload includes `choice_index`
/// for correct disambiguation when `n > 1`.
fn streaming_tool_dispatch_events(
response: &crate::types::Annotated<NvCreateChatCompletionStreamResponse>,
dispatched_ids: &mut HashSet<String>,
) -> Vec<Result<Event, axum::Error>> {
let Some(data) = &response.data else {
return vec![];
};
let mut events = vec![];
for choice in &data.choices {
let Some(tool_calls) = &choice.delta.tool_calls else {
continue;
};
for chunk in tool_calls {
// Only dispatch when the tool call is fully formed (id + name + arguments)
let has_name_and_args = chunk
.function
.as_ref()
.is_some_and(|f| f.name.is_some() && f.arguments.is_some());
if let (true, Some(id)) = (has_name_and_args, &chunk.id) {
// Skip already-dispatched tool calls (dedup guard, matches
// the stopped/done flags in Anthropic/Responses converters).
if !dispatched_ids.insert(id.clone()) {
continue;
}
let payload = serde_json::json!({
"choice_index": choice.index,
"tool_call": chunk,
});
events.extend(make_dispatch_event("tool_call_dispatch", &payload));
}
}
}
events
}
/// Accumulates reasoning tokens and emits a single `event: reasoning_dispatch` SSE event
/// when the complete reasoning block has been decoded (i.e. when `reasoning_content`
/// transitions from `Some(token)` to `None`), matching the UX of `tool_call_dispatch`.
///
/// The buffer is maintained across chunks by the caller (captured in the flat_map closure).
/// Flushing also occurs when `finish_reason` is set, to handle max_tokens during reasoning.
fn accumulate_reasoning_dispatch(
response: &crate::types::Annotated<NvCreateChatCompletionStreamResponse>,
buffers: &mut HashMap<u32, String>,
) -> Vec<Result<Event, axum::Error>> {
let Some(data) = &response.data else {
return vec![];
};
let mut events = vec![];
for choice in &data.choices {
let buffer = buffers.entry(choice.index).or_default();
let has_reasoning = choice
.delta
.reasoning_content
.as_ref()
.is_some_and(|r| !r.is_empty());
if has_reasoning {
buffer.push_str(choice.delta.reasoning_content.as_ref().unwrap());
}
// Emit when reasoning transitions to None OR when the stream ends (finish_reason).
if !buffer.is_empty() && (!has_reasoning || choice.finish_reason.is_some()) {
let payload = serde_json::json!({
"index": choice.index,
"reasoning_content": buffer.as_str(),
});
events.extend(make_dispatch_event("reasoning_dispatch", &payload));
buffer.clear();
}
}
events
}
/// OpenAI Chat Completions Request Handler /// OpenAI Chat Completions Request Handler
/// ///
/// This method will handle the incoming request for the /v1/chat/completions endpoint. The endpoint is a "source" /// This method will handle the incoming request for the /v1/chat/completions endpoint. The endpoint is a "source"
...@@ -1073,20 +1171,46 @@ async fn chat_completions( ...@@ -1073,20 +1171,46 @@ async fn chat_completions(
stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation
let mut http_queue_guard = Some(http_queue_guard); let mut http_queue_guard = Some(http_queue_guard);
let stream = stream let tool_dispatch_enabled = state.streaming_tool_dispatch_enabled();
.map(move |response| { let reasoning_dispatch_enabled = state.streaming_reasoning_dispatch_enabled();
// Calls observe_response() on each token let mut reasoning_buffer: HashMap<u32, String> = HashMap::new();
// EventConverter will detect `event: "error"` and convert to SSE error events let mut dispatched_tool_ids: HashSet<String> = HashSet::new();
process_response_using_event_converter_and_observe_metrics(
// flat_map lets us optionally prepend extra SSE events before each regular chunk:
// - `event: tool_call_dispatch` — complete tool call detected early (tool dispatch)
// - `event: reasoning_dispatch` — complete reasoning block (emitted once)
// When both flags are off the flat_map is equivalent to the original map + filter_map.
let stream = stream.flat_map(move |response| {
// Extract side-channel events before the response is consumed by EventConverter.
let mut events: Vec<Result<Event, axum::Error>> = vec![];
if tool_dispatch_enabled {
events.extend(streaming_tool_dispatch_events(
&response,
&mut dispatched_tool_ids,
));
}
if reasoning_dispatch_enabled {
events.extend(accumulate_reasoning_dispatch(
&response,
&mut reasoning_buffer,
));
}
// Convert to SSE event (this consumes the response).
// EventConverter will detect `event: "error"` and convert to SSE error events.
let sse_result = process_response_using_event_converter_and_observe_metrics(
EventConverter::from(response), EventConverter::from(response),
&mut response_collector, &mut response_collector,
&mut http_queue_guard, &mut http_queue_guard,
) );
})
.filter_map(|result| { // Side-channel events come first, then the regular data event.
use futures::future; match sse_result {
// Transpose Result<Option<T>> -> Option<Result<T>> Ok(Some(ev)) => events.push(Ok(ev)),
future::ready(result.transpose()) Ok(None) => {}
Err(e) => events.push(Err(e)),
}
stream::iter(events)
}); });
let stream = monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle); let stream = monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
...@@ -2891,4 +3015,702 @@ mod tests { ...@@ -2891,4 +3015,702 @@ mod tests {
ErrorType::NotImplemented ErrorType::NotImplemented
); );
} }
// ── streaming dispatch tests ──────────────────────────────────────
use std::collections::{HashMap, HashSet};
use dynamo_async_openai::types::{
ChatChoiceStream, ChatCompletionMessageToolCallChunk, ChatCompletionStreamResponseDelta,
ChatCompletionToolType, CreateChatCompletionStreamResponse, FinishReason,
FunctionCallStream,
};
use dynamo_runtime::protocols::annotated::Annotated;
/// Extract the JSON data payload from an SSE Event's Debug output.
///
/// `axum::response::sse::Event` doesn't expose its fields publicly and doesn't
/// implement `Display` (the wire format is only produced during response
/// serialization). The `Debug` representation includes the event name and data
/// string, so we parse it here.
///
/// WARNING: Coupled to axum's internal Debug format for `Event`. If an axum
/// upgrade changes the Debug output, these tests will break. Preferred over
/// spinning up an actual SSE stream for unit test simplicity.
fn extract_sse_data_json(event: &axum::response::sse::Event) -> serde_json::Value {
// The Event Debug format is:
// Event { buffer: b"event: <name>\ndata: <json>\n", flags: ... }
// We extract the JSON after "data: " and unescape the byte-string encoding.
let debug = format!("{:?}", event);
let data_marker = "data: ";
let after_data = debug
.find(data_marker)
.map(|p| p + data_marker.len())
.expect("no 'data: ' in Event debug output");
let rest = &debug[after_data..];
let json_start = rest.find('{').expect("no JSON object after data:");
let mut depth = 0i32;
let mut json_end = 0;
for (i, b) in rest[json_start..].bytes().enumerate() {
match b {
b'{' => depth += 1,
b'}' => {
depth -= 1;
if depth == 0 {
json_end = json_start + i + 1;
break;
}
}
_ => {}
}
}
let raw = &rest[json_start..json_end];
// Unescape byte-string Debug format:
// \\\\\" -> PLACEHOLDER (nested escaped quotes in JSON string values)
// \\\" -> " (structural quotes)
// Then restore: PLACEHOLDER -> \"
let s = raw
.replace("\\\\\\\"", "\x00NESTED\x00")
.replace("\\\"", "\"")
.replace("\x00NESTED\x00", "\\\"");
// Handle \\xHH byte sequences (non-ASCII in Debug byte-string format)
let mut result = Vec::new();
let sbytes = s.as_bytes();
let mut idx = 0;
while idx < sbytes.len() {
if idx + 3 < sbytes.len()
&& sbytes[idx] == b'\\'
&& sbytes[idx + 1] == b'x'
&& let Ok(v) = u8::from_str_radix(
std::str::from_utf8(&sbytes[idx + 2..idx + 4]).unwrap_or(""),
16,
)
{
result.push(v);
idx += 4;
continue;
}
result.push(sbytes[idx]);
idx += 1;
}
let final_str = String::from_utf8_lossy(&result);
serde_json::from_str(&final_str).unwrap_or_else(|e| {
panic!(
"failed to parse JSON from Event: {e}\nraw: {raw}\nunescaped: {s}\nfinal: {final_str}"
)
})
}
/// Assert that an SSE Event has the expected event type name.
/// Uses "event: <name>\n" pattern to avoid substring false-matches.
fn assert_event_type(event: &axum::response::sse::Event, expected: &str) {
let debug = format!("{:?}", event);
let pattern = format!("event: {expected}\\n");
assert!(
debug.contains(&pattern),
"expected event type '{expected}' not found in: {debug}"
);
}
/// Build a minimal Annotated<Response> with the given choices.
fn make_stream_response(
choices: Vec<ChatChoiceStream>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
let response = CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices,
created: 0,
model: "test-model".to_string(),
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
usage: None,
service_tier: None,
nvext: None,
};
Annotated {
id: Some("test-id".to_string()),
data: Some(response),
event: None,
comment: None,
error: None,
}
}
fn make_choice_with_reasoning(
index: u32,
reasoning: Option<&str>,
finish: Option<FinishReason>,
) -> ChatChoiceStream {
#[allow(deprecated)]
ChatChoiceStream {
index,
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: None,
role: None,
refusal: None,
reasoning_content: reasoning.map(|s| s.to_string()),
},
finish_reason: finish,
stop_reason: None,
logprobs: None,
}
}
fn make_choice_with_tool_call(
index: u32,
id: Option<&str>,
name: Option<&str>,
arguments: Option<&str>,
) -> ChatChoiceStream {
let tool_call = ChatCompletionMessageToolCallChunk {
index: 0,
id: id.map(|s| s.to_string()),
r#type: Some(ChatCompletionToolType::Function),
function: Some(FunctionCallStream {
name: name.map(|s| s.to_string()),
arguments: arguments.map(|s| s.to_string()),
}),
};
#[allow(deprecated)]
ChatChoiceStream {
index,
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(vec![tool_call]),
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
}
}
// ── streaming_tool_dispatch_events tests ──
#[test]
fn test_tool_dispatch_emits_event_for_complete_tool_call() {
let response = make_stream_response(vec![make_choice_with_tool_call(
0,
Some("call_123"),
Some("get_weather"),
Some(r#"{"city":"Paris"}"#),
)]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert_eq!(events.len(), 1);
let event = events[0].as_ref().unwrap();
assert_event_type(event, "tool_call_dispatch");
let json = extract_sse_data_json(event);
assert_eq!(json["choice_index"], 0);
assert_eq!(json["tool_call"]["id"], "call_123");
assert_eq!(json["tool_call"]["function"]["name"], "get_weather");
assert_eq!(
json["tool_call"]["function"]["arguments"],
r#"{"city":"Paris"}"#
);
}
#[test]
fn test_tool_dispatch_skips_incomplete_tool_call_no_id() {
let response = make_stream_response(vec![make_choice_with_tool_call(
0,
None, // no id
Some("get_weather"),
Some(r#"{"city":"Paris"}"#),
)]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert!(events.is_empty(), "should not dispatch without id");
}
#[test]
fn test_tool_dispatch_skips_incomplete_tool_call_no_name() {
let response = make_stream_response(vec![make_choice_with_tool_call(
0,
Some("call_123"),
None, // no name
Some(r#"{"city":"Paris"}"#),
)]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert!(events.is_empty(), "should not dispatch without name");
}
#[test]
fn test_tool_dispatch_skips_incomplete_tool_call_no_arguments() {
let response = make_stream_response(vec![make_choice_with_tool_call(
0,
Some("call_123"),
Some("get_weather"),
None, // no arguments
)]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert!(events.is_empty(), "should not dispatch without arguments");
}
#[test]
fn test_tool_dispatch_multiple_tool_calls() {
let tc1 = ChatCompletionMessageToolCallChunk {
index: 0,
id: Some("call_1".to_string()),
r#type: Some(ChatCompletionToolType::Function),
function: Some(FunctionCallStream {
name: Some("get_weather".to_string()),
arguments: Some(r#"{"city":"Paris"}"#.to_string()),
}),
};
let tc2 = ChatCompletionMessageToolCallChunk {
index: 1,
id: Some("call_2".to_string()),
r#type: Some(ChatCompletionToolType::Function),
function: Some(FunctionCallStream {
name: Some("get_time".to_string()),
arguments: Some(r#"{"tz":"UTC"}"#.to_string()),
}),
};
#[allow(deprecated)]
let choice = ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(vec![tc1, tc2]),
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
};
let response = make_stream_response(vec![choice]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert_eq!(events.len(), 2, "should dispatch both tool calls");
// Verify each dispatched event has the correct tool call data
let json0 = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json0["tool_call"]["id"], "call_1");
assert_eq!(json0["tool_call"]["function"]["name"], "get_weather");
let json1 = extract_sse_data_json(events[1].as_ref().unwrap());
assert_eq!(json1["tool_call"]["id"], "call_2");
assert_eq!(json1["tool_call"]["function"]["name"], "get_time");
}
#[test]
fn test_tool_dispatch_no_data() {
let response: Annotated<NvCreateChatCompletionStreamResponse> = Annotated {
id: Some("test".to_string()),
data: None,
event: None,
comment: None,
error: None,
};
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert!(events.is_empty());
}
#[test]
fn test_tool_dispatch_empty_choices() {
let response = make_stream_response(vec![]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert!(events.is_empty());
}
#[test]
fn test_tool_dispatch_mixed_complete_and_incomplete() {
// One complete tool call and one incomplete (missing arguments = streaming delta).
// Only the complete one should dispatch.
let complete = ChatCompletionMessageToolCallChunk {
index: 0,
id: Some("call_complete".to_string()),
r#type: Some(ChatCompletionToolType::Function),
function: Some(FunctionCallStream {
name: Some("get_weather".to_string()),
arguments: Some(r#"{"city":"Paris"}"#.to_string()),
}),
};
let incomplete = ChatCompletionMessageToolCallChunk {
index: 1,
id: Some("call_partial".to_string()),
r#type: Some(ChatCompletionToolType::Function),
function: Some(FunctionCallStream {
name: Some("search".to_string()),
arguments: None, // still streaming
}),
};
#[allow(deprecated)]
let choice = ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(vec![complete, incomplete]),
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
};
let response = make_stream_response(vec![choice]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert_eq!(
events.len(),
1,
"only the complete tool call should dispatch"
);
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["tool_call"]["id"], "call_complete");
}
#[test]
fn test_tool_dispatch_function_none() {
// Tool call chunk with function: None — should not dispatch and should not panic.
let tool_call = ChatCompletionMessageToolCallChunk {
index: 0,
id: Some("call_999".to_string()),
r#type: Some(ChatCompletionToolType::Function),
function: None,
};
#[allow(deprecated)]
let choice = ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(vec![tool_call]),
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
};
let response = make_stream_response(vec![choice]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert!(events.is_empty(), "function: None should not dispatch");
}
#[test]
fn test_tool_dispatch_empty_arguments_still_dispatches() {
// arguments: Some("") is considered complete — intentional.
// Some backends emit empty-string arguments for parameterless tools.
let response = make_stream_response(vec![make_choice_with_tool_call(
0,
Some("call_empty"),
Some("no_params_tool"),
Some(""),
)]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert_eq!(events.len(), 1, "empty arguments should still dispatch");
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["tool_call"]["id"], "call_empty");
assert_eq!(json["tool_call"]["function"]["name"], "no_params_tool");
assert_eq!(json["tool_call"]["function"]["arguments"], "");
}
#[test]
fn test_tool_dispatch_n_greater_than_1_includes_choice_index() {
// Regression test: with n > 1, each choice should carry its own choice_index
// so clients can disambiguate which choice the tool call belongs to.
let choice_0 = make_choice_with_tool_call(
0,
Some("call_a"),
Some("get_weather"),
Some(r#"{"city":"Paris"}"#),
);
let choice_1 = make_choice_with_tool_call(
1,
Some("call_b"),
Some("get_time"),
Some(r#"{"tz":"UTC"}"#),
);
let response = make_stream_response(vec![choice_0, choice_1]);
let events = streaming_tool_dispatch_events(&response, &mut HashSet::new());
assert_eq!(events.len(), 2, "should dispatch from both choices");
let json0 = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json0["choice_index"], 0);
assert_eq!(json0["tool_call"]["id"], "call_a");
let json1 = extract_sse_data_json(events[1].as_ref().unwrap());
assert_eq!(json1["choice_index"], 1);
assert_eq!(json1["tool_call"]["id"], "call_b");
}
#[test]
fn test_tool_dispatch_dedup_skips_already_dispatched_id() {
// Simulate a backend that sends the same complete tool call in two consecutive chunks.
// The HashSet should prevent the second dispatch.
let response = make_stream_response(vec![make_choice_with_tool_call(
0,
Some("call_dup"),
Some("get_weather"),
Some(r#"{"city":"Paris"}"#),
)]);
let mut dispatched = HashSet::new();
// First call — should dispatch
let events = streaming_tool_dispatch_events(&response, &mut dispatched);
assert_eq!(events.len(), 1);
// Second call with same response — should be deduped
let events = streaming_tool_dispatch_events(&response, &mut dispatched);
assert!(events.is_empty(), "duplicate id should not dispatch twice");
}
// ── accumulate_reasoning_dispatch tests ──
#[test]
fn test_reasoning_dispatch_accumulates_and_emits_once() {
let mut buffers: HashMap<u32, String> = HashMap::new();
// Chunk 1: reasoning token "Let me"
let r1 = make_stream_response(vec![make_choice_with_reasoning(0, Some("Let me"), None)]);
let events = accumulate_reasoning_dispatch(&r1, &mut buffers);
assert!(
events.is_empty(),
"should not emit yet — still accumulating"
);
assert_eq!(buffers.get(&0).map(|s| s.as_str()), Some("Let me"));
// Chunk 2: reasoning token " think"
let r2 = make_stream_response(vec![make_choice_with_reasoning(0, Some(" think"), None)]);
let events = accumulate_reasoning_dispatch(&r2, &mut buffers);
assert!(
events.is_empty(),
"should not emit yet — still accumulating"
);
assert_eq!(buffers.get(&0).map(|s| s.as_str()), Some("Let me think"));
// Chunk 3: reasoning ends (None), meaning normal content follows
let r3 = make_stream_response(vec![make_choice_with_reasoning(0, None, None)]);
let events = accumulate_reasoning_dispatch(&r3, &mut buffers);
assert_eq!(events.len(), 1, "should emit single reasoning_dispatch");
let event = events[0].as_ref().unwrap();
assert_event_type(event, "reasoning_dispatch");
let json = extract_sse_data_json(event);
assert_eq!(json["reasoning_content"], "Let me think");
assert_eq!(json["index"], 0);
// Buffer for choice 0 should be cleared (removed or empty)
assert!(
buffers.get(&0).is_none_or(|s| s.is_empty()),
"buffer should be cleared after emit"
);
}
#[test]
fn test_reasoning_dispatch_flushes_on_finish_reason() {
let mut buffers: HashMap<u32, String> = HashMap::new();
// Chunk 1: reasoning token
let r1 = make_stream_response(vec![make_choice_with_reasoning(
0,
Some("Thinking..."),
None,
)]);
accumulate_reasoning_dispatch(&r1, &mut buffers);
// Chunk 2: finish_reason=length while still in reasoning (max_tokens hit)
let r2 = make_stream_response(vec![make_choice_with_reasoning(
0,
Some(" more"),
Some(FinishReason::Length),
)]);
let events = accumulate_reasoning_dispatch(&r2, &mut buffers);
assert_eq!(events.len(), 1, "should flush on finish_reason");
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["reasoning_content"], "Thinking... more");
}
#[test]
fn test_reasoning_dispatch_flushes_on_stop() {
let mut buffers: HashMap<u32, String> = HashMap::new();
// Chunk 1: reasoning token
let r1 = make_stream_response(vec![make_choice_with_reasoning(
0,
Some("Analysis complete"),
None,
)]);
accumulate_reasoning_dispatch(&r1, &mut buffers);
// Chunk 2: finish_reason=stop while still in reasoning
let r2 = make_stream_response(vec![make_choice_with_reasoning(
0,
Some("."),
Some(FinishReason::Stop),
)]);
let events = accumulate_reasoning_dispatch(&r2, &mut buffers);
assert_eq!(events.len(), 1, "should flush on FinishReason::Stop");
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["reasoning_content"], "Analysis complete.");
}
#[test]
fn test_reasoning_dispatch_no_reasoning_no_event() {
let mut buffers: HashMap<u32, String> = HashMap::new();
// Chunk with no reasoning content at all
let r = make_stream_response(vec![make_choice_with_reasoning(0, None, None)]);
let events = accumulate_reasoning_dispatch(&r, &mut buffers);
assert!(events.is_empty(), "no reasoning content = no event");
}
#[test]
fn test_reasoning_dispatch_empty_string_not_accumulated() {
let mut buffers: HashMap<u32, String> = HashMap::new();
// Chunk with empty string reasoning (treated as no-reasoning)
let r = make_stream_response(vec![make_choice_with_reasoning(0, Some(""), None)]);
let events = accumulate_reasoning_dispatch(&r, &mut buffers);
assert!(events.is_empty());
assert!(
buffers.get(&0).is_none_or(|s| s.is_empty()),
"empty string should not accumulate"
);
}
#[test]
fn test_reasoning_dispatch_no_data() {
let mut buffers: HashMap<u32, String> = HashMap::new();
let response: Annotated<NvCreateChatCompletionStreamResponse> = Annotated {
id: Some("test".to_string()),
data: None,
event: None,
comment: None,
error: None,
};
let events = accumulate_reasoning_dispatch(&response, &mut buffers);
assert!(events.is_empty());
}
#[test]
fn test_reasoning_dispatch_empty_choices() {
let mut buffers: HashMap<u32, String> = HashMap::new();
let response = make_stream_response(vec![]);
let events = accumulate_reasoning_dispatch(&response, &mut buffers);
assert!(events.is_empty());
}
#[test]
fn test_reasoning_dispatch_multi_choice_independent_buffers() {
let mut buffers: HashMap<u32, String> = HashMap::new();
// Both choices emit reasoning in same chunk
let r1 = make_stream_response(vec![
make_choice_with_reasoning(0, Some("Thinking A"), None),
make_choice_with_reasoning(1, Some("Thinking B"), None),
]);
let events = accumulate_reasoning_dispatch(&r1, &mut buffers);
assert!(events.is_empty(), "both still accumulating");
assert_eq!(buffers.get(&0).map(|s| s.as_str()), Some("Thinking A"));
assert_eq!(buffers.get(&1).map(|s| s.as_str()), Some("Thinking B"));
// Choice 0 stops reasoning, choice 1 continues
let r2 = make_stream_response(vec![
make_choice_with_reasoning(0, None, None),
make_choice_with_reasoning(1, Some(" more"), None),
]);
let events = accumulate_reasoning_dispatch(&r2, &mut buffers);
assert_eq!(events.len(), 1, "only choice 0 should emit");
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["reasoning_content"], "Thinking A");
assert_eq!(json["index"], 0);
// Choice 1 stops reasoning
let r3 = make_stream_response(vec![make_choice_with_reasoning(1, None, None)]);
let events = accumulate_reasoning_dispatch(&r3, &mut buffers);
assert_eq!(events.len(), 1, "choice 1 should emit");
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["reasoning_content"], "Thinking B more");
assert_eq!(json["index"], 1);
}
#[test]
fn test_reasoning_dispatch_multiple_blocks() {
// Reasoning -> emit -> more reasoning -> emit again.
// Verifies that after the buffer is cleared, a new reasoning block
// accumulates independently.
let mut buffers: HashMap<u32, String> = HashMap::new();
// First reasoning block
let r1 = make_stream_response(vec![make_choice_with_reasoning(0, Some("First"), None)]);
accumulate_reasoning_dispatch(&r1, &mut buffers);
let r2 = make_stream_response(vec![make_choice_with_reasoning(0, None, None)]);
let events = accumulate_reasoning_dispatch(&r2, &mut buffers);
assert_eq!(events.len(), 1);
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["reasoning_content"], "First");
// Second reasoning block — buffer was cleared, should accumulate fresh
let r3 = make_stream_response(vec![make_choice_with_reasoning(0, Some("Second"), None)]);
accumulate_reasoning_dispatch(&r3, &mut buffers);
let r4 = make_stream_response(vec![make_choice_with_reasoning(0, None, None)]);
let events = accumulate_reasoning_dispatch(&r4, &mut buffers);
assert_eq!(events.len(), 1);
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(
json["reasoning_content"], "Second",
"second emit should only contain second block's content"
);
}
#[test]
fn test_reasoning_dispatch_unicode() {
// Verify that CJK characters and emoji survive the JSON roundtrip.
let mut buffers: HashMap<u32, String> = HashMap::new();
let r1 = make_stream_response(vec![make_choice_with_reasoning(
0,
Some("让我想想 🤔"),
None,
)]);
accumulate_reasoning_dispatch(&r1, &mut buffers);
let r2 = make_stream_response(vec![make_choice_with_reasoning(
0,
Some(" 分析完成 ✅"),
None,
)]);
accumulate_reasoning_dispatch(&r2, &mut buffers);
let r3 = make_stream_response(vec![make_choice_with_reasoning(0, None, None)]);
let events = accumulate_reasoning_dispatch(&r3, &mut buffers);
assert_eq!(events.len(), 1);
let json = extract_sse_data_json(events[0].as_ref().unwrap());
assert_eq!(json["reasoning_content"], "让我想想 🤔 分析完成 ✅");
}
} }
...@@ -154,6 +154,26 @@ impl State { ...@@ -154,6 +154,26 @@ impl State {
pub fn sse_keep_alive(&self) -> Option<Duration> { pub fn sse_keep_alive(&self) -> Option<Duration> {
None None
} }
/// Returns true if streaming tool call dispatch is enabled via
/// [`env_llm::DYN_ENABLE_STREAMING_TOOL_DISPATCH`].
///
/// When enabled, the chat completions streaming path emits `event: tool_call_dispatch`
/// SSE events for each complete tool call, letting clients start processing tool calls
/// before `finish_reason="tool_calls"` arrives.
pub fn streaming_tool_dispatch_enabled(&self) -> bool {
env_is_truthy(env_llm::DYN_ENABLE_STREAMING_TOOL_DISPATCH)
}
/// Returns true if streaming reasoning dispatch is enabled via
/// [`env_llm::DYN_ENABLE_STREAMING_REASONING_DISPATCH`].
///
/// When enabled, the chat completions streaming path accumulates reasoning tokens and
/// emits a single `event: reasoning_dispatch` SSE event with the complete reasoning
/// block once thinking ends (DeepSeek-R1, Qwen3, etc.).
pub fn streaming_reasoning_dispatch_enabled(&self) -> bool {
env_is_truthy(env_llm::DYN_ENABLE_STREAMING_REASONING_DISPATCH)
}
} }
#[derive(Clone)] #[derive(Clone)]
......
...@@ -50,6 +50,9 @@ struct ToolCallState { ...@@ -50,6 +50,9 @@ struct ToolCallState {
accumulated_args: String, accumulated_args: String,
block_index: u32, block_index: u32,
started: bool, started: bool,
/// Set when `content_block_stop` has already been emitted inline
/// (complete tool call detected mid-stream). Prevents duplicate stop in `emit_end_events()`.
stopped: bool,
} }
impl AnthropicStreamConverter { impl AnthropicStreamConverter {
...@@ -261,6 +264,7 @@ impl AnthropicStreamConverter { ...@@ -261,6 +264,7 @@ impl AnthropicStreamConverter {
accumulated_args: String::new(), accumulated_args: String::new(),
block_index, block_index,
started: false, started: false,
stopped: false,
}); });
} }
...@@ -313,6 +317,20 @@ impl AnthropicStreamConverter { ...@@ -313,6 +317,20 @@ impl AnthropicStreamConverter {
}, },
}; };
events.push(make_sse_event("content_block_delta", &block_delta)); events.push(make_sse_event("content_block_delta", &block_delta));
// Emit content_block_stop immediately if the tool call arrived
// complete in a single chunk (id + name + args all present).
// Dynamo backends emit complete tool calls, so this fires on the
// same chunk — no need to wait for finish_reason.
if tc.id.is_some()
&& func.name.is_some()
&& !self.tool_call_states[tc_index].stopped
{
self.tool_call_states[tc_index].stopped = true;
let block_stop =
AnthropicStreamEvent::ContentBlockStop { index: block_index };
events.push(make_sse_event("content_block_stop", &block_stop));
}
} }
} }
} }
...@@ -350,9 +368,9 @@ impl AnthropicStreamConverter { ...@@ -350,9 +368,9 @@ impl AnthropicStreamConverter {
events.push(make_sse_event("content_block_stop", &block_stop)); events.push(make_sse_event("content_block_stop", &block_stop));
} }
// Close tool call blocks // Close tool call blocks (skip any already stopped inline)
for tc in &self.tool_call_states { for tc in &self.tool_call_states {
if tc.started { if tc.started && !tc.stopped {
let block_stop = AnthropicStreamEvent::ContentBlockStop { let block_stop = AnthropicStreamEvent::ContentBlockStop {
index: tc.block_index, index: tc.block_index,
}; };
...@@ -569,6 +587,7 @@ impl AnthropicStreamConverter { ...@@ -569,6 +587,7 @@ impl AnthropicStreamConverter {
accumulated_args: String::new(), accumulated_args: String::new(),
block_index, block_index,
started: false, started: false,
stopped: false,
}); });
} }
if let Some(id) = &tc.id { if let Some(id) = &tc.id {
...@@ -611,6 +630,20 @@ impl AnthropicStreamConverter { ...@@ -611,6 +630,20 @@ impl AnthropicStreamConverter {
}, },
}; };
events.push(make_tagged_event("content_block_delta", &ev)); events.push(make_tagged_event("content_block_delta", &ev));
// Emit content_block_stop immediately if the tool call arrived
// complete in a single chunk (id + name + args all present).
// Dynamo backends emit complete tool calls, so this fires on the
// same chunk — no need to wait for finish_reason.
if tc.id.is_some()
&& func.name.is_some()
&& !self.tool_call_states[tc_index].stopped
{
self.tool_call_states[tc_index].stopped = true;
let ev =
AnthropicStreamEvent::ContentBlockStop { index: block_index };
events.push(make_tagged_event("content_block_stop", &ev));
}
} }
} }
} }
...@@ -647,8 +680,9 @@ impl AnthropicStreamConverter { ...@@ -647,8 +680,9 @@ impl AnthropicStreamConverter {
events.push(make_tagged_event("content_block_stop", &ev)); events.push(make_tagged_event("content_block_stop", &ev));
} }
// Skip already-stopped tool call blocks
for tc in &self.tool_call_states { for tc in &self.tool_call_states {
if tc.started { if tc.started && !tc.stopped {
let ev = AnthropicStreamEvent::ContentBlockStop { let ev = AnthropicStreamEvent::ContentBlockStop {
index: tc.block_index, index: tc.block_index,
}; };
...@@ -788,9 +822,10 @@ mod tests { ...@@ -788,9 +822,10 @@ mod tests {
vec![ vec![
"content_block_stop", "content_block_stop",
"content_block_start", "content_block_start",
"content_block_delta" "content_block_delta",
"content_block_stop",
], ],
"text block must be closed before tool block starts" "text block must be closed before tool block starts; complete tool call stopped inline"
); );
// Verify indices: stop=0 (text), start=1 (tool) // Verify indices: stop=0 (text), start=1 (tool)
...@@ -814,17 +849,13 @@ mod tests { ...@@ -814,17 +849,13 @@ mod tests {
other => panic!("expected ContentBlockStart, got {other:?}"), other => panic!("expected ContentBlockStart, got {other:?}"),
} }
// End events should NOT duplicate the text block stop // End events should NOT duplicate either stop (both already emitted inline)
let end_events = conv.emit_end_events_tagged(); let end_events = conv.emit_end_events_tagged();
assert_eq!( assert_eq!(
event_types(&end_events), event_types(&end_events),
vec!["content_block_stop", "message_delta", "message_stop"], vec!["message_delta", "message_stop"],
"only tool block stop in end events (text already closed)" "no block stops in end events (both text and tool already closed inline)"
); );
match &end_events[0].data {
AnthropicStreamEvent::ContentBlockStop { index } => assert_eq!(*index, 1),
other => panic!("expected tool stop at index 1, got {other:?}"),
}
} }
/// Tool-only response (no preceding text): no spurious stop events. /// Tool-only response (no preceding text): no spurious stop events.
...@@ -840,13 +871,19 @@ mod tests { ...@@ -840,13 +871,19 @@ mod tests {
)); ));
assert_eq!( assert_eq!(
event_types(&tool_events), event_types(&tool_events),
vec!["content_block_start", "content_block_delta"] vec![
"content_block_start",
"content_block_delta",
"content_block_stop"
],
"complete tool call emits stop inline"
); );
let end_events = conv.emit_end_events_tagged(); let end_events = conv.emit_end_events_tagged();
assert_eq!( assert_eq!(
event_types(&end_events), event_types(&end_events),
vec!["content_block_stop", "message_delta", "message_stop"] vec!["message_delta", "message_stop"],
"no block stop in end events (already stopped inline)"
); );
} }
...@@ -937,7 +974,9 @@ mod tests { ...@@ -937,7 +974,9 @@ mod tests {
AnthropicStreamEvent::ContentBlockStart { index: 1, .. } AnthropicStreamEvent::ContentBlockStart { index: 1, .. }
)); ));
// 3. Tool call → text block closes, tool block opens at index 2 // 3. Tool call → text block closes, tool block opens at index 2.
// Because the tool call arrives complete (id + name + args in one
// chunk), inline dispatch also emits content_block_stop immediately.
let ev = conv.process_chunk_tagged(&tool_call_chunk( let ev = conv.process_chunk_tagged(&tool_call_chunk(
0, 0,
Some("call-1"), Some("call-1"),
...@@ -949,7 +988,8 @@ mod tests { ...@@ -949,7 +988,8 @@ mod tests {
vec![ vec![
"content_block_stop", "content_block_stop",
"content_block_start", "content_block_start",
"content_block_delta" "content_block_delta",
"content_block_stop"
] ]
); );
assert!(matches!( assert!(matches!(
...@@ -979,4 +1019,50 @@ mod tests { ...@@ -979,4 +1019,50 @@ mod tests {
] ]
); );
} }
/// Multiple tool calls: each gets inline content_block_stop.
#[test]
fn test_multiple_tool_calls_each_stopped_inline() {
let mut conv = AnthropicStreamConverter::new("test-model".into());
let events1 = conv.process_chunk_tagged(&tool_call_chunk(
0,
Some("call-1"),
Some("Read"),
Some("{\"path\":\"/tmp/a.txt\"}"),
));
assert_eq!(
event_types(&events1),
vec![
"content_block_start",
"content_block_delta",
"content_block_stop"
],
"first tool call closed inline"
);
let events2 = conv.process_chunk_tagged(&tool_call_chunk(
1,
Some("call-2"),
Some("Write"),
Some("{\"path\":\"/tmp/b.txt\"}"),
));
assert_eq!(
event_types(&events2),
vec![
"content_block_start",
"content_block_delta",
"content_block_stop"
],
"second tool call closed inline"
);
// End events: no block stops (both already closed)
let end_events = conv.emit_end_events_tagged();
assert_eq!(
event_types(&end_events),
vec!["message_delta", "message_stop"],
"no block stops in end events"
);
}
} }
...@@ -56,6 +56,9 @@ struct FunctionCallState { ...@@ -56,6 +56,9 @@ struct FunctionCallState {
accumulated_args: String, accumulated_args: String,
output_index: u32, output_index: u32,
started: bool, started: bool,
/// Set when done/item_done events have already been emitted inline
/// (complete tool call detected mid-stream). Prevents duplicate in `emit_end_events()`.
done: bool,
} }
impl ResponseStreamConverter { impl ResponseStreamConverter {
...@@ -284,6 +287,7 @@ impl ResponseStreamConverter { ...@@ -284,6 +287,7 @@ impl ResponseStreamConverter {
accumulated_args: String::new(), accumulated_args: String::new(),
output_index, output_index,
started: false, started: false,
done: false,
}); });
} }
...@@ -323,19 +327,67 @@ impl ResponseStreamConverter { ...@@ -323,19 +327,67 @@ impl ResponseStreamConverter {
self.function_call_items[tc_index] self.function_call_items[tc_index]
.accumulated_args .accumulated_args
.push_str(args); .push_str(args);
let item_id = self.function_call_items[tc_index].item_id.clone();
let output_index = self.function_call_items[tc_index].output_index; let output_index = self.function_call_items[tc_index].output_index;
let is_complete = tc.id.is_some()
&& func.name.is_some()
&& !self.function_call_items[tc_index].done;
// Clone item_id once; reused by both args_delta and (if complete) done events.
let item_id = self.function_call_items[tc_index].item_id.clone();
let seq = self.next_seq(); let seq = self.next_seq();
let args_delta = let args_delta =
ResponseStreamEvent::ResponseFunctionCallArgumentsDelta( ResponseStreamEvent::ResponseFunctionCallArgumentsDelta(
ResponseFunctionCallArgumentsDeltaEvent { ResponseFunctionCallArgumentsDeltaEvent {
sequence_number: seq, sequence_number: seq,
item_id, item_id: item_id.clone(),
output_index, output_index,
delta: args.clone(), delta: args.clone(),
}, },
); );
events.push(make_sse_event(&args_delta)); events.push(make_sse_event(&args_delta));
// Emit done + output_item.done immediately if the tool call
// arrived complete in a single chunk (id + name + args all present).
// Dynamo backends emit complete tool calls, so this fires on the
// same chunk — no need to wait for finish_reason.
if is_complete {
self.function_call_items[tc_index].done = true;
// Reuse item_id from above; capture remaining values before self.next_seq()
let fc_item_id = item_id;
let fc_call_id = self.function_call_items[tc_index].call_id.clone();
let fc_name = self.function_call_items[tc_index].name.clone();
let fc_args =
self.function_call_items[tc_index].accumulated_args.clone();
let fc_output_index =
self.function_call_items[tc_index].output_index;
let args_done =
ResponseStreamEvent::ResponseFunctionCallArgumentsDone(
ResponseFunctionCallArgumentsDoneEvent {
sequence_number: self.next_seq(),
item_id: fc_item_id.clone(),
output_index: fc_output_index,
arguments: fc_args.clone(),
name: Some(fc_name.clone()),
},
);
events.push(make_sse_event(&args_done));
let item_done = ResponseStreamEvent::ResponseOutputItemDone(
ResponseOutputItemDoneEvent {
sequence_number: self.next_seq(),
output_index: fc_output_index,
item: OutputItem::FunctionCall(FunctionToolCall {
id: Some(fc_item_id),
call_id: fc_call_id,
name: fc_name,
arguments: fc_args,
status: Some(OutputStatus::Completed),
}),
},
);
events.push(make_sse_event(&item_done));
}
} }
} }
} }
...@@ -393,11 +445,11 @@ impl ResponseStreamConverter { ...@@ -393,11 +445,11 @@ impl ResponseStreamConverter {
events.push(make_sse_event(&item_done)); events.push(make_sse_event(&item_done));
} }
// Close any function call items - collect data first to avoid borrow conflicts // Close any function call items not already done inline
let fc_data: Vec<_> = self let fc_data: Vec<_> = self
.function_call_items .function_call_items
.iter() .iter()
.filter(|fc| fc.started) .filter(|fc| fc.started && !fc.done)
.map(|fc| { .map(|fc| {
( (
fc.item_id.clone(), fc.item_id.clone(),
...@@ -598,3 +650,262 @@ fn get_event_type(event: &ResponseStreamEvent) -> &'static str { ...@@ -598,3 +650,262 @@ fn get_event_type(event: &ResponseStreamEvent) -> &'static str {
ResponseStreamEvent::ResponseError(_) => "error", ResponseStreamEvent::ResponseError(_) => "error",
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use dynamo_async_openai::types::{
ChatChoiceStream, ChatCompletionMessageContent, ChatCompletionMessageToolCallChunk,
ChatCompletionStreamResponseDelta, ChatCompletionToolType, FunctionCallStream,
};
fn default_params() -> ResponseParams {
ResponseParams {
model: None,
temperature: None,
top_p: None,
max_output_tokens: None,
store: None,
tools: None,
tool_choice: None,
instructions: None,
reasoning: None,
text: None,
service_tier: None,
include: None,
truncation: None,
}
}
fn tool_call_chunk(
tc_index: u32,
id: Option<&str>,
name: Option<&str>,
args: Option<&str>,
) -> NvCreateChatCompletionStreamResponse {
#[allow(deprecated)]
NvCreateChatCompletionStreamResponse {
id: "chat-1".into(),
choices: vec![ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(vec![ChatCompletionMessageToolCallChunk {
index: tc_index,
id: id.map(String::from),
r#type: Some(ChatCompletionToolType::Function),
function: Some(FunctionCallStream {
name: name.map(String::from),
arguments: args.map(String::from),
}),
}]),
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
}],
created: 0,
model: "test".into(),
service_tier: None,
system_fingerprint: None,
object: "chat.completion.chunk".into(),
usage: None,
nvext: None,
}
}
fn text_chunk(text: &str) -> NvCreateChatCompletionStreamResponse {
#[allow(deprecated)]
NvCreateChatCompletionStreamResponse {
id: "chat-1".into(),
choices: vec![ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
content: Some(ChatCompletionMessageContent::Text(text.into())),
function_call: None,
tool_calls: None,
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
}],
created: 0,
model: "test".into(),
service_tier: None,
system_fingerprint: None,
object: "chat.completion.chunk".into(),
usage: None,
nvext: None,
}
}
/// Extract the SSE event type from a Result<Event, _>.
fn event_type(event: &Result<Event, anyhow::Error>) -> String {
let debug = format!("{:?}", event.as_ref().unwrap());
// Event debug format: Event { ... event: "response.xxx" ... }
// Parse the event type from the serialized SSE data
if let Some(start) = debug.find("event: ") {
let rest = &debug[start + 7..];
if let Some(end) = rest.find("\\n") {
return rest[..end].to_string();
}
}
"unknown".to_string()
}
fn event_types(events: &[Result<Event, anyhow::Error>]) -> Vec<String> {
events.iter().map(event_type).collect()
}
/// Complete tool call emits function_call_arguments.done + output_item.done inline.
#[test]
fn test_complete_tool_call_emits_done_inline() {
let mut conv = ResponseStreamConverter::new("test-model".into(), default_params());
let _ = conv.emit_start_events(); // consume start events
let events = conv.process_chunk(&tool_call_chunk(
0,
Some("call-1"),
Some("get_weather"),
Some("{\"city\":\"SF\"}"),
));
let types = event_types(&events);
assert!(
types.contains(&"response.output_item.added".to_string()),
"should emit output_item.added: {types:?}"
);
assert!(
types.contains(&"response.function_call_arguments.delta".to_string()),
"should emit args delta: {types:?}"
);
assert!(
types.contains(&"response.function_call_arguments.done".to_string()),
"should emit args done inline: {types:?}"
);
assert!(
types.contains(&"response.output_item.done".to_string()),
"should emit output_item.done inline: {types:?}"
);
// End events should NOT duplicate the done events
let end_types = event_types(&conv.emit_end_events());
assert!(
!end_types.contains(&"response.function_call_arguments.done".to_string()),
"done should not be duplicated in end events: {end_types:?}"
);
assert!(
!end_types.contains(&"response.output_item.done".to_string())
|| end_types
.iter()
.filter(|t| *t == "response.output_item.done")
.count()
== 0,
"output_item.done for the tool should not appear in end events"
);
}
/// Multiple tool calls each get their own inline done events.
#[test]
fn test_multiple_tool_calls_each_emit_done_inline() {
let mut conv = ResponseStreamConverter::new("test-model".into(), default_params());
let _ = conv.emit_start_events();
let events1 = conv.process_chunk(&tool_call_chunk(
0,
Some("call-1"),
Some("get_weather"),
Some("{\"city\":\"SF\"}"),
));
let types1 = event_types(&events1);
assert!(
types1.contains(&"response.function_call_arguments.done".to_string()),
"first tool call done inline: {types1:?}"
);
let events2 = conv.process_chunk(&tool_call_chunk(
1,
Some("call-2"),
Some("get_time"),
Some("{\"tz\":\"PST\"}"),
));
let types2 = event_types(&events2);
assert!(
types2.contains(&"response.function_call_arguments.done".to_string()),
"second tool call done inline: {types2:?}"
);
// End events should have no function call done events
let end_types = event_types(&conv.emit_end_events());
let fc_done_count = end_types
.iter()
.filter(|t| *t == "response.function_call_arguments.done")
.count();
assert_eq!(
fc_done_count, 0,
"no function_call_arguments.done in end events: {end_types:?}"
);
}
/// Text-only response: no tool-related events at all.
#[test]
fn test_text_only_response_no_tool_events() {
let mut conv = ResponseStreamConverter::new("test-model".into(), default_params());
let _ = conv.emit_start_events();
let events = conv.process_chunk(&text_chunk("Hello world"));
let types = event_types(&events);
assert!(
!types.contains(&"response.function_call_arguments.done".to_string()),
"no tool events in text-only: {types:?}"
);
let end_events = conv.emit_end_events();
let end_types = event_types(&end_events);
assert!(
end_types.contains(&"response.output_text.done".to_string()),
"text done in end events: {end_types:?}"
);
assert!(
end_types.contains(&"response.completed".to_string()),
"completed in end events: {end_types:?}"
);
}
/// Text followed by tool call: both handled correctly.
#[test]
fn test_text_then_tool_call() {
let mut conv = ResponseStreamConverter::new("test-model".into(), default_params());
let _ = conv.emit_start_events();
let text_events = conv.process_chunk(&text_chunk("Let me check that."));
let text_types = event_types(&text_events);
assert!(
text_types.contains(&"response.output_item.added".to_string()),
"text message started: {text_types:?}"
);
let tool_events = conv.process_chunk(&tool_call_chunk(
0,
Some("call-1"),
Some("search"),
Some("{\"q\":\"rust\"}"),
));
let tool_types = event_types(&tool_events);
assert!(
tool_types.contains(&"response.function_call_arguments.done".to_string()),
"tool call done inline after text: {tool_types:?}"
);
assert!(
tool_types.contains(&"response.output_item.done".to_string()),
"output_item.done inline after text: {tool_types:?}"
);
}
}
...@@ -285,6 +285,13 @@ pub mod llm { ...@@ -285,6 +285,13 @@ pub mod llm {
/// varies per session and per release, wasting tokens and breaking prompt caching. /// varies per session and per release, wasting tokens and breaking prompt caching.
pub const DYN_STRIP_ANTHROPIC_PREAMBLE: &str = "DYN_STRIP_ANTHROPIC_PREAMBLE"; pub const DYN_STRIP_ANTHROPIC_PREAMBLE: &str = "DYN_STRIP_ANTHROPIC_PREAMBLE";
/// Enable streaming tool call dispatch (`event: tool_call_dispatch` SSE events)
pub const DYN_ENABLE_STREAMING_TOOL_DISPATCH: &str = "DYN_ENABLE_STREAMING_TOOL_DISPATCH";
/// Enable streaming reasoning dispatch (`event: reasoning_dispatch` SSE events)
pub const DYN_ENABLE_STREAMING_REASONING_DISPATCH: &str =
"DYN_ENABLE_STREAMING_REASONING_DISPATCH";
/// Metrics configuration /// Metrics configuration
pub mod metrics { pub mod metrics {
/// Custom metrics prefix (overrides default "dynamo_frontend") /// Custom metrics prefix (overrides default "dynamo_frontend")
...@@ -464,6 +471,8 @@ mod tests { ...@@ -464,6 +471,8 @@ mod tests {
llm::DYN_LORA_PATH, llm::DYN_LORA_PATH,
llm::DYN_ENABLE_ANTHROPIC_API, llm::DYN_ENABLE_ANTHROPIC_API,
llm::DYN_STRIP_ANTHROPIC_PREAMBLE, llm::DYN_STRIP_ANTHROPIC_PREAMBLE,
llm::DYN_ENABLE_STREAMING_TOOL_DISPATCH,
llm::DYN_ENABLE_STREAMING_REASONING_DISPATCH,
llm::metrics::DYN_METRICS_PREFIX, llm::metrics::DYN_METRICS_PREFIX,
// Model // Model
model::model_express::MODEL_EXPRESS_URL, model::model_express::MODEL_EXPRESS_URL,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment