"tests/models/language/generation/test_phimoe.py" did not exist on "a84e598e2125960d3b4f716b78863f24ac562947"
serving.py 38.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# Adapted from
# https://github.com/vllm/vllm/entrypoints/openai/serving_chat.py

"""Anthropic Messages API serving handler"""

import json
import logging
import time
11
import uuid
12
from collections.abc import AsyncGenerator
13
from typing import TYPE_CHECKING, Any
14
15
16
17
18
19

from fastapi import Request

from vllm.engine.protocol import EngineClient
from vllm.entrypoints.anthropic.protocol import (
    AnthropicContentBlock,
20
21
22
    AnthropicContextManagement,
    AnthropicCountTokensRequest,
    AnthropicCountTokensResponse,
23
24
25
26
27
28
29
30
31
    AnthropicDelta,
    AnthropicError,
    AnthropicMessagesRequest,
    AnthropicMessagesResponse,
    AnthropicStreamEvent,
    AnthropicUsage,
)
from vllm.entrypoints.chat_utils import ChatTemplateContentFormatOption
from vllm.entrypoints.logger import RequestLogger
32
from vllm.entrypoints.openai.chat_completion.protocol import (
33
34
35
36
37
    ChatCompletionNamedToolChoiceParam,
    ChatCompletionRequest,
    ChatCompletionResponse,
    ChatCompletionStreamResponse,
    ChatCompletionToolsParam,
38
39
40
)
from vllm.entrypoints.openai.chat_completion.serving import OpenAIServingChat
from vllm.entrypoints.openai.engine.protocol import (
41
42
43
    ErrorResponse,
    StreamOptions,
)
44
from vllm.entrypoints.openai.models.serving import OpenAIServingModels
45

46
47
48
if TYPE_CHECKING:
    from vllm.entrypoints.serve.render.serving import OpenAIServingRender

49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
logger = logging.getLogger(__name__)


def wrap_data_with_event(data: str, event: str):
    """Render one Server-Sent Events frame.

    The frame is an ``event:`` line naming the event type, a ``data:`` line
    holding the payload, and the empty line that terminates an SSE frame.
    """
    frame_lines = (f"event: {event}", f"data: {data}", "", "")
    return "\n".join(frame_lines)


