Unverified Commit 07c9d8fb authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] add llama3.2 multi json streaming parser (#9735)

parent 4a4772ae
......@@ -356,6 +356,81 @@ impl ToolParser for JsonParser {
return Ok(StreamResult::Incomplete);
}
// Extract JSON content first to check for separators
let extracted_json = self.extract_json_content(&state.buffer);
// Handle multiple JSON objects with separators
// Check if we have a separator and potentially multiple JSON objects
let separator = &self.token_config.separator;
if !separator.is_empty() && extracted_json.contains(separator.as_str()) {
// Try to find a complete JSON object before the separator
if let Some(separator_pos) = extracted_json.find(separator.as_str()) {
// Get JSON before separator
let before_separator = &extracted_json[..separator_pos];
// Try to parse the JSON before the separator
match serde_json::from_str::<Value>(before_separator) {
Ok(value) => {
// Parse tool calls from this JSON
let tools = self.parse_json_value(&value)?;
if !tools.is_empty() {
// We need to figure out how much to remove from the original buffer
// Find where the separator is in the original buffer and remove up to and including it
if let Some(sep_in_original) = state.buffer.find(separator.as_str()) {
let remaining =
state.buffer[sep_in_original + separator.len()..].to_string();
state.buffer = remaining;
}
// Return the first tool as complete
if let Some(tool) = tools.into_iter().next() {
return Ok(StreamResult::ToolComplete(tool));
}
}
}
Err(_) => {
// Failed to parse, continue to try other methods
}
}
}
}
// Handle multiple start tokens (e.g., multiple <|python_tag|> markers)
if !self.token_config.start_tokens.is_empty() {
let start_token = &self.token_config.start_tokens[0];
if !start_token.is_empty() {
// Find all occurrences of start token
let occurrences: Vec<_> =
state.buffer.match_indices(start_token.as_str()).collect();
if occurrences.len() > 1 {
// We have multiple start tokens, try to process the first complete one
let first_pos = occurrences[0].0;
let second_pos = occurrences[1].0;
// Extract content between first and second start token
let first_json_section = &state.buffer[first_pos..second_pos];
let json_content = self.extract_json_content(first_json_section);
// Try to parse this as complete JSON
if let Ok(value) = serde_json::from_str::<Value>(json_content) {
// Parse tool calls from this JSON
let tools = self.parse_json_value(&value)?;
if !tools.is_empty() {
// Remove the processed section from buffer
let remaining = state.buffer[second_pos..].to_string();
state.buffer = remaining;
// Return the first tool as complete
if let Some(tool) = tools.into_iter().next() {
return Ok(StreamResult::ToolComplete(tool));
}
}
}
}
}
}
// Regular single JSON parsing
// Extract JSON content
let json_content = self.extract_json_content(&state.buffer);
......@@ -364,16 +439,23 @@ impl ToolParser for JsonParser {
Ok((value, consumed)) => {
// Check if we have a complete JSON structure
if consumed == json_content.len() {
// Complete JSON, parse tool calls
let tools = self.parse_json_value(&value)?;
if !tools.is_empty() {
// Clear buffer since we consumed everything
state.buffer.clear();
// Return the first tool as complete
// TODO simplified version, address more complex version
if let Some(tool) = tools.into_iter().next() {
return Ok(StreamResult::ToolComplete(tool));
// Check if this is truly complete or just has null from incomplete parsing
// We need to ensure the JSON actually ends properly (not cut off mid-key)
let trimmed = json_content.trim();
let looks_complete = trimmed.ends_with('}') || trimmed.ends_with(']');
if looks_complete {
// Complete JSON, parse tool calls
let tools = self.parse_json_value(&value)?;
if !tools.is_empty() {
// Clear buffer since we consumed everything
state.buffer.clear();
// Return the first tool as complete
// TODO simplified version, address more complex version
if let Some(tool) = tools.into_iter().next() {
return Ok(StreamResult::ToolComplete(tool));
}
}
}
} else {
......
......@@ -341,14 +341,84 @@ async fn test_llama_streaming_multiple_tools() {
let result = parser.parse_incremental(text, &mut state).await.unwrap();
// Current implementation may handle this differently
// Should get first tool complete
match result {
sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => {
// At minimum should get first tool
assert_eq!(tool.function.name, "func1");
}
_ => panic!("Expected first tool to be complete"),
}
// Process remaining buffer to get second tool
let result2 = parser.parse_incremental("", &mut state).await.unwrap();
match result2 {
sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => {
assert_eq!(tool.function.name, "func2");
}
_ => panic!("Expected second tool to be complete"),
}
}
#[tokio::test]
async fn test_llama_streaming_multiple_tools_chunked() {
// Test streaming multiple tool calls arriving in chunks
let parser = LlamaParser::new();
let mut state = sglang_router_rs::tool_parser::ParseState::new();
// First chunk - incomplete first JSON
let chunk1 = r#"<|python_tag|>{"name": "get_weather", "arguments""#;
let result1 = parser.parse_incremental(chunk1, &mut state).await.unwrap();
// Should be incomplete or have tool name
match result1 {
sglang_router_rs::tool_parser::StreamResult::Incomplete
| sglang_router_rs::tool_parser::StreamResult::ToolName { .. }
| sglang_router_rs::tool_parser::StreamResult::ToolArguments { .. } => {
// Expected - could get tool name or be incomplete or even partial args
}
_ => panic!(
"Expected incomplete or tool name for partial JSON, got: {:?}",
result1
),
}
// Second chunk - complete first JSON and separator
let chunk2 = r#": {"city": "Paris"}};{"name": "#;
let result2 = parser.parse_incremental(chunk2, &mut state).await.unwrap();
// Should get first tool complete
match result2 {
sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => {
assert_eq!(tool.function.name, "get_weather");
let args: serde_json::Value = serde_json::from_str(&tool.function.arguments).unwrap();
assert_eq!(args["city"], "Paris");
}
_ => panic!("Expected first tool to be complete after separator"),
}
// Third chunk - complete second JSON
let chunk3 = r#""get_time", "arguments": {"timezone": "UTC"}}"#;
let result3 = parser.parse_incremental(chunk3, &mut state).await.unwrap();
// Should get second tool complete
match result3 {
sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => {
assert_eq!(tool.function.name, "get_time");
let args: serde_json::Value = serde_json::from_str(&tool.function.arguments).unwrap();
assert_eq!(args["timezone"], "UTC");
}
_ => {
// Also acceptable if waiting for more
// If not complete yet, try one more empty chunk
let result4 = parser.parse_incremental("", &mut state).await.unwrap();
match result4 {
sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => {
assert_eq!(tool.function.name, "get_time");
let args: serde_json::Value =
serde_json::from_str(&tool.function.arguments).unwrap();
assert_eq!(args["timezone"], "UTC");
}
_ => panic!("Expected second tool to be complete"),
}
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment