# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# Adapted from
# https://github.com/vllm/vllm/entrypoints/openai/serving_chat.py

"""Anthropic Messages API serving handler"""

import json
import logging
import time
import uuid
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any

from fastapi import Request

from vllm.engine.protocol import EngineClient
from vllm.entrypoints.anthropic.protocol import (
    AnthropicContentBlock,
    AnthropicContextManagement,
    AnthropicCountTokensRequest,
    AnthropicCountTokensResponse,
    AnthropicDelta,
    AnthropicError,
    AnthropicMessagesRequest,
    AnthropicMessagesResponse,
    AnthropicStreamEvent,
    AnthropicUsage,
)
from vllm.entrypoints.chat_utils import ChatTemplateContentFormatOption
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.chat_completion.protocol import (
    ChatCompletionNamedToolChoiceParam,
    ChatCompletionRequest,
    ChatCompletionResponse,
    ChatCompletionStreamResponse,
    ChatCompletionToolsParam,
)
from vllm.entrypoints.openai.chat_completion.serving import OpenAIServingChat
from vllm.entrypoints.openai.engine.protocol import (
    ErrorResponse,
    StreamOptions,
)
from vllm.entrypoints.openai.models.serving import OpenAIServingModels

if TYPE_CHECKING:
    from vllm.entrypoints.serve.render.serving import OpenAIServingRender

logger = logging.getLogger(__name__)


def wrap_data_with_event(data: str, event: str) -> str:
    """Format one server-sent event with an explicit ``event:`` name.

    Args:
        data: The serialized payload for the ``data:`` field.
        event: The event name for the ``event:`` field.

    Returns:
        The SSE frame, terminated by a blank line.
    """
    return f"event: {event}\ndata: {data}\n\n"