class AnthropicServingMessages(OpenAIServingChat):
    """Handler for Anthropic Messages API requests"""

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        response_role: str,
        *,
        openai_serving_render: "OpenAIServingRender",
        request_logger: RequestLogger | None,
        chat_template: str | None,
        chat_template_content_format: ChatTemplateContentFormatOption,
        return_tokens_as_token_ids: bool = False,
        reasoning_parser: str = "",
        enable_auto_tools: bool = False,
        tool_parser: str | None = None,
        enable_prompt_tokens_details: bool = False,
        enable_force_include_usage: bool = False,
    ):
        """Set up the Anthropic Messages handler.

        Every argument is handed to ``OpenAIServingChat`` unchanged. The only
        state added here is ``stop_reason_map``. It translates OpenAI
        ``finish_reason`` strings into Anthropic ``stop_reason`` strings.
        """
        super().__init__(
            engine_client=engine_client,
            models=models,
            response_role=response_role,
            openai_serving_render=openai_serving_render,
            request_logger=request_logger,
            chat_template=chat_template,
            chat_template_content_format=chat_template_content_format,
            return_tokens_as_token_ids=return_tokens_as_token_ids,
            reasoning_parser=reasoning_parser,
            enable_auto_tools=enable_auto_tools,
            tool_parser=tool_parser,
            enable_prompt_tokens_details=enable_prompt_tokens_details,
            enable_force_include_usage=enable_force_include_usage,
        )
        # OpenAI finish_reason -> Anthropic stop_reason. The streaming
        # converter looks this up with ``dict.get``, so reasons missing from
        # the table translate to None.
        self.stop_reason_map = dict(
            stop="end_turn",
            length="max_tokens",
            tool_calls="tool_use",
        )

    @staticmethod
    def _convert_image_source_to_url(source: dict[str, Any]) -> str:
        """Convert an Anthropic image source to an OpenAI-compatible URL.

        Anthropic supports two image source types:
        - base64: {"type": "base64", "media_type": "image/jpeg", "data": "..."}
        - url: {"type": "url", "url": "https://..."}

        For base64 sources, this constructs a proper data URI that
        downstream processors (e.g. vLLM's media connector) can handle.
        """
        source_type = source.get("type")
        if source_type == "url":
            return source.get("url", "")
        # Default to base64 processing if type is "base64"
        # or missing, ensuring a proper data URI is always
        # constructed for non-URL sources.
        media_type = source.get("media_type", "image/jpeg")
        data = source.get("data", "")
        return f"data:{media_type};base64,{data}"

    @classmethod
    def _convert_anthropic_to_openai_request(
        cls, anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest
    ) -> ChatCompletionRequest:
        """Translate an Anthropic request into an OpenAI ``ChatCompletionRequest``.

        Works for both Messages and count-tokens requests. First, the system
        prompt and the conversation are flattened into OpenAI chat messages.
        The request built from them is then patched in place for streaming,
        ``tool_choice`` and ``tools``.
        """
        chat_messages: list[dict[str, Any]] = []
        cls._convert_system_message(anthropic_request, chat_messages)
        cls._convert_messages(anthropic_request.messages, chat_messages)

        chat_request = cls._build_base_request(anthropic_request, chat_messages)
        cls._handle_streaming_options(chat_request, anthropic_request)
        # tool_choice must be resolved before tools: _convert_tools only
        # defaults tool_choice to "auto" while it is still None.
        cls._convert_tool_choice(anthropic_request, chat_request)
        cls._convert_tools(anthropic_request, chat_request)
        return chat_request

    @classmethod
    def _convert_system_message(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        openai_messages: list[dict[str, Any]],
    ) -> None:
        """Convert Anthropic system message to OpenAI format"""
        if not anthropic_request.system:
            return

        if isinstance(anthropic_request.system, str):
            openai_messages.append(
                {"role": "system", "content": anthropic_request.system}
            )
        else:
            system_prompt = ""
            for block in anthropic_request.system:
                if block.type == "text" and block.text:
151
152
153
154
                    # Strip Claude Code's attribution header which contains
                    # a per-request hash that defeats prefix caching.
                    if block.text.startswith("x-anthropic-billing-header"):
                        continue
155
156
                    system_prompt += block.text
            openai_messages.append({"role": "system", "content": system_prompt})
157

158
159
160
161
162
163
    @classmethod
    def _convert_messages(
        cls, messages: list, openai_messages: list[dict[str, Any]]
    ) -> None:
        """Convert Anthropic messages to OpenAI format"""
        for msg in messages:
164
            openai_msg: dict[str, Any] = {"role": msg.role}  # type: ignore
165

166
167
168
            if isinstance(msg.content, str):
                openai_msg["content"] = msg.content
            else:
zhangning3's avatar
zhangning3 committed
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
                # Handle complex content blocks
                content_parts: list[dict[str, Any]] = []
                tool_calls: list[dict[str, Any]] = []
                reasoning_parts: list[str] = []

                for block in msg.content:
                    if block.type == "text" and block.text:
                        content_parts.append({"type": "text", "text": block.text})
                    elif block.type == "image" and block.source:
                        content_parts.append(
                            {
                                "type": "image_url",
                                "image_url": {"url": block.source.get("data", "")},
                            }
                        )
                    elif block.type == "thinking" and block.thinking is not None:
                        reasoning_parts.append(block.thinking)
                    elif block.type == "tool_use":
                        # Convert tool use to function call format
                        tool_call = {
                            "id": block.id or f"call_{int(time.time())}",
                            "type": "function",
                            "function": {
                                "name": block.name or "",
                                "arguments": json.dumps(block.input or {}),
                            },
                        }
                        tool_calls.append(tool_call)
                    elif block.type == "tool_result":
                        if msg.role == "user":
                            openai_messages.append(
                                {
                                    "role": "tool",
                                    "tool_call_id": block.id or "",
                                    "content": str(block.content)
                                    if block.content
                                    else "",
                                }
                            )
                        else:
                            # Assistant tool result becomes regular text
                            tool_result_text = (
                                str(block.content) if block.content else ""
                            )
                            content_parts.append(
                                {
                                    "type": "text",
                                    "text": f"Tool result: {tool_result_text}",
                                }
                            )

                if reasoning_parts:
                    openai_msg["reasoning"] = "".join(reasoning_parts)

                # Add tool calls to the message if any
                if tool_calls:
                    openai_msg["tool_calls"] = tool_calls  # type: ignore

                # Add content parts if any
                if content_parts:
                    if len(content_parts) == 1 and content_parts[0]["type"] == "text":
                        openai_msg["content"] = content_parts[0]["text"]
                    else:
                        openai_msg["content"] = content_parts  # type: ignore
                elif not tool_calls and not reasoning_parts:
                    continue
