qwen3coder_tool_parser.py 30.2 KB
Newer Older
1
2
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
import ast
4
5
6
import json
import uuid
from collections.abc import Sequence
7
from typing import Any
8
9
10

import regex as re

11
from vllm.entrypoints.openai.chat_completion.protocol import (
12
    ChatCompletionRequest,
13
14
)
from vllm.entrypoints.openai.engine.protocol import (
15
16
17
18
19
20
21
    DeltaFunctionCall,
    DeltaMessage,
    DeltaToolCall,
    ExtractedToolCallInformation,
    FunctionCall,
    ToolCall,
)
22
from vllm.logger import init_logger
23
from vllm.tokenizers import TokenizerLike
24
from vllm.tool_parsers.abstract_tool_parser import (
25
    Tool,
26
27
    ToolParser,
)
28
29
30
31
32

logger = init_logger(__name__)


class Qwen3CoderToolParser(ToolParser):
33
34
    def __init__(self, tokenizer: TokenizerLike, tools: list[Tool] | None = None):
        """Set up sentinel tokens, regex patterns and streaming state.

        Raises:
            ValueError: if no model tokenizer is available.
            RuntimeError: if the tokenizer vocabulary lacks the
                ``<tool_call>`` or ``</tool_call>`` token.
        """
        super().__init__(tokenizer, tools)

        # Bookkeeping lists shared with the streaming path.
        self.current_tool_name_sent: bool = False
        self.prev_tool_call_arr: list[dict] = []
        self.streamed_args_for_tool: list[str] = []
        # Override base class type - we use string IDs for tool calls
        self.current_tool_id: str | None = None  # type: ignore

        # Sentinel tokens of the XML-like tool call format.
        self.tool_call_start_token: str = "<tool_call>"
        self.tool_call_end_token: str = "</tool_call>"
        self.tool_call_prefix: str = "<function="
        self.function_end_token: str = "</function>"
        self.parameter_prefix: str = "<parameter="
        self.parameter_end_token: str = "</parameter>"
        self.is_tool_call_started: bool = False
        self.failed_count: int = 0

        # Per-message streaming fields all live in one place.
        self._reset_streaming_state()

        # Patterns: each "open" alternative also accepts a tag that was
        # never closed, so truncated output can still be matched.
        dotall = re.DOTALL
        self.tool_call_complete_regex = re.compile(
            r"<tool_call>(.*?)</tool_call>", dotall
        )
        self.tool_call_regex = re.compile(
            r"<tool_call>(.*?)</tool_call>|<tool_call>(.*?)$", dotall
        )
        self.tool_call_function_regex = re.compile(
            r"<function=(.*?)</function>|<function=(.*)$", dotall
        )
        self.tool_call_parameter_regex = re.compile(
            r"<parameter=(.*?)(?:</parameter>|(?=<parameter=)|(?=</function>)|$)",
            dotall,
        )

        if not self.model_tokenizer:
            raise ValueError(
                "The model tokenizer must be passed to the ToolParser "
                "constructor during construction."
            )

        # Token ids of the sentinels are needed to spot tool calls in
        # streamed deltas whose decoded text is empty.
        start_id = self.vocab.get(self.tool_call_start_token)
        end_id = self.vocab.get(self.tool_call_end_token)
        self.tool_call_start_token_id = start_id
        self.tool_call_end_token_id = end_id

        if start_id is None or end_id is None:
            raise RuntimeError(
                "Qwen3 XML Tool parser could not locate tool call start/end "
                "tokens in the tokenizer!"
            )

        logger.debug(
            "vLLM Successfully import tool parser %s !", self.__class__.__name__
        )
88
89
90
91
92
93
94
95
96
97

    def _generate_tool_call_id(self) -> str:
        """Generate a unique tool call ID."""
        return f"call_{uuid.uuid4().hex[:24]}"

    def _reset_streaming_state(self):
        """Reset all streaming state."""
        self.current_tool_index = 0
        self.is_tool_call_started = False
        self.header_sent = False
98
        self.current_tool_id = None
99
100
101
102
103
104
105
106
107
        self.current_function_name = None
        self.current_param_name = None
        self.current_param_value = ""
        self.param_count = 0
        self.in_param = False
        self.in_function = False
        self.accumulated_text = ""
        self.json_started = False
        self.json_closed = False