class AnthropicServingMessages(OpenAIServingChat):
    """Handler for Anthropic Messages API requests.

    Requests are translated into OpenAI ``ChatCompletionRequest`` objects,
    served by the inherited chat-completion machinery, and the results are
    translated back into Anthropic response / stream-event shapes.
    """

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        response_role: str,
        *,
        openai_serving_render: "OpenAIServingRender",
        request_logger: RequestLogger | None,
        chat_template: str | None,
        chat_template_content_format: ChatTemplateContentFormatOption,
        return_tokens_as_token_ids: bool = False,
        reasoning_parser: str = "",
        enable_auto_tools: bool = False,
        tool_parser: str | None = None,
        enable_prompt_tokens_details: bool = False,
        enable_force_include_usage: bool = False,
        default_chat_template_kwargs: dict[str, Any] | None = None,
    ):
        super().__init__(
            engine_client=engine_client,
            models=models,
            response_role=response_role,
            openai_serving_render=openai_serving_render,
            request_logger=request_logger,
            chat_template=chat_template,
            chat_template_content_format=chat_template_content_format,
            return_tokens_as_token_ids=return_tokens_as_token_ids,
            reasoning_parser=reasoning_parser,
            enable_auto_tools=enable_auto_tools,
            tool_parser=tool_parser,
            enable_prompt_tokens_details=enable_prompt_tokens_details,
            enable_force_include_usage=enable_force_include_usage,
            default_chat_template_kwargs=default_chat_template_kwargs,
        )
        # OpenAI finish_reason -> Anthropic stop_reason. Shared by the
        # streaming and non-streaming converters.
        self.stop_reason_map = {
            "stop": "end_turn",
            "length": "max_tokens",
            "tool_calls": "tool_use",
        }

    @staticmethod
    def _convert_image_source_to_url(source: dict[str, Any]) -> str:
        """Convert an Anthropic image source to an OpenAI-compatible URL.

        Anthropic supports two image source types:
        - base64: {"type": "base64", "media_type": "image/jpeg", "data": "..."}
        - url: {"type": "url", "url": "https://..."}

        For base64 sources, this constructs a proper data URI that
        downstream processors (e.g. vLLM's media connector) can handle.
        """
        source_type = source.get("type")
        if source_type == "url":
            return source.get("url", "")
        # Default to base64 processing if type is "base64"
        # or missing, ensuring a proper data URI is always
        # constructed for non-URL sources.
        media_type = source.get("media_type", "image/jpeg")
        data = source.get("data", "")
        return f"data:{media_type};base64,{data}"

    @classmethod
    def _convert_anthropic_to_openai_request(
        cls, anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest
    ) -> ChatCompletionRequest:
        """Convert Anthropic message format to OpenAI format"""
        openai_messages: list[dict[str, Any]] = []

        cls._convert_system_message(anthropic_request, openai_messages)
        cls._convert_messages(anthropic_request.messages, openai_messages)
        req = cls._build_base_request(anthropic_request, openai_messages)
        cls._handle_streaming_options(req, anthropic_request)
        # tool_choice must be converted before tools: _convert_tools only
        # defaults tool_choice to "auto" when it is still None.
        cls._convert_tool_choice(anthropic_request, req)
        cls._convert_tools(anthropic_request, req)

        return req

    @classmethod
    def _convert_system_message(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        openai_messages: list[dict[str, Any]],
    ) -> None:
        """Convert Anthropic system message to OpenAI format.

        A string system prompt is passed through. A list of blocks is
        reduced to the concatenation of its non-empty text blocks.
        """
        if not anthropic_request.system:
            return

        if isinstance(anthropic_request.system, str):
            openai_messages.append(
                {"role": "system", "content": anthropic_request.system}
            )
            return

        system_prompt = "".join(
            block.text
            for block in anthropic_request.system
            if block.type == "text"
            and block.text
            # Strip Claude Code's attribution header which contains
            # a per-request hash that defeats prefix caching.
            and not block.text.startswith("x-anthropic-billing-header")
        )
        openai_messages.append({"role": "system", "content": system_prompt})

    @classmethod
    def _convert_messages(
        cls, messages: list, openai_messages: list[dict[str, Any]]
    ) -> None:
        """Convert Anthropic messages to OpenAI format"""
        for msg in messages:
            openai_msg: dict[str, Any] = {"role": msg.role}  # type: ignore
            if isinstance(msg.content, str):
                openai_msg["content"] = msg.content
            else:
                cls._convert_message_content(msg, openai_msg, openai_messages)

            # A user message made only of tool_result blocks has already been
            # emitted as separate "tool" messages; drop the empty user shell.
            if not (msg.role == "user" and "content" not in openai_msg):
                openai_messages.append(openai_msg)

    @classmethod
    def _convert_message_content(
        cls,
        msg,
        openai_msg: dict[str, Any],
        openai_messages: list[dict[str, Any]],
    ) -> None:
        """Convert complex message content blocks.

        Fills ``openai_msg`` in place with ``reasoning``, ``tool_calls`` and
        ``content`` keys as applicable. A single text part collapses to a
        plain string; anything else stays a list of parts. Tool results from
        user messages are appended directly to ``openai_messages``.
        """
        content_parts: list[dict[str, Any]] = []
        tool_calls: list[dict[str, Any]] = []
        reasoning_parts: list[str] = []

        for block in msg.content:
            cls._convert_block(
                block,
                msg.role,
                content_parts,
                tool_calls,
                reasoning_parts,
                openai_messages,
            )

        if reasoning_parts:
            openai_msg["reasoning"] = "".join(reasoning_parts)

        if tool_calls:
            openai_msg["tool_calls"] = tool_calls  # type: ignore

        if content_parts:
            if len(content_parts) == 1 and content_parts[0]["type"] == "text":
                openai_msg["content"] = content_parts[0]["text"]
            else:
                openai_msg["content"] = content_parts  # type: ignore

    @classmethod
    def _convert_block(
        cls,
        block,
        role: str,
        content_parts: list[dict[str, Any]],
        tool_calls: list[dict[str, Any]],
        reasoning_parts: list[str],
        openai_messages: list[dict[str, Any]],
    ) -> None:
        """Convert individual content block"""
        if block.type == "text" and block.text:
            content_parts.append({"type": "text", "text": block.text})
        elif block.type == "image" and block.source:
            image_url = cls._convert_image_source_to_url(block.source)
            content_parts.append({"type": "image_url", "image_url": {"url": image_url}})
        elif block.type == "thinking" and block.thinking is not None:
            reasoning_parts.append(block.thinking)
        elif block.type == "redacted_thinking":
            # Redacted thinking blocks contain safety-filtered reasoning.
            # We skip them as the content is opaque (base64 'data' field),
            # but accepting the block prevents a validation error when the
            # client echoes back the full assistant message.
            pass
        elif block.type == "tool_use":
            cls._convert_tool_use_block(block, tool_calls)
        elif block.type == "tool_result":
            cls._convert_tool_result_block(block, role, openai_messages, content_parts)

    @classmethod
    def _convert_tool_use_block(cls, block, tool_calls: list[dict[str, Any]]) -> None:
        """Convert tool_use block to OpenAI function call format"""
        tool_call = {
            "id": block.id or f"call_{int(time.time())}",
            "type": "function",
            "function": {
                "name": block.name or "",
                "arguments": json.dumps(block.input or {}),
            },
        }
        tool_calls.append(tool_call)

    @classmethod
    def _convert_tool_result_block(
        cls,
        block,
        role: str,
        openai_messages: list[dict[str, Any]],
        content_parts: list[dict[str, Any]],
    ) -> None:
        """Convert tool_result block to OpenAI format.

        In user messages the result becomes a standalone "tool" message.
        For any other role it is inlined as a "Tool result: ..." text part.
        """
        if role == "user":
            cls._convert_user_tool_result(block, openai_messages)
        else:
            tool_result_text = str(block.content) if block.content else ""
            content_parts.append(
                {"type": "text", "text": f"Tool result: {tool_result_text}"}
            )

    @classmethod
    def _convert_user_tool_result(
        cls, block, openai_messages: list[dict[str, Any]]
    ) -> None:
        """Convert user tool_result with text and image support.

        Text parts are joined with newlines into a "tool" message. Images
        cannot be attached to a "tool" message, so they are forwarded in a
        follow-up "user" message.
        """
        tool_text = ""
        tool_image_urls: list[str] = []

        if isinstance(block.content, str):
            tool_text = block.content
        elif isinstance(block.content, list):
            text_parts: list[str] = []
            for item in block.content:
                if not isinstance(item, dict):
                    continue
                item_type = item.get("type")
                if item_type == "text":
                    text_parts.append(item.get("text", ""))
                elif item_type == "image":
                    source = item.get("source", {})
                    url = cls._convert_image_source_to_url(source)
                    if url:
                        tool_image_urls.append(url)
            tool_text = "\n".join(text_parts)

        openai_messages.append(
            {
                "role": "tool",
                "tool_call_id": block.tool_use_id or "",
                "content": tool_text or "",
            }
        )

        if tool_image_urls:
            openai_messages.append(
                {
                    "role": "user",
                    "content": [  # type: ignore[dict-item]
                        {"type": "image_url", "image_url": {"url": img}}
                        for img in tool_image_urls
                    ],
                }
            )

    @classmethod
    def _build_base_request(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        openai_messages: list[dict[str, Any]],
    ) -> ChatCompletionRequest:
        """Build base ChatCompletionRequest.

        Token-count requests carry no sampling parameters, so only the model
        and messages are forwarded for them.
        """
        if isinstance(anthropic_request, AnthropicCountTokensRequest):
            return ChatCompletionRequest(
                model=anthropic_request.model,
                messages=openai_messages,
            )

        return ChatCompletionRequest(
            model=anthropic_request.model,
            messages=openai_messages,
            max_tokens=anthropic_request.max_tokens,
            max_completion_tokens=anthropic_request.max_tokens,
            stop=anthropic_request.stop_sequences,
            temperature=anthropic_request.temperature,
            top_p=anthropic_request.top_p,
            top_k=anthropic_request.top_k,
            kv_transfer_params=anthropic_request.kv_transfer_params,
        )

    @classmethod
    def _handle_streaming_options(
        cls,
        req: ChatCompletionRequest,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
    ) -> None:
        """Handle streaming configuration.

        Streaming requests always ask for usage stats: the stream converter
        reads prompt tokens from the first chunk and emits ``message_delta``
        on the usage-only (zero-choice) chunk.
        """
        if isinstance(anthropic_request, AnthropicCountTokensRequest):
            return
        if anthropic_request.stream:
            req.stream = anthropic_request.stream
            req.stream_options = StreamOptions.model_validate(
                {"include_usage": True, "continuous_usage_stats": True}
            )

    @classmethod
    def _convert_tool_choice(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        req: ChatCompletionRequest,
    ) -> None:
        """Convert Anthropic tool_choice to OpenAI format.

        Mapping: auto -> "auto", any -> "required", none -> "none",
        tool -> a named function choice.
        """
        if anthropic_request.tool_choice is None:
            req.tool_choice = None
            return

        tool_choice_type = anthropic_request.tool_choice.type
        if tool_choice_type == "auto":
            req.tool_choice = "auto"
        elif tool_choice_type == "any":
            req.tool_choice = "required"
        elif tool_choice_type == "none":
            req.tool_choice = "none"
        elif tool_choice_type == "tool":
            req.tool_choice = ChatCompletionNamedToolChoiceParam.model_validate(
                {
                    "type": "function",
                    "function": {"name": anthropic_request.tool_choice.name},
                }
            )

    @classmethod
    def _convert_tools(
        cls,
        anthropic_request: AnthropicMessagesRequest | AnthropicCountTokensRequest,
        req: ChatCompletionRequest,
    ) -> None:
        """Convert Anthropic tools to OpenAI function tools.

        When tools are present and no tool_choice was given, tool_choice
        defaults to "auto".
        """
        if anthropic_request.tools is None:
            return
        tools = [
            ChatCompletionToolsParam.model_validate(
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.input_schema,
                    },
                }
            )
            for tool in anthropic_request.tools
        ]
        if req.tool_choice is None:
            req.tool_choice = "auto"
        req.tools = tools

    async def create_messages(
        self,
        request: AnthropicMessagesRequest,
        raw_request: Request | None = None,
    ) -> AsyncGenerator[str, None] | AnthropicMessagesResponse | ErrorResponse:
        """
        Messages API similar to Anthropic's API.

        See https://docs.anthropic.com/en/api/messages
        for the API specification. This API mimics the Anthropic messages API.
        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Received messages request %s", request.model_dump_json())
        chat_req = self._convert_anthropic_to_openai_request(request)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Convert to OpenAI request %s", chat_req.model_dump_json())
        generator = await self.create_chat_completion(chat_req, raw_request)

        if isinstance(generator, ErrorResponse):
            return generator

        elif isinstance(generator, ChatCompletionResponse):
            return self.messages_full_converter(generator)

        return self.message_stream_converter(generator)

    def messages_full_converter(
        self,
        generator: ChatCompletionResponse,
    ) -> AnthropicMessagesResponse:
        """Convert a non-streaming OpenAI chat completion to an Anthropic
        Messages response.

        Only the first choice is converted. Reasoning becomes a ``thinking``
        block with a freshly generated signature, text becomes a ``text``
        block, and each tool call becomes a ``tool_use`` block whose
        ``input`` is the JSON-decoded function arguments.
        """
        result = AnthropicMessagesResponse(
            id=generator.id,
            content=[],
            model=generator.model,
            usage=AnthropicUsage(
                input_tokens=generator.usage.prompt_tokens,
                output_tokens=generator.usage.completion_tokens,
            ),
            kv_transfer_params=generator.kv_transfer_params,
        )

        choice = generator.choices[0]
        # Reuse the mapping shared with the streaming path. Only assign when a
        # mapping exists so unmapped finish reasons leave the field unset.
        stop_reason = self.stop_reason_map.get(choice.finish_reason or "")
        if stop_reason is not None:
            result.stop_reason = stop_reason

        content: list[AnthropicContentBlock] = []
        if choice.message.reasoning:
            content.append(
                AnthropicContentBlock(
                    type="thinking",
                    thinking=choice.message.reasoning,
                    signature=uuid.uuid4().hex,
                )
            )
        if choice.message.content:
            content.append(
                AnthropicContentBlock(
                    type="text",
                    text=choice.message.content,
                )
            )

        for tool_call in choice.message.tool_calls or []:
            # An empty arguments string (e.g. a zero-parameter tool) is not
            # valid JSON; treat it as an empty input object.
            arguments = tool_call.function.arguments or "{}"
            content.append(
                AnthropicContentBlock(
                    type="tool_use",
                    id=tool_call.id,
                    name=tool_call.function.name,
                    input=json.loads(arguments),
                )
            )

        result.content = content

        return result

    async def message_stream_converter(
        self,
        generator: AsyncGenerator[str, None],
    ) -> AsyncGenerator[str, None]:
        """Translate an OpenAI chat-completion SSE stream into Anthropic
        Messages stream events.

        The first chunk produces ``message_start`` and its deltas are not
        processed. Reasoning, text and tool-call deltas are grouped into
        ``content_block_start`` / ``content_block_delta`` /
        ``content_block_stop`` sequences; switching block type closes the
        previous block. The usage-only chunk (no choices) closes the active
        block and emits ``message_delta``. ``[DONE]`` emits ``message_stop``.
        Any exception is logged and reported as a single ``error`` event.
        """
        try:

            class _ActiveBlockState:
                """Tracks the content block currently open in the stream."""

                def __init__(self) -> None:
                    # Index that the next content block will be assigned.
                    self.content_block_index = 0
                    self.block_type: str | None = None
                    self.block_index: int | None = None
                    # Random signature for an open thinking block, emitted as a
                    # signature_delta when the block is closed.
                    self.block_signature: str | None = None
                    self.signature_emitted: bool = False
                    self.tool_use_id: str | None = None

                def reset(self) -> None:
                    self.block_type = None
                    self.block_index = None
                    self.block_signature = None
                    self.signature_emitted = False
                    self.tool_use_id = None

                def start(self, block: AnthropicContentBlock) -> None:
                    self.block_type = block.type
                    self.block_index = self.content_block_index
                    if block.type == "thinking":
                        self.block_signature = uuid.uuid4().hex
                        self.signature_emitted = False
                        self.tool_use_id = None
                    elif block.type == "tool_use":
                        self.block_signature = None
                        self.signature_emitted = True
                        self.tool_use_id = block.id
                    else:
                        self.block_signature = None
                        self.signature_emitted = True
                        self.tool_use_id = None

            first_item = True
            finish_reason = None
            state = _ActiveBlockState()
            # Map from tool call index to tool_use_id
            tool_index_to_id: dict[int, str] = {}

            def current_index() -> int:
                """Index to attach deltas to: the open block, else the next."""
                if state.block_index is not None:
                    return state.block_index
                return state.content_block_index

            def stop_active_block() -> list[str]:
                """Close the open block (if any) and return the SSE frames.

                A thinking block gets its signature_delta first. Returns an
                empty list when no block is open, so it is safe to call
                repeatedly.
                """
                events: list[str] = []
                if state.block_type is None:
                    return events
                if (
                    state.block_type == "thinking"
                    and state.block_signature is not None
                    and not state.signature_emitted
                ):
                    chunk = AnthropicStreamEvent(
                        index=state.block_index,
                        type="content_block_delta",
                        delta=AnthropicDelta(
                            type="signature_delta",
                            signature=state.block_signature,
                        ),
                    )
                    data = chunk.model_dump_json(exclude_unset=True)
                    events.append(wrap_data_with_event(data, "content_block_delta"))
                    state.signature_emitted = True
                stop_chunk = AnthropicStreamEvent(
                    index=state.block_index,
                    type="content_block_stop",
                )
                data = stop_chunk.model_dump_json(exclude_unset=True)
                events.append(wrap_data_with_event(data, "content_block_stop"))
                state.reset()
                state.content_block_index += 1
                return events

            def start_block(block: AnthropicContentBlock) -> str:
                """Open ``block`` at the next index and return its start frame."""
                chunk = AnthropicStreamEvent(
                    index=state.content_block_index,
                    type="content_block_start",
                    content_block=block,
                )
                data = chunk.model_dump_json(exclude_unset=True)
                event = wrap_data_with_event(data, "content_block_start")
                state.start(block)
                return event

            def input_json_delta(partial_json: str) -> str:
                """Frame a tool-argument fragment for the open tool_use block."""
                chunk = AnthropicStreamEvent(
                    index=current_index(),
                    type="content_block_delta",
                    delta=AnthropicDelta(
                        type="input_json_delta",
                        partial_json=partial_json,
                    ),
                )
                data = chunk.model_dump_json(exclude_unset=True)
                return wrap_data_with_event(data, "content_block_delta")

            async for item in generator:
                if not item.startswith("data:"):
                    error_response = AnthropicStreamEvent(
                        type="error",
                        error=AnthropicError(
                            type="internal_error",
                            message="Invalid data format received",
                        ),
                    )
                    data = error_response.model_dump_json(exclude_unset=True)
                    yield wrap_data_with_event(data, "error")
                    continue

                data_str = item[5:].strip().rstrip("\n")
                if data_str == "[DONE]":
                    # Normally the usage chunk has already closed the last
                    # block. If it never arrived, close the block here so the
                    # client never sees message_stop while a block is open.
                    for event in stop_active_block():
                        yield event
                    stop_message = AnthropicStreamEvent(
                        type="message_stop",
                    )
                    data = stop_message.model_dump_json(
                        exclude_unset=True, exclude_none=True
                    )
                    yield wrap_data_with_event(data, "message_stop")
                    continue

                origin_chunk = ChatCompletionStreamResponse.model_validate_json(
                    data_str
                )

                if first_item:
                    chunk = AnthropicStreamEvent(
                        type="message_start",
                        message=AnthropicMessagesResponse(
                            id=origin_chunk.id,
                            content=[],
                            model=origin_chunk.model,
                            stop_reason=None,
                            stop_sequence=None,
                            usage=AnthropicUsage(
                                input_tokens=origin_chunk.usage.prompt_tokens
                                if origin_chunk.usage
                                else 0,
                                output_tokens=0,
                            ),
                        ),
                    )
                    first_item = False
                    data = chunk.model_dump_json(exclude_unset=True)
                    yield wrap_data_with_event(data, "message_start")
                    continue

                # last chunk including usage info
                if len(origin_chunk.choices) == 0:
                    for event in stop_active_block():
                        yield event
                    stop_reason = self.stop_reason_map.get(finish_reason or "stop")
                    chunk = AnthropicStreamEvent(
                        type="message_delta",
                        delta=AnthropicDelta(stop_reason=stop_reason),
                        usage=AnthropicUsage(
                            input_tokens=origin_chunk.usage.prompt_tokens
                            if origin_chunk.usage
                            else 0,
                            output_tokens=origin_chunk.usage.completion_tokens
                            if origin_chunk.usage
                            else 0,
                        ),
                    )
                    data = chunk.model_dump_json(exclude_unset=True)
                    yield wrap_data_with_event(data, "message_delta")
                    continue

                choice = origin_chunk.choices[0]
                delta = choice.delta
                if choice.finish_reason is not None:
                    finish_reason = choice.finish_reason

                # thinking content (empty strings are ignored)
                reasoning_delta = delta.reasoning
                if reasoning_delta:
                    if state.block_type != "thinking":
                        for event in stop_active_block():
                            yield event
                        yield start_block(
                            AnthropicContentBlock(type="thinking", thinking="")
                        )
                    chunk = AnthropicStreamEvent(
                        index=current_index(),
                        type="content_block_delta",
                        delta=AnthropicDelta(
                            type="thinking_delta",
                            thinking=reasoning_delta,
                        ),
                    )
                    data = chunk.model_dump_json(exclude_unset=True)
                    yield wrap_data_with_event(data, "content_block_delta")

                # text content (empty strings are ignored)
                text_delta = delta.content
                if text_delta:
                    if state.block_type != "text":
                        for event in stop_active_block():
                            yield event
                        yield start_block(AnthropicContentBlock(type="text", text=""))
                    chunk = AnthropicStreamEvent(
                        index=current_index(),
                        type="content_block_delta",
                        delta=AnthropicDelta(
                            type="text_delta",
                            text=text_delta,
                        ),
                    )
                    data = chunk.model_dump_json(exclude_unset=True)
                    yield wrap_data_with_event(data, "content_block_delta")

                # tool calls - process all tool calls in the delta
                for tool_call in delta.tool_calls or []:
                    arguments = (
                        tool_call.function.arguments if tool_call.function else None
                    )
                    if tool_call.id is not None:
                        # Update mapping for incremental updates
                        tool_index_to_id[tool_call.index] = tool_call.id
                        # Only create new block if different tool call
                        # AND has a name
                        tool_name = (
                            tool_call.function.name if tool_call.function else None
                        )
                        if state.tool_use_id != tool_call.id and tool_name is not None:
                            for event in stop_active_block():
                                yield event
                            yield start_block(
                                AnthropicContentBlock(
                                    type="tool_use",
                                    id=tool_call.id,
                                    name=tool_name,
                                    input={},
                                )
                            )
                        # Handle initial arguments if present
                        if arguments and state.tool_use_id == tool_call.id:
                            yield input_json_delta(arguments)
                    else:
                        # Incremental update - use index to find tool_use_id
                        tool_use_id = tool_index_to_id.get(tool_call.index)
                        if (
                            tool_use_id is not None
                            and arguments
                            and state.tool_use_id == tool_use_id
                        ):
                            yield input_json_delta(arguments)

        except Exception as e:
            logger.exception("Error in message stream converter.")
            error_response = AnthropicStreamEvent(
                type="error",
                error=AnthropicError(type="internal_error", message=str(e)),
            )
            data = error_response.model_dump_json(exclude_unset=True)
            yield wrap_data_with_event(data, "error")

    async def count_tokens(
        self,
        request: AnthropicCountTokensRequest,
        raw_request: Request | None = None,
    ) -> AnthropicCountTokensResponse | ErrorResponse:
        """Implements Anthropic's messages.count_tokens endpoint.

        The request is rendered through the chat template without running
        generation. The result is the total number of prompt token IDs
        across all rendered engine inputs.
        """
        chat_req = self._convert_anthropic_to_openai_request(request)
        result = await self.render_chat_request(chat_req)
        if isinstance(result, ErrorResponse):
            return result

        _, engine_inputs = result

        input_tokens = sum(  # type: ignore
            len(engine_input["prompt_token_ids"])  # type: ignore[typeddict-item, misc]
            for engine_input in engine_inputs
            if "prompt_token_ids" in engine_input
        )

        response = AnthropicCountTokensResponse(
            input_tokens=input_tokens,
            context_management=AnthropicContextManagement(
                original_input_tokens=input_tokens
            ),
        )

        return response