235
236

            openai_messages.append(openai_msg)
237

238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
    @classmethod
    def _convert_message_content(
        cls,
        msg,
        openai_msg: dict[str, Any],
        openai_messages: list[dict[str, Any]],
    ) -> None:
        """Convert complex message content blocks"""
        content_parts: list[dict[str, Any]] = []
        tool_calls: list[dict[str, Any]] = []
        reasoning_parts: list[str] = []

        for block in msg.content:
            cls._convert_block(
                block,
                msg.role,
                content_parts,
                tool_calls,
                reasoning_parts,
                openai_messages,
            )
259

260
261
        if reasoning_parts:
            openai_msg["reasoning"] = "".join(reasoning_parts)
262

263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
        if tool_calls:
            openai_msg["tool_calls"] = tool_calls  # type: ignore

        if content_parts:
            if len(content_parts) == 1 and content_parts[0]["type"] == "text":
                openai_msg["content"] = content_parts[0]["text"]
            else:
                openai_msg["content"] = content_parts  # type: ignore
        elif not tool_calls and not reasoning_parts:
            return

    @classmethod
    def _convert_block(
        cls,
        block,
        role: str,
        content_parts: list[dict[str, Any]],
        tool_calls: list[dict[str, Any]],
        reasoning_parts: list[str],
        openai_messages: list[dict[str, Any]],
    ) -> None:
        """Convert individual content block"""
        if block.type == "text" and block.text:
            content_parts.append({"type": "text", "text": block.text})
        elif block.type == "image" and block.source:
            image_url = cls._convert_image_source_to_url(block.source)
            content_parts.append({"type": "image_url", "image_url": {"url": image_url}})
        elif block.type == "thinking" and block.thinking is not None:
            reasoning_parts.append(block.thinking)
292
293
294
295
296
297
        elif block.type == "redacted_thinking":
            # Redacted thinking blocks contain safety-filtered reasoning.
            # We skip them as the content is opaque (base64 'data' field),
            # but accepting the block prevents a validation error when the
            # client echoes back the full assistant message.
            pass
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
        elif block.type == "tool_use":
            cls._convert_tool_use_block(block, tool_calls)
        elif block.type == "tool_result":
            cls._convert_tool_result_block(block, role, openai_messages, content_parts)

    @classmethod
    def _convert_tool_use_block(cls, block, tool_calls: list[dict[str, Any]]) -> None:
        """Convert tool_use block to OpenAI function call format"""
        tool_call = {
            "id": block.id or f"call_{int(time.time())}",
            "type": "function",
            "function": {
                "name": block.name or "",
                "arguments": json.dumps(block.input or {}),
            },
        }
        tool_calls.append(tool_call)

    @classmethod
    def _convert_tool_result_block(
        cls,
        block,
        role: str,
        openai_messages: list[dict[str, Any]],
        content_parts: list[dict[str, Any]],
    ) -> None:
        """Convert tool_result block to OpenAI format"""
        if role == "user":
            cls._convert_user_tool_result(block, openai_messages)
        else:
            tool_result_text = str(block.content) if block.content else ""
            content_parts.append(
                {"type": "text", "text": f"Tool result: {tool_result_text}"}
            )

    @classmethod
    def _convert_user_tool_result(
        cls, block, openai_messages: list[dict[str, Any]]
    ) -> None:
        """Convert user tool_result with text and image support"""
        tool_text = ""
        tool_image_urls: list[str] = []

        if isinstance(block.content, str):
            tool_text = block.content
        elif isinstance(block.content, list):
            text_parts: list[str] = []
            for item in block.content:
                if not isinstance(item, dict):