108
109
110
111
        # Store accumulated parameters for type conversion
        self.accumulated_params = {}
        self.streaming_request = None

112
    def _get_arguments_config(self, func_name: str, tools: list[Tool] | None) -> dict:
113
114
        """Extract argument configuration for a function."""
        if tools is None:
115
            return {}
116
        for config in tools:
117
118
119
            if not hasattr(config, "type") or not (
                hasattr(config, "function") and hasattr(config.function, "name")
            ):
120
121
122
123
124
125
126
127
128
129
130
                continue
            if config.type == "function" and config.function.name == func_name:
                if not hasattr(config.function, "parameters"):
                    return {}
                params = config.function.parameters
                if isinstance(params, dict) and "properties" in params:
                    return params["properties"]
                elif isinstance(params, dict):
                    return params
                else:
                    return {}
131
        logger.debug("Tool '%s' is not defined in the tools list.", func_name)
132
133
        return {}

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
    @staticmethod
    def _first_non_null_type(type_value: Any) -> str | None:
        """Extract the first non-null type from a type value.

        Handles both scalar types ("integer") and type-as-array
        (["integer", "null"]) per JSON Schema spec.
        """
        if isinstance(type_value, list):
            return next(
                (
                    str(t).strip().lower()
                    for t in type_value
                    if t is not None and str(t).lower() != "null"
                ),
                None,
            )
        if type_value is not None and str(type_value).lower() != "null":
            return str(type_value).strip().lower()
        return None

    def _resolve_param_type(self, param_def: dict) -> str:
        """Resolve the effective type string from a parameter definition.

        Handles direct "type" fields (including type-as-array),
        anyOf/oneOf schemas emitted by Pydantic v2 for Optional[T],
        and $ref schemas from Pydantic model inputs.
        """
        if "type" in param_def:
            resolved = self._first_non_null_type(param_def["type"])
            return resolved or "string"

        if "anyOf" in param_def or "oneOf" in param_def:
            variants = param_def.get("anyOf") or param_def.get("oneOf", [])
            for v in variants:
                if not isinstance(v, dict):
                    continue
                resolved = self._first_non_null_type(v.get("type"))
                if resolved:
                    return resolved

        # $ref points to a schema definition (e.g. a Pydantic model).
        # The referenced type is almost always an object, so treat it
        # as such to route through json.loads.
        if "$ref" in param_def:
            return "object"

        return "string"

182
183
184
    def _convert_param_value(
        self, param_value: str, param_name: str, param_config: dict, func_name: str
    ) -> Any:
185
186
187
        """Convert parameter value based on its type in the schema."""
        if param_value.lower() == "null":
            return None
188

189
190
        if param_name not in param_config:
            if param_config != {}:
191
                logger.debug(
192
193
                    "Parsed parameter '%s' is not defined in the tool "
                    "parameters for tool '%s', directly returning the "
194
195
196
197
                    "string value.",
                    param_name,
                    func_name,
                )
198
199
            return param_value

200
201
202
203
        if not isinstance(param_config[param_name], dict):
            return param_value

        param_type = self._resolve_param_type(param_config[param_name])
204
205
        if param_type in ["string", "str", "text", "varchar", "char", "enum"]:
            return param_value
206
207
208
209
210
211
212
        elif (
            param_type.startswith("int")
            or param_type.startswith("uint")
            or param_type.startswith("long")
            or param_type.startswith("short")
            or param_type.startswith("unsigned")
        ):
213
214
215
            try:
                return int(param_value)
            except (ValueError, TypeError):
216
                logger.debug(
217
218
                    "Parsed value '%s' of parameter '%s' is not an "
                    "integer in tool '%s', degenerating to string.",
219
220
221
222
                    param_value,
                    param_name,
                    func_name,
                )
223
                return param_value
224
225
226
        elif param_type.startswith("num") or param_type.startswith("float"):
            try:
                float_param_value = float(param_value)
227
228
229
230
231
                return (
                    float_param_value
                    if float_param_value - int(float_param_value) != 0
                    else int(float_param_value)
                )
232
            except (ValueError, TypeError):
233
                logger.debug(
234
                    "Parsed value '%s' of parameter '%s' is not a float "
235
236
237
238
239
                    "in tool '%s', degenerating to string.",
                    param_value,
                    param_name,
                    func_name,
                )