347
                    continue
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
                item_type = item.get("type")
                if item_type == "text":
                    text_parts.append(item.get("text", ""))
                elif item_type == "image":
                    source = item.get("source", {})
                    url = cls._convert_image_source_to_url(source)
                    if url:
                        tool_image_urls.append(url)
            tool_text = "\n".join(text_parts)

        openai_messages.append(
            {
                "role": "tool",
                "tool_call_id": block.tool_use_id or "",
                "content": tool_text or "",
            }
        )
365

366
367
368
369
370
371
372
373
374
375
        if tool_image_urls:
            openai_messages.append(
                {
                    "role": "user",
                    "content": [  # type: ignore[dict-item]
                        {"type": "image_url", "image_url": {"url": img}}
                        for img in tool_image_urls
                    ],
                }
            )
376

377
378
379
380
381
382
383
384
385
386
387
388
389
390
    @classmethod
    def _build_base_request(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        openai_messages: list[dict[str, Any]],
    ) -> ChatCompletionRequest:
        """Create the ``ChatCompletionRequest`` before tool/stream adjustments.

        Count-tokens requests forward only ``model`` and ``messages``.

        Messages requests additionally forward:

        - stop sequences (as ``stop``);
        - ``temperature``, ``top_p`` and ``top_k``;
        - ``kv_transfer_params``;
        - ``max_tokens``, copied into both ``max_tokens`` and
          ``max_completion_tokens``.
        """
        model = anthropic_request.model
        if isinstance(anthropic_request, AnthropicCountTokensRequest):
            return ChatCompletionRequest(model=model, messages=openai_messages)

        token_limit = anthropic_request.max_tokens
        return ChatCompletionRequest(
            model=model,
            messages=openai_messages,
            max_tokens=token_limit,
            max_completion_tokens=token_limit,
            stop=anthropic_request.stop_sequences,
            temperature=anthropic_request.temperature,
            top_p=anthropic_request.top_p,
            top_k=anthropic_request.top_k,
            kv_transfer_params=anthropic_request.kv_transfer_params,
        )

    @classmethod
    def _handle_streaming_options(
        cls,
        req: ChatCompletionRequest,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
    ) -> None:
        """Enable streaming on ``req`` when the Anthropic request asks for it.

        Count-tokens requests and non-streaming requests leave ``req``
        untouched. For streaming requests, usage reporting is enabled both per
        chunk (``continuous_usage_stats``) and at the end (``include_usage``).
        The stream converter reads prompt usage from the first chunk and the
        totals from the final, choice-less chunk.
        """
        if isinstance(anthropic_request, AnthropicCountTokensRequest):
            return
        if not anthropic_request.stream:
            return

        req.stream = anthropic_request.stream
        req.stream_options = StreamOptions.model_validate(
            {"include_usage": True, "continuous_usage_stats": True}
        )

    @classmethod
    def _convert_tool_choice(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        req: ChatCompletionRequest,
    ) -> None:
        """Convert Anthropic tool_choice to OpenAI format"""
424
425
        if anthropic_request.tool_choice is None:
            req.tool_choice = None
426
427
428
429
            return

        tool_choice_type = anthropic_request.tool_choice.type
        if tool_choice_type == "auto":
430
            req.tool_choice = "auto"
431
        elif tool_choice_type == "any":
432
            req.tool_choice = "required"
433
434
        elif tool_choice_type == "none":
            req.tool_choice = "none"
435
        elif tool_choice_type == "tool":
436
437
438
439
440
441
442
            req.tool_choice = ChatCompletionNamedToolChoiceParam.model_validate(
                {
                    "type": "function",
                    "function": {"name": anthropic_request.tool_choice.name},
                }
            )

443
444
445
446
447
448
449
    @classmethod
    def _convert_tools(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        req: ChatCompletionRequest,
    ) -> None:
        """Convert Anthropic tools to OpenAI format"""
450
        if anthropic_request.tools is None:
451
452
453
            return

        tools = []
454
455
456
457
458
459
460
461
462
463
464
465
466
        for tool in anthropic_request.tools:
            tools.append(
                ChatCompletionToolsParam.model_validate(
                    {
                        "type": "function",
                        "function": {
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": tool.input_schema,
                        },
                    }
                )
            )
467

468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
        if req.tool_choice is None:
            req.tool_choice = "auto"
        req.tools = tools

    async def create_messages(
        self,
        request: AnthropicMessagesRequest,
        raw_request: Request | None = None,
    ) -> AsyncGenerator[str, None] | AnthropicMessagesResponse | ErrorResponse:
        """Serve an Anthropic Messages API request.

        The request is translated to an OpenAI chat-completion request and
        run through ``create_chat_completion``. The result is handled as
        follows:

        - an error response is returned as-is;
        - a full ``ChatCompletionResponse`` is converted into an
          ``AnthropicMessagesResponse``;
        - any other result is treated as a stream and wrapped by
          ``message_stream_converter``.

        See https://docs.anthropic.com/en/api/messages for the API
        specification this endpoint mimics.
        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Received messages request %s", request.model_dump_json())
        chat_req = self._convert_anthropic_to_openai_request(request)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Convert to OpenAI request %s", chat_req.model_dump_json())

        outcome = await self.create_chat_completion(chat_req, raw_request)
        if isinstance(outcome, ErrorResponse):
            return outcome
        if not isinstance(outcome, ChatCompletionResponse):
            # Streaming result: re-encode it as Anthropic SSE events.
            return self.message_stream_converter(outcome)
        return self.messages_full_converter(outcome)

    def messages_full_converter(
        self,
        generator: ChatCompletionResponse,
    ) -> AnthropicMessagesResponse:
        result = AnthropicMessagesResponse(
            id=generator.id,
            content=[],
            model=generator.model,
            usage=AnthropicUsage(
                input_tokens=generator.usage.prompt_tokens,
                output_tokens=generator.usage.completion_tokens,
            ),
510
            kv_transfer_params=generator.kv_transfer_params,
511
        )
512
513
        choice = generator.choices[0]
        if choice.finish_reason == "stop":
514
            result.stop_reason = "end_turn"
515
        elif choice.finish_reason == "length":
516
            result.stop_reason = "max_tokens"
517
        elif choice.finish_reason == "tool_calls":
518
519
            result.stop_reason = "tool_use"

520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
        content: list[AnthropicContentBlock] = []
        if choice.message.reasoning:
            content.append(
                AnthropicContentBlock(
                    type="thinking",
                    thinking=choice.message.reasoning,
                    signature=uuid.uuid4().hex,
                )
            )
        if choice.message.content:
            content.append(
                AnthropicContentBlock(
                    type="text",
                    text=choice.message.content,
                )
535
536
            )

537
        for tool_call in choice.message.tool_calls:
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
            anthropic_tool_call = AnthropicContentBlock(
                type="tool_use",
                id=tool_call.id,
                name=tool_call.function.name,
                input=json.loads(tool_call.function.arguments),
            )
            content += [anthropic_tool_call]

        result.content = content

        return result

    async def message_stream_converter(
        self,
        generator: AsyncGenerator[str, None],
    ) -> AsyncGenerator[str, None]:
        try:
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587

            class _ActiveBlockState:
                def __init__(self) -> None:
                    self.content_block_index = 0
                    self.block_type: str | None = None
                    self.block_index: int | None = None
                    self.block_signature: str | None = None
                    self.signature_emitted: bool = False
                    self.tool_use_id: str | None = None

                def reset(self) -> None:
                    self.block_type = None
                    self.block_index = None
                    self.block_signature = None
                    self.signature_emitted = False
                    self.tool_use_id = None

                def start(self, block: AnthropicContentBlock) -> None:
                    self.block_type = block.type
                    self.block_index = self.content_block_index
                    if block.type == "thinking":
                        self.block_signature = uuid.uuid4().hex
                        self.signature_emitted = False
                        self.tool_use_id = None
                    elif block.type == "tool_use":
                        self.block_signature = None
                        self.signature_emitted = True
                        self.tool_use_id = block.id
                    else:
                        self.block_signature = None
                        self.signature_emitted = True
                        self.tool_use_id = None

588
589
            first_item = True
            finish_reason = None
zhangning3's avatar
zhangning3 committed
590
591
592
593
594
595
            content_block_index = 0
            active_block_type: str | None = None
            active_block_index: int | None = None
            active_block_signature: str | None = None
            signature_emitted = False
            active_tool_use_id: str | None = None
596
597
598
599
            # Map from tool call index to tool_use_id
            tool_index_to_id: dict[int, str] = {}

            def stop_active_block():
zhangning3's avatar
zhangning3 committed
600
601
                nonlocal active_block_type, active_block_index, content_block_index
                nonlocal active_block_signature, signature_emitted, active_tool_use_id
602
                events: list[str] = []
zhangning3's avatar
zhangning3 committed
603
                if active_block_type is None:
604
605
                    return events
                if (
zhangning3's avatar
zhangning3 committed
606
607
608
                    active_block_type == "thinking"
                    and active_block_signature is not None
                    and not signature_emitted
609
610
                ):
                    chunk = AnthropicStreamEvent(
zhangning3's avatar
zhangning3 committed
611
                        index=active_block_index,
612
613
614
                        type="content_block_delta",
                        delta=AnthropicDelta(
                            type="signature_delta",
zhangning3's avatar
zhangning3 committed
615
                            signature=active_block_signature,
616
617
618
619
                        ),
                    )
                    data = chunk.model_dump_json(exclude_unset=True)
                    events.append(wrap_data_with_event(data, "content_block_delta"))
zhangning3's avatar
zhangning3 committed
620
                    signature_emitted = True
621
                stop_chunk = AnthropicStreamEvent(
zhangning3's avatar
zhangning3 committed
622
                    index=active_block_index,
623
624
625
626
                    type="content_block_stop",
                )
                data = stop_chunk.model_dump_json(exclude_unset=True)
                events.append(wrap_data_with_event(data, "content_block_stop"))
zhangning3's avatar
zhangning3 committed
627
628
629
630
631
632
                active_block_type = None
                active_block_index = None
                active_block_signature = None
                signature_emitted = False
                active_tool_use_id = None
                content_block_index += 1
633
634
635
                return events

            def start_block(block: AnthropicContentBlock):
zhangning3's avatar
zhangning3 committed
636
637
                nonlocal active_block_type, active_block_index, content_block_index
                nonlocal active_block_signature, signature_emitted, active_tool_use_id
638
                chunk = AnthropicStreamEvent(
zhangning3's avatar
zhangning3 committed
639
                    index=content_block_index,
640
641
642
643
644
                    type="content_block_start",
                    content_block=block,
                )
                data = chunk.model_dump_json(exclude_unset=True)
                event = wrap_data_with_event(data, "content_block_start")
zhangning3's avatar
zhangning3 committed
645
646
647
648
649
650
651
652
653
654
655
656
657
658
                active_block_type = block.type
                active_block_index = content_block_index
                if block.type == "thinking":
                    active_block_signature = uuid.uuid4().hex
                    signature_emitted = False
                    active_tool_use_id = None
                elif block.type == "tool_use":
                    active_block_signature = None
                    signature_emitted = True
                    active_tool_use_id = block.id
                else:
                    active_block_signature = None
                    signature_emitted = True
                    active_tool_use_id = None
659
                return event
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683

            async for item in generator:
                if item.startswith("data:"):
                    data_str = item[5:].strip().rstrip("\n")
                    if data_str == "[DONE]":
                        stop_message = AnthropicStreamEvent(
                            type="message_stop",
                        )
                        data = stop_message.model_dump_json(
                            exclude_unset=True, exclude_none=True
                        )
                        yield wrap_data_with_event(data, "message_stop")
                    else:
                        origin_chunk = ChatCompletionStreamResponse.model_validate_json(
                            data_str
                        )

                        if first_item:
                            chunk = AnthropicStreamEvent(
                                type="message_start",
                                message=AnthropicMessagesResponse(
                                    id=origin_chunk.id,
                                    content=[],
                                    model=origin_chunk.model,
684
685
                                    stop_reason=None,
                                    stop_sequence=None,
686
687
688
689
690
691
                                    usage=AnthropicUsage(
                                        input_tokens=origin_chunk.usage.prompt_tokens
                                        if origin_chunk.usage
                                        else 0,
                                        output_tokens=0,
                                    ),
692
                                ),
693
694
695
696
697
698
699
700
                            )
                            first_item = False
                            data = chunk.model_dump_json(exclude_unset=True)
                            yield wrap_data_with_event(data, "message_start")
                            continue

                        # last chunk including usage info
                        if len(origin_chunk.choices) == 0:
701
702
                            for event in stop_active_block():
                                yield event
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
                            stop_reason = self.stop_reason_map.get(
                                finish_reason or "stop"
                            )
                            chunk = AnthropicStreamEvent(
                                type="message_delta",
                                delta=AnthropicDelta(stop_reason=stop_reason),
                                usage=AnthropicUsage(
                                    input_tokens=origin_chunk.usage.prompt_tokens
                                    if origin_chunk.usage
                                    else 0,
                                    output_tokens=origin_chunk.usage.completion_tokens
                                    if origin_chunk.usage
                                    else 0,
                                ),
                            )
                            data = chunk.model_dump_json(exclude_unset=True)
                            yield wrap_data_with_event(data, "message_delta")
                            continue

                        if origin_chunk.choices[0].finish_reason is not None:
                            finish_reason = origin_chunk.choices[0].finish_reason
724
                            # continue
725

726
727
728
729
730
731
                        # thinking / text content
                        reasoning_delta = origin_chunk.choices[0].delta.reasoning
                        if reasoning_delta is not None:
                            if reasoning_delta == "":
                                pass
                            else:
zhangning3's avatar
zhangning3 committed
732
                                if active_block_type != "thinking":
733
734
735
736
737
738
739
740
                                    for event in stop_active_block():
                                        yield event
                                    start_event = start_block(
                                        AnthropicContentBlock(
                                            type="thinking", thinking=""
                                        )
                                    )
                                    yield start_event
741
                                chunk = AnthropicStreamEvent(
742
                                    index=(
zhangning3's avatar
zhangning3 committed
743
744
745
                                        active_block_index
                                        if active_block_index is not None
                                        else content_block_index
746
747
748
749
750
                                    ),
                                    type="content_block_delta",
                                    delta=AnthropicDelta(
                                        type="thinking_delta",
                                        thinking=reasoning_delta,
751
752
753
                                    ),
                                )
                                data = chunk.model_dump_json(exclude_unset=True)
754
                                yield wrap_data_with_event(data, "content_block_delta")
755

756
                        if origin_chunk.choices[0].delta.content is not None:
757
                            if origin_chunk.choices[0].delta.content == "":
758
759
                                pass
                            else:
zhangning3's avatar
zhangning3 committed
760
                                if active_block_type != "text":
761
762
763
764
                                    for event in stop_active_block():
                                        yield event
                                    start_event = start_block(
                                        AnthropicContentBlock(type="text", text="")
765
                                    )
766
                                    yield start_event
767
                                chunk = AnthropicStreamEvent(
768
                                    index=(
zhangning3's avatar
zhangning3 committed
769
770
771
                                        active_block_index
                                        if active_block_index is not None
                                        else content_block_index
772
773
774
                                    ),
                                    type="content_block_delta",
                                    delta=AnthropicDelta(
775
776
                                        type="text_delta",
                                        text=origin_chunk.choices[0].delta.content,
777
778
779
780
                                    ),
                                )
                                data = chunk.model_dump_json(exclude_unset=True)
                                yield wrap_data_with_event(data, "content_block_delta")
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795

                        # tool calls - process all tool calls in the delta
                        if len(origin_chunk.choices[0].delta.tool_calls) > 0:
                            for tool_call in origin_chunk.choices[0].delta.tool_calls:
                                if tool_call.id is not None:
                                    # Update mapping for incremental updates
                                    tool_index_to_id[tool_call.index] = tool_call.id
                                    # Only create new block if different tool call
                                    # AND has a name
                                    tool_name = (
                                        tool_call.function.name
                                        if tool_call.function
                                        else None
                                    )
                                    if (
zhangning3's avatar
zhangning3 committed
796
                                        active_tool_use_id != tool_call.id
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
                                        and tool_name is not None
                                    ):
                                        for event in stop_active_block():
                                            yield event
                                        start_event = start_block(
                                            AnthropicContentBlock(
                                                type="tool_use",
                                                id=tool_call.id,
                                                name=tool_name,
                                                input={},
                                            )
                                        )
                                        yield start_event
                                    # Handle initial arguments if present
                                    if (
                                        tool_call.function
                                        and tool_call.function.arguments
zhangning3's avatar
zhangning3 committed
814
                                        and active_tool_use_id == tool_call.id
815
816
817
                                    ):
                                        chunk = AnthropicStreamEvent(
                                            index=(
zhangning3's avatar
zhangning3 committed
818
819
820
                                                active_block_index
                                                if active_block_index is not None
                                                else content_block_index
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
                                            ),
                                            type="content_block_delta",
                                            delta=AnthropicDelta(
                                                type="input_json_delta",
                                                partial_json=tool_call.function.arguments,
                                            ),
                                        )
                                        data = chunk.model_dump_json(exclude_unset=True)
                                        yield wrap_data_with_event(
                                            data, "content_block_delta"
                                        )
                                else:
                                    # Incremental update - use index to find tool_use_id
                                    tool_use_id = tool_index_to_id.get(tool_call.index)
                                    if (
                                        tool_use_id is not None
                                        and tool_call.function
                                        and tool_call.function.arguments
zhangning3's avatar
zhangning3 committed
839
                                        and active_tool_use_id == tool_use_id
840
841
842
                                    ):
                                        chunk = AnthropicStreamEvent(
                                            index=(
zhangning3's avatar
zhangning3 committed
843
844
845
                                                active_block_index
                                                if active_block_index is not None
                                                else content_block_index
846
847
848
849
850
851
852
853
854
855
856
                                            ),
                                            type="content_block_delta",
                                            delta=AnthropicDelta(
                                                type="input_json_delta",
                                                partial_json=tool_call.function.arguments,
                                            ),
                                        )
                                        data = chunk.model_dump_json(exclude_unset=True)
                                        yield wrap_data_with_event(
                                            data, "content_block_delta"
                                        )
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
                            continue
                else:
                    error_response = AnthropicStreamEvent(
                        type="error",
                        error=AnthropicError(
                            type="internal_error",
                            message="Invalid data format received",
                        ),
                    )
                    data = error_response.model_dump_json(exclude_unset=True)
                    yield wrap_data_with_event(data, "error")

        except Exception as e:
            logger.exception("Error in message stream converter.")
            error_response = AnthropicStreamEvent(
                type="error",
                error=AnthropicError(type="internal_error", message=str(e)),
            )
            data = error_response.model_dump_json(exclude_unset=True)
            yield wrap_data_with_event(data, "error")
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904

    async def count_tokens(
        self,
        request: AnthropicCountTokensRequest,
        raw_request: Request | None = None,
    ) -> AnthropicCountTokensResponse | ErrorResponse:
        """Serve Anthropic's ``messages.count_tokens`` endpoint.

        The Anthropic request is converted to an OpenAI chat request and
        rendered via ``render_chat_request``. The lengths of every rendered
        prompt's ``prompt_token_ids`` are then summed. This method only
        renders; it does not submit anything for generation.

        Args:
            request: The Anthropic count-tokens request to measure.
            raw_request: The underlying HTTP request; unused here.

        Returns:
            An ``AnthropicCountTokensResponse``. Its ``input_tokens`` and
            ``context_management.original_input_tokens`` both carry the
            summed token count. If rendering produced an ``ErrorResponse``,
            that error is returned unchanged instead.
        """
        openai_request = self._convert_anthropic_to_openai_request(request)
        rendered = await self.render_chat_request(openai_request)
        if isinstance(rendered, ErrorResponse):
            # Propagate rendering failures to the caller untouched.
            return rendered

        _, prompts = rendered

        # Prompts that carry no token ids contribute nothing to the total.
        token_total = 0
        for engine_prompt in prompts:
            if "prompt_token_ids" not in engine_prompt:
                continue
            token_total += len(
                engine_prompt["prompt_token_ids"]  # type: ignore[typeddict-item, misc]
            )

        return AnthropicCountTokensResponse(
            input_tokens=token_total,
            context_management=AnthropicContextManagement(
                original_input_tokens=token_total
            ),
        )