240
                return param_value
241
242
243
        elif param_type in ["boolean", "bool", "binary"]:
            param_value = param_value.lower()
            if param_value not in ["true", "false"]:
244
                logger.debug(
245
246
                    "Parsed value '%s' of parameter '%s' is not a boolean "
                    "(`true` or `false`) in tool '%s', degenerating to "
247
248
249
250
251
                    "false.",
                    param_value,
                    param_name,
                    func_name,
                )
252
253
            return param_value == "true"
        else:
254
255
256
257
258
            if (
                param_type in ["object", "array", "arr"]
                or param_type.startswith("dict")
                or param_type.startswith("list")
            ):
259
                try:
260
261
262
                    param_value = json.loads(param_value)
                    return param_value
                except (json.JSONDecodeError, TypeError, ValueError):
263
                    logger.debug(
264
265
                        "Parsed value '%s' of parameter '%s' cannot be "
                        "parsed with json.loads in tool '%s', will try "
266
267
268
269
270
                        "other methods to parse it.",
                        param_value,
                        param_name,
                        func_name,
                    )
271
272
273
            try:
                param_value = ast.literal_eval(param_value)  # safer
            except (ValueError, SyntaxError, TypeError):
274
                logger.debug(
275
276
                    "Parsed value '%s' of parameter '%s' cannot be "
                    "converted via Python `ast.literal_eval()` in tool "
277
278
279
280
281
                    "'%s', degenerating to string.",
                    param_value,
                    param_name,
                    func_name,
                )
282
283
284
            return param_value

    def _parse_xml_function_call(
285
        self, function_call_str: str, tools: list[Tool] | None
286
    ) -> ToolCall | None:
287
        # Extract function name
288
289
290
291
        end_index = function_call_str.find(">")
        # If there's no ">" character, this is not a valid xml function call
        if end_index == -1:
            return None
292
        function_name = function_call_str[:end_index]
293
        param_config = self._get_arguments_config(function_name, tools)
294
        parameters = function_call_str[end_index + 1 :]
295
        param_dict = {}
296
        for match_text in self.tool_call_parameter_regex.findall(parameters):
297
298
            idx = match_text.index(">")
            param_name = match_text[:idx]
299
            param_value = str(match_text[idx + 1 :])
300
301
302
303
304
305
            # Remove prefix and trailing \n
            if param_value.startswith("\n"):
                param_value = param_value[1:]
            if param_value.endswith("\n"):
                param_value = param_value[:-1]

306
            param_dict[param_name] = self._convert_param_value(
307
308
                param_value, param_name, param_config, function_name
            )
309
310
        return ToolCall(
            type="function",
311
312
313
            function=FunctionCall(
                name=function_name, arguments=json.dumps(param_dict, ensure_ascii=False)
            ),
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
        )

    def _get_function_calls(self, model_output: str) -> list[str]:
        # Find all tool calls
        matched_ranges = self.tool_call_regex.findall(model_output)
        raw_tool_calls = [
            match[0] if match[0] else match[1] for match in matched_ranges
        ]

        # Back-off strategy if no tool_call tags found
        if len(raw_tool_calls) == 0:
            raw_tool_calls = [model_output]

        raw_function_calls = []
        for tool_call in raw_tool_calls:
329
            raw_function_calls.extend(self.tool_call_function_regex.findall(tool_call))
330
331
332
333
334
335
336
337
338
339
340
341
342

        function_calls = [
            match[0] if match[0] else match[1] for match in raw_function_calls
        ]
        return function_calls

    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        """Extract every tool call from a complete (non-streamed) output.

        Returns the parsed tool calls together with the text preceding the
        first ``<tool_call>`` (or ``<function=`` when no such tag exists).
        When no call is found, or parsing raises, the whole output is
        handed back as plain content. ``prev_tool_call_arr`` is rebuilt
        from the parsed calls as a side effect.
        """

        def passthrough():
            # The output carries no usable tool call: return it untouched.
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )

        # Quick check to avoid unnecessary processing
        if self.tool_call_prefix not in model_output:
            return passthrough()

        try:
            raw_calls = self._get_function_calls(model_output)
            if not raw_calls:
                return passthrough()

            parsed = [
                self._parse_xml_function_call(raw, self.tools) for raw in raw_calls
            ]

            # Populate prev_tool_call_arr for serving layer to set finish_reason
            self.prev_tool_call_arr.clear()
            self.prev_tool_call_arr.extend(
                {
                    "name": call.function.name,
                    "arguments": call.function.arguments,
                }
                for call in parsed
                if call
            )

            # Content is whatever precedes the first tool call marker.
            cut = model_output.find(self.tool_call_start_token)
            if cut < 0:
                cut = model_output.find(self.tool_call_prefix)
            leading = model_output[:cut]

            usable = [call for call in parsed if call is not None]
            return ExtractedToolCallInformation(
                tools_called=bool(usable),
                tool_calls=usable,
                content=leading or None,
            )

        except Exception:
            logger.exception("Error in extracting tool call from response.")
            return passthrough()
386
387
388
389
390
391
392
393
394
395

    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """Translate one streamed delta into an OpenAI-style delta message.

        The method is a small state machine driven by ``current_text``: it
        locates the tool call selected by ``current_tool_index`` and emits,
        in order, the function header, an opening "{", the JSON fragments
        of every parameter that is complete so far, and a closing "}".

        Args:
            previous_text: Text generated before this delta; when empty the
                streaming state is reset.
            current_text: All text generated so far, including the delta.
            delta_text: Newly generated text.
            previous_token_ids: Unused.
            current_token_ids: Unused.
            delta_token_ids: Token ids of the delta, used to detect the
                tool call start/end tokens.
            request: Stored on the parser at the start of a stream.

        Returns:
            A ``DeltaMessage`` carrying plain content or a tool call
            fragment, or ``None`` when nothing can be emitted yet.
        """
        # Store request for type conversion
        if not previous_text:
            self._reset_streaming_state()
            self.streaming_request = request

        # If no delta text, return None unless it's an EOS token after tools
        if not delta_text:
            # Check for tool calls in text even if is_tool_call_started
            # is False (might have been reset after processing all tools)
            if delta_token_ids and self.tool_call_end_token_id not in delta_token_ids:
                # Count complete tool calls
                complete_calls = len(
                    self.tool_call_complete_regex.findall(current_text)
                )

                # If we have completed tool calls and populated
                # prev_tool_call_arr
                if complete_calls > 0 and len(self.prev_tool_call_arr) > 0:
                    # Check if all tool calls are closed
                    open_calls = current_text.count(
                        self.tool_call_start_token
                    ) - current_text.count(self.tool_call_end_token)
                    if open_calls == 0:
                        # Return empty delta for finish_reason processing
                        return DeltaMessage(content="")
                elif not self.is_tool_call_started and current_text:
                    # This is a regular content response that's now complete
                    return DeltaMessage(content="")
            return None

        # Update accumulated text
        self.accumulated_text = current_text

        # Check if we need to advance to next tool
        if self.json_closed and not self.in_function:
            # Check if this tool call has ended
            tool_ends = current_text.count(self.tool_call_end_token)
            if tool_ends > self.current_tool_index:
                # This tool has ended, advance to next
                self.current_tool_index += 1
                self.header_sent = False
                self.param_count = 0
                self.json_started = False
                self.json_closed = False
                self.accumulated_params = {}

                # Check if there are more tool calls
                tool_starts = current_text.count(self.tool_call_start_token)
                if self.current_tool_index >= tool_starts:
                    # No more tool calls
                    self.is_tool_call_started = False
                # Continue processing next tool
                return None

        # Handle normal content before tool calls
        if not self.is_tool_call_started:
            # Check if tool call is starting
            if (
                self.tool_call_start_token_id in delta_token_ids
                or self.tool_call_start_token in delta_text
            ):
                self.is_tool_call_started = True
                # Return any content before the tool call
                if self.tool_call_start_token in delta_text:
                    content_before = delta_text[
                        : delta_text.index(self.tool_call_start_token)
                    ]
                    if content_before:
                        return DeltaMessage(content=content_before)
                return None
            else:
                # Check if we're between tool calls - skip whitespace
                if (
                    current_text.rstrip().endswith(self.tool_call_end_token)
                    and delta_text.strip() == ""
                ):
                    # We just ended a tool call, skip whitespace
                    return None
                # Normal content, no tool call
                return DeltaMessage(content=delta_text)

        # Check if we're between tool calls (waiting for next one)
        # Count tool calls we've seen vs processed
        tool_starts_count = current_text.count(self.tool_call_start_token)
        if self.current_tool_index >= tool_starts_count:
            # We're past all tool calls, shouldn't be here
            return None

        # We're in a tool call, find the current tool call portion
        # Need to find the correct tool call based on current_tool_index
        tool_start_positions: list[int] = []
        idx = 0
        while True:
            idx = current_text.find(self.tool_call_start_token, idx)
            if idx == -1:
                break
            tool_start_positions.append(idx)
            idx += len(self.tool_call_start_token)

        if self.current_tool_index >= len(tool_start_positions):
            # No more tool calls to process yet
            return None

        tool_start_idx = tool_start_positions[self.current_tool_index]
        # Find where this tool call ends (or current position if not ended yet)
        tool_end_idx = current_text.find(self.tool_call_end_token, tool_start_idx)
        if tool_end_idx == -1:
            tool_text = current_text[tool_start_idx:]
        else:
            tool_text = current_text[
                tool_start_idx : tool_end_idx + len(self.tool_call_end_token)
            ]

        # Looking for function header
        if not self.header_sent:
            if self.tool_call_prefix in tool_text:
                func_start = tool_text.find(self.tool_call_prefix) + len(
                    self.tool_call_prefix
                )
                func_end = tool_text.find(">", func_start)

                if func_end != -1:
                    # Found complete function name
                    self.current_function_name = tool_text[func_start:func_end]
                    self.current_tool_id = self._generate_tool_call_id()
                    self.header_sent = True
                    self.in_function = True

                    # Always append — each tool call is a separate
                    # invocation even if the function name is the same
                    # (e.g. two consecutive "read" calls).
                    self.prev_tool_call_arr.append(
                        {
                            "name": self.current_function_name,
                            "arguments": "{}",
                        }
                    )

                    # Initialize streamed args tracking for this tool.
                    # The serving layer reads streamed_args_for_tool to
                    # compute remaining arguments at stream end. Without
                    # this, IndexError occurs when the serving layer
                    # accesses streamed_args_for_tool[index].
                    self.streamed_args_for_tool.append("")

                    # Send header with function info
                    return DeltaMessage(
                        tool_calls=[
                            DeltaToolCall(
                                index=self.current_tool_index,
                                id=self.current_tool_id,
                                function=DeltaFunctionCall(
                                    name=self.current_function_name, arguments=""
                                ),
                                type="function",
                            )
                        ]
                    )
            return None

        # We've sent header, now handle function body
        if self.in_function:
            # Always send opening brace first, regardless of whether
            # parameter_prefix is in the current delta. With speculative
            # decoding, a single delta may contain both the opening brace
            # and parameter data; skipping "{" here would desync
            # json_started from what was actually streamed.
            if not self.json_started:
                self.json_started = True
                self._append_streamed_args("{")
                return DeltaMessage(
                    tool_calls=[
                        DeltaToolCall(
                            index=self.current_tool_index,
                            function=DeltaFunctionCall(arguments="{"),
                        )
                    ]
                )

            # Find all parameter start positions in current tool_text
            param_starts = []
            search_idx = 0
            while True:
                search_idx = tool_text.find(self.parameter_prefix, search_idx)
                if search_idx == -1:
                    break
                param_starts.append(search_idx)
                search_idx += len(self.parameter_prefix)

            # Process ALL complete params in a loop (spec decode fix).
            # With speculative decoding a single delta can deliver
            # multiple complete parameters at once, so every complete
            # parameter is emitted before control is yielded; the loop
            # stops at the first parameter that is still incomplete.
            json_fragments = []
            while not self.in_param and self.param_count < len(param_starts):
                param_idx = param_starts[self.param_count]
                param_start = param_idx + len(self.parameter_prefix)
                remaining = tool_text[param_start:]

                name_end = remaining.find(">")
                if name_end == -1:
                    # Parameter name not terminated yet.
                    break
                current_param_name = remaining[:name_end]

                value_start = param_start + name_end + 1
                value_text = tool_text[value_start:]
                if value_text.startswith("\n"):
                    value_text = value_text[1:]

                param_end_idx = value_text.find(self.parameter_end_token)
                if param_end_idx == -1:
                    next_param_idx = value_text.find(self.parameter_prefix)
                    func_end_idx = value_text.find(self.function_end_token)

                    if next_param_idx != -1 and (
                        func_end_idx == -1 or next_param_idx < func_end_idx
                    ):
                        param_end_idx = next_param_idx
                    elif func_end_idx != -1:
                        param_end_idx = func_end_idx
                    else:
                        # Fallback for malformed XML where </function>
                        # is missing. Use </tool_call> as a delimiter
                        # if present in the value so we don't include
                        # the closing tag as part of the param value.
                        tool_end_in_value = value_text.find(self.tool_call_end_token)
                        if tool_end_in_value != -1:
                            param_end_idx = tool_end_in_value
                        else:
                            # Parameter incomplete — break so we still
                            # emit any fragments accumulated by earlier
                            # loop iterations.
                            break

                param_value = value_text[:param_end_idx]
                if param_value.endswith("\n"):
                    param_value = param_value[:-1]

                self.current_param_name = current_param_name
                self.accumulated_params[current_param_name] = param_value

                param_config = self._get_arguments_config(
                    self.current_function_name or "",
                    self.tools,
                )
                converted_value = self._convert_param_value(
                    param_value,
                    current_param_name,
                    param_config,
                    self.current_function_name or "",
                )

                # Serialize the key with json.dumps as well, so a name
                # containing quotes or backslashes still yields valid JSON
                # (matching what json.dumps produces for the whole dict in
                # the non-streaming path).
                serialized_name = json.dumps(current_param_name, ensure_ascii=False)
                serialized_value = json.dumps(converted_value, ensure_ascii=False)
                separator = "" if self.param_count == 0 else ", "
                json_fragments.append(
                    f"{separator}{serialized_name}: {serialized_value}"
                )
                self.param_count += 1

            if json_fragments:
                combined = "".join(json_fragments)
                self._append_streamed_args(combined)
                return DeltaMessage(
                    tool_calls=[
                        DeltaToolCall(
                            index=self.current_tool_index,
                            function=DeltaFunctionCall(arguments=combined),
                        )
                    ]
                )

            # Check for function end AFTER processing parameters.
            # This ordering is critical: with speculative decoding a
            # burst can deliver the final parameter value together with
            # </function>. If the close check ran first it would emit
            # "}" and set in_function=False before the parameter loop
            # ever ran, causing the parameter to be silently dropped.
            if not self.json_closed and self.function_end_token in tool_text:
                self.json_closed = True

                func_start = tool_text.find(self.tool_call_prefix) + len(
                    self.tool_call_prefix
                )
                func_content_end = tool_text.find(self.function_end_token, func_start)
                if func_content_end != -1:
                    func_content = tool_text[func_start:func_content_end]
                    try:
                        # Re-parse the finished call so prev_tool_call_arr
                        # holds the complete argument string.
                        parsed_tool = self._parse_xml_function_call(
                            func_content,
                            self.tools,
                        )
                        if parsed_tool and self.current_tool_index < len(
                            self.prev_tool_call_arr
                        ):
                            self.prev_tool_call_arr[self.current_tool_index][
                                "arguments"
                            ] = parsed_tool.function.arguments
                    except Exception:
                        logger.debug(
                            "Failed to parse tool call during streaming: %s",
                            tool_text,
                            exc_info=True,
                        )

                self._append_streamed_args("}")
                result = DeltaMessage(
                    tool_calls=[
                        DeltaToolCall(
                            index=self.current_tool_index,
                            function=DeltaFunctionCall(arguments="}"),
                        )
                    ]
                )

                self.in_function = False
                self.accumulated_params = {}

                return result

        return None

    def _append_streamed_args(self, fragment: str) -> None:
        """Record ``fragment`` as streamed for the current tool call.

        Logs a warning instead of raising when ``current_tool_index`` has
        no matching entry in ``streamed_args_for_tool``.
        """
        if self.current_tool_index < len(self.streamed_args_for_tool):
            self.streamed_args_for_tool[self.current_tool_index] += fragment
        else:
            logger.warning(
                "streamed_args_for_tool out of sync: index=%d len=%d",
                self.current_tool_index,
                len(self.streamed_args_for_tool),
            )