Unverified Commit ed31a020 authored by Flora Feng's avatar Flora Feng Committed by GitHub
Browse files

[Refactor] Extract Harmony streaming SSE event builders into streaming_events.py (#34909)


Signed-off-by: default avatarsfeng33 <4florafeng@gmail.com>
Co-authored-by: default avatarCyrus Leung <tlleungac@connect.ust.hk>
parent f9ac1920
...@@ -2,36 +2,22 @@ ...@@ -2,36 +2,22 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio import asyncio
import json
import time import time
import uuid import uuid
from collections import deque from collections import deque
from collections.abc import AsyncGenerator, AsyncIterator, Callable, Sequence from collections.abc import AsyncGenerator, AsyncIterator, Callable, Sequence
from contextlib import AsyncExitStack from contextlib import AsyncExitStack
from copy import copy from copy import copy
from dataclasses import dataclass, replace from dataclasses import replace
from http import HTTPStatus from http import HTTPStatus
from typing import Final from typing import Final
import jinja2 import jinja2
from fastapi import Request from fastapi import Request
from openai.types.responses import ( from openai.types.responses import (
ResponseCodeInterpreterCallCodeDeltaEvent,
ResponseCodeInterpreterCallCodeDoneEvent,
ResponseCodeInterpreterCallCompletedEvent,
ResponseCodeInterpreterCallInProgressEvent,
ResponseCodeInterpreterCallInterpretingEvent,
ResponseCodeInterpreterToolCallParam,
ResponseContentPartAddedEvent, ResponseContentPartAddedEvent,
ResponseContentPartDoneEvent, ResponseContentPartDoneEvent,
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionCallArgumentsDoneEvent,
ResponseFunctionToolCall, ResponseFunctionToolCall,
ResponseFunctionWebSearch,
ResponseMcpCallArgumentsDeltaEvent,
ResponseMcpCallArgumentsDoneEvent,
ResponseMcpCallCompletedEvent,
ResponseMcpCallInProgressEvent,
ResponseOutputItem, ResponseOutputItem,
ResponseOutputItemAddedEvent, ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent, ResponseOutputItemDoneEvent,
...@@ -43,13 +29,8 @@ from openai.types.responses import ( ...@@ -43,13 +29,8 @@ from openai.types.responses import (
ResponseStatus, ResponseStatus,
ResponseTextDeltaEvent, ResponseTextDeltaEvent,
ResponseTextDoneEvent, ResponseTextDoneEvent,
ResponseWebSearchCallCompletedEvent,
ResponseWebSearchCallInProgressEvent,
ResponseWebSearchCallSearchingEvent,
response_function_web_search,
response_text_delta_event, response_text_delta_event,
) )
from openai.types.responses.response_output_item import McpCall
from openai.types.responses.response_output_text import Logprob, LogprobTopLogprob from openai.types.responses.response_output_text import Logprob, LogprobTopLogprob
from openai.types.responses.response_reasoning_item import ( from openai.types.responses.response_reasoning_item import (
Content as ResponseReasoningTextContent, Content as ResponseReasoningTextContent,
...@@ -102,13 +83,17 @@ from vllm.entrypoints.openai.responses.protocol import ( ...@@ -102,13 +83,17 @@ from vllm.entrypoints.openai.responses.protocol import (
ResponseCreatedEvent, ResponseCreatedEvent,
ResponseInProgressEvent, ResponseInProgressEvent,
ResponseInputOutputMessage, ResponseInputOutputMessage,
ResponseReasoningPartAddedEvent,
ResponseReasoningPartDoneEvent,
ResponsesRequest, ResponsesRequest,
ResponsesResponse, ResponsesResponse,
ResponseUsage, ResponseUsage,
StreamingResponsesResponse, StreamingResponsesResponse,
) )
from vllm.entrypoints.openai.responses.streaming_events import (
HarmonyStreamingState,
emit_content_delta_events,
emit_previous_item_done_events,
emit_tool_action_events,
)
from vllm.entrypoints.openai.responses.utils import ( from vllm.entrypoints.openai.responses.utils import (
construct_input_messages, construct_input_messages,
construct_tool_dicts, construct_tool_dicts,
...@@ -129,23 +114,6 @@ from vllm.utils import random_uuid ...@@ -129,23 +114,6 @@ from vllm.utils import random_uuid
logger = init_logger(__name__) logger = init_logger(__name__)
@dataclass
class HarmonyStreamingState:
"""Mutable state for harmony streaming event processing."""
current_content_index: int = -1
current_output_index: int = 0
current_item_id: str = ""
sent_output_item_added: bool = False
is_first_function_call_delta: bool = False
def reset_for_new_item(self) -> None:
"""Reset state when expecting a new output item."""
self.current_output_index += 1
self.sent_output_item_added = False
self.is_first_function_call_delta = False
def _extract_allowed_tools_from_mcp_requests( def _extract_allowed_tools_from_mcp_requests(
tools: list[Tool], tools: list[Tool],
) -> dict[str, list[str] | None]: ) -> dict[str, list[str] | None]:
...@@ -817,26 +785,6 @@ class OpenAIServingResponses(OpenAIServing): ...@@ -817,26 +785,6 @@ class OpenAIServingResponses(OpenAIServing):
self.response_store[response.id] = response self.response_store[response.id] = response
return response return response
def _is_mcp_tool_by_namespace(self, recipient: str | None) -> bool:
"""
Determine if a tool call is an MCP tool based on recipient prefix.
- Tools starting with "functions." are function calls
- Everything else is an MCP tool
"""
if recipient is None:
return False
# Function calls have "functions." prefix
# Everything else is an MCP tool
return not recipient.startswith("functions.")
_TOOL_NAME_TO_MCP_SERVER_LABEL: Final[dict[str, str]] = {
"python": "code_interpreter",
"container": "container",
"browser": "web_search_preview",
}
def _topk_logprobs( def _topk_logprobs(
self, self,
logprobs: dict[int, SampleLogprob], logprobs: dict[int, SampleLogprob],
...@@ -1605,816 +1553,6 @@ class OpenAIServingResponses(OpenAIServing): ...@@ -1605,816 +1553,6 @@ class OpenAIServingResponses(OpenAIServing):
) )
) )
def _emit_function_call_done_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when a function call completes."""
function_name = previous_item.recipient[len("functions.") :]
events = []
events.append(
ResponseFunctionCallArgumentsDoneEvent(
type="response.function_call_arguments.done",
arguments=previous_item.content[0].text,
name=function_name,
item_id=state.current_item_id,
output_index=state.current_output_index,
sequence_number=-1,
)
)
function_call_item = ResponseFunctionToolCall(
type="function_call",
arguments=previous_item.content[0].text,
name=function_name,
item_id=state.current_item_id,
output_index=state.current_output_index,
sequence_number=-1,
call_id=f"fc_{random_uuid()}",
status="completed",
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=function_call_item,
)
)
return events
def _emit_mcp_call_done_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when an MCP tool call completes."""
server_label = self._TOOL_NAME_TO_MCP_SERVER_LABEL.get(
previous_item.recipient, previous_item.recipient
)
events = []
events.append(
ResponseMcpCallArgumentsDoneEvent(
type="response.mcp_call_arguments.done",
arguments=previous_item.content[0].text,
name=previous_item.recipient,
item_id=state.current_item_id,
output_index=state.current_output_index,
sequence_number=-1,
)
)
events.append(
ResponseMcpCallCompletedEvent(
type="response.mcp_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
arguments=previous_item.content[0].text,
name=previous_item.recipient,
id=state.current_item_id,
server_label=server_label,
status="completed",
),
)
)
return events
def _emit_reasoning_done_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when a reasoning (analysis) item completes."""
content = ResponseReasoningTextContent(
text=previous_item.content[0].text,
type="reasoning_text",
)
reasoning_item = ResponseReasoningItem(
type="reasoning",
content=[content],
status="completed",
id=state.current_item_id,
summary=[],
)
events = []
events.append(
ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
item_id=state.current_item_id,
sequence_number=-1,
output_index=state.current_output_index,
content_index=state.current_content_index,
text=previous_item.content[0].text,
)
)
events.append(
ResponseReasoningPartDoneEvent(
type="response.reasoning_part.done",
sequence_number=-1,
item_id=state.current_item_id,
output_index=state.current_output_index,
content_index=state.current_content_index,
part=content,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=reasoning_item,
)
)
return events
def _emit_text_output_done_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when a final text output item completes."""
text_content = ResponseOutputText(
type="output_text",
text=previous_item.content[0].text,
annotations=[],
)
events = []
events.append(
ResponseTextDoneEvent(
type="response.output_text.done",
sequence_number=-1,
output_index=state.current_output_index,
content_index=state.current_content_index,
text=previous_item.content[0].text,
logprobs=[],
item_id=state.current_item_id,
)
)
events.append(
ResponseContentPartDoneEvent(
type="response.content_part.done",
sequence_number=-1,
item_id=state.current_item_id,
output_index=state.current_output_index,
content_index=state.current_content_index,
part=text_content,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseOutputMessage(
id=state.current_item_id,
type="message",
role="assistant",
content=[text_content],
status="completed",
),
)
)
return events
def _emit_previous_item_done_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit done events for the previous item when expecting a new start."""
if previous_item.recipient is not None:
# Deal with tool call
if previous_item.recipient.startswith("functions."):
return self._emit_function_call_done_events(previous_item, state)
elif (
self._is_mcp_tool_by_namespace(previous_item.recipient)
and state.current_item_id is not None
and state.current_item_id.startswith("mcp_")
):
return self._emit_mcp_call_done_events(previous_item, state)
elif previous_item.channel == "analysis":
return self._emit_reasoning_done_events(previous_item, state)
elif previous_item.channel == "final":
return self._emit_text_output_done_events(previous_item, state)
return []
def _emit_final_channel_delta_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for final channel text delta streaming."""
events = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"msg_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseOutputMessage(
id=state.current_item_id,
type="message",
role="assistant",
content=[],
status="in_progress",
),
)
)
state.current_content_index += 1
events.append(
ResponseContentPartAddedEvent(
type="response.content_part.added",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
content_index=state.current_content_index,
part=ResponseOutputText(
type="output_text",
text="",
annotations=[],
logprobs=[],
),
)
)
events.append(
ResponseTextDeltaEvent(
type="response.output_text.delta",
sequence_number=-1,
content_index=state.current_content_index,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
# TODO, use logprobs from ctx.last_request_output
logprobs=[],
)
)
return events
def _emit_analysis_channel_delta_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for analysis channel reasoning delta streaming."""
events = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"msg_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseReasoningItem(
type="reasoning",
id=state.current_item_id,
summary=[],
status="in_progress",
),
)
)
state.current_content_index += 1
events.append(
ResponseReasoningPartAddedEvent(
type="response.reasoning_part.added",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
content_index=state.current_content_index,
part=ResponseReasoningTextContent(
text="",
type="reasoning_text",
),
)
)
events.append(
ResponseReasoningTextDeltaEvent(
type="response.reasoning_text.delta",
item_id=state.current_item_id,
output_index=state.current_output_index,
content_index=state.current_content_index,
delta=ctx.last_content_delta,
sequence_number=-1,
)
)
return events
def _emit_mcp_tool_delta_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
recipient: str,
) -> list[StreamingResponsesResponse]:
"""Emit events for MCP tool delta streaming."""
server_label = self._TOOL_NAME_TO_MCP_SERVER_LABEL.get(recipient, recipient)
events = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"mcp_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=recipient,
arguments="",
server_label=server_label,
status="in_progress",
),
)
)
events.append(
ResponseMcpCallInProgressEvent(
type="response.mcp_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseMcpCallArgumentsDeltaEvent(
type="response.mcp_call_arguments.delta",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
)
)
return events
def _emit_code_interpreter_delta_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for code interpreter delta streaming."""
events = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"tool_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseCodeInterpreterToolCallParam(
type="code_interpreter_call",
id=state.current_item_id,
code=None,
container_id="auto",
outputs=None,
status="in_progress",
),
)
)
events.append(
ResponseCodeInterpreterCallInProgressEvent(
type="response.code_interpreter_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseCodeInterpreterCallCodeDeltaEvent(
type="response.code_interpreter_call_code.delta",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
)
)
return events
def _emit_mcp_prefix_delta_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for MCP prefix (mcp.*) delta streaming."""
events = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"mcp_{random_uuid()}"
mcp_name = ctx.parser.current_recipient[len("mcp.") :]
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=mcp_name,
arguments="",
server_label=mcp_name,
status="in_progress",
),
)
)
events.append(
ResponseMcpCallInProgressEvent(
type="response.mcp_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseMcpCallArgumentsDeltaEvent(
type="response.mcp_call_arguments.delta",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
)
)
return events
def _emit_content_delta_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for content delta streaming based on channel type."""
if not ctx.last_content_delta:
return []
if (
ctx.parser.current_channel == "final"
and ctx.parser.current_recipient is None
):
return self._emit_final_channel_delta_events(ctx, state)
elif (
ctx.parser.current_channel == "analysis"
and ctx.parser.current_recipient is None
):
return self._emit_analysis_channel_delta_events(ctx, state)
# built-in tools will be triggered on the analysis channel
# However, occasionally built-in tools will
# still be output to commentary.
elif (
ctx.parser.current_channel == "commentary"
or ctx.parser.current_channel == "analysis"
) and ctx.parser.current_recipient is not None:
recipient = ctx.parser.current_recipient
# Check for function calls first - they have their own event handling
if recipient.startswith("functions."):
return self._emit_function_call_delta_events(ctx, state)
is_mcp_tool = self._is_mcp_tool_by_namespace(recipient)
if is_mcp_tool:
return self._emit_mcp_tool_delta_events(ctx, state, recipient)
else:
return self._emit_code_interpreter_delta_events(ctx, state)
elif (
(
ctx.parser.current_channel == "commentary"
or ctx.parser.current_channel == "analysis"
)
and ctx.parser.current_recipient is not None
and ctx.parser.current_recipient.startswith("mcp.")
):
return self._emit_mcp_prefix_delta_events(ctx, state)
return []
def _emit_browser_tool_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for browser tool calls (web search)."""
function_name = previous_item.recipient[len("browser.") :]
parsed_args = json.loads(previous_item.content[0].text)
action = None
if function_name == "search":
action = response_function_web_search.ActionSearch(
type="search",
query=parsed_args["query"],
)
elif function_name == "open":
action = response_function_web_search.ActionOpenPage(
type="open_page",
# TODO: translate to url
url=f"cursor:{parsed_args.get('cursor', '')}",
)
elif function_name == "find":
action = response_function_web_search.ActionFind(
type="find",
pattern=parsed_args["pattern"],
# TODO: translate to url
url=f"cursor:{parsed_args.get('cursor', '')}",
)
else:
raise ValueError(f"Unknown function name: {function_name}")
state.current_item_id = f"tool_{random_uuid()}"
events = []
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=response_function_web_search.ResponseFunctionWebSearch(
# TODO: generate a unique id for web search call
type="web_search_call",
id=state.current_item_id,
action=action,
status="in_progress",
),
)
)
events.append(
ResponseWebSearchCallInProgressEvent(
type="response.web_search_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseWebSearchCallSearchingEvent(
type="response.web_search_call.searching",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
# enqueue
events.append(
ResponseWebSearchCallCompletedEvent(
type="response.web_search_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseFunctionWebSearch(
type="web_search_call",
id=state.current_item_id,
action=action,
status="completed",
),
)
)
return events
def _emit_mcp_tool_completion_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when an MCP tool completes during assistant action turn."""
recipient = previous_item.recipient
server_label = self._TOOL_NAME_TO_MCP_SERVER_LABEL.get(recipient, recipient)
events = []
events.append(
ResponseMcpCallArgumentsDoneEvent(
type="response.mcp_call_arguments.done",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
arguments=previous_item.content[0].text,
name=recipient,
)
)
events.append(
ResponseMcpCallCompletedEvent(
type="response.mcp_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=recipient,
arguments=previous_item.content[0].text,
server_label=server_label,
status="completed",
),
)
)
return events
def _emit_code_interpreter_completion_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when code interpreter completes."""
events = []
events.append(
ResponseCodeInterpreterCallCodeDoneEvent(
type="response.code_interpreter_call_code.done",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
code=previous_item.content[0].text,
)
)
events.append(
ResponseCodeInterpreterCallInterpretingEvent(
type="response.code_interpreter_call.interpreting",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseCodeInterpreterCallCompletedEvent(
type="response.code_interpreter_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseCodeInterpreterToolCallParam(
type="code_interpreter_call",
id=state.current_item_id,
code=previous_item.content[0].text,
container_id="auto",
outputs=[],
status="completed",
),
)
)
return events
def _emit_mcp_prefix_completion_events(
self,
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when an MCP prefix tool (mcp.*) completes."""
mcp_name = previous_item.recipient[len("mcp.") :]
events = []
events.append(
ResponseMcpCallArgumentsDoneEvent(
type="response.mcp_call_arguments.done",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
arguments=previous_item.content[0].text,
name=mcp_name,
)
)
events.append(
ResponseMcpCallCompletedEvent(
type="response.mcp_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=mcp_name,
arguments=previous_item.content[0].text,
server_label=mcp_name,
status="completed",
),
)
)
return events
def _emit_tool_action_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for tool action turn."""
if not ctx.is_assistant_action_turn() or len(ctx.parser.messages) == 0:
return []
events = []
previous_item = ctx.parser.messages[-1]
# Handle browser tool
if (
self.tool_server is not None
and self.tool_server.has_tool("browser")
and previous_item.recipient is not None
and previous_item.recipient.startswith("browser.")
):
events.extend(self._emit_browser_tool_events(previous_item, state))
# Handle tool completion
if (
self.tool_server is not None
and previous_item.recipient is not None
and state.current_item_id is not None
and state.sent_output_item_added
):
recipient = previous_item.recipient
# Handle MCP prefix tool completion first
if recipient.startswith("mcp."):
events.extend(
self._emit_mcp_prefix_completion_events(previous_item, state)
)
else:
# Handle other MCP tool and code interpreter completion
is_mcp_tool = self._is_mcp_tool_by_namespace(
recipient
) and state.current_item_id.startswith("mcp_")
if is_mcp_tool:
events.extend(
self._emit_mcp_tool_completion_events(previous_item, state)
)
else:
events.extend(
self._emit_code_interpreter_completion_events(
previous_item, state
)
)
return events
def _emit_function_call_delta_events(
self,
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for developer function calls on commentary channel."""
if not (
ctx.parser.current_channel == "commentary"
and ctx.parser.current_recipient
and ctx.parser.current_recipient.startswith("functions.")
):
return []
events = []
if state.is_first_function_call_delta is False:
state.is_first_function_call_delta = True
fc_name = ctx.parser.current_recipient[len("functions.") :]
state.current_item_id = f"fc_{random_uuid()}"
tool_call_item = ResponseFunctionToolCall(
name=fc_name,
type="function_call",
id=state.current_item_id,
call_id=f"call_{random_uuid()}",
arguments="",
status="in_progress",
)
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=tool_call_item,
)
)
# Always emit the delta (including on first call)
events.append(
ResponseFunctionCallArgumentsDeltaEvent(
item_id=state.current_item_id,
delta=ctx.last_content_delta,
output_index=state.current_output_index,
sequence_number=-1,
type="response.function_call_arguments.delta",
)
)
return events
async def _process_harmony_streaming_events( async def _process_harmony_streaming_events(
self, self,
request: ResponsesRequest, request: ResponsesRequest,
...@@ -2440,18 +1578,16 @@ class OpenAIServingResponses(OpenAIServing): ...@@ -2440,18 +1578,16 @@ class OpenAIServingResponses(OpenAIServing):
if ctx.is_expecting_start(): if ctx.is_expecting_start():
if len(ctx.parser.messages) > 0: if len(ctx.parser.messages) > 0:
previous_item = ctx.parser.messages[-1] previous_item = ctx.parser.messages[-1]
for event in self._emit_previous_item_done_events( for event in emit_previous_item_done_events(previous_item, state):
previous_item, state
):
yield _increment_sequence_number_and_return(event) yield _increment_sequence_number_and_return(event)
state.reset_for_new_item() state.reset_for_new_item()
# Stream the output of a harmony message # Stream the output of a harmony message
for event in self._emit_content_delta_events(ctx, state): for event in emit_content_delta_events(ctx, state):
yield _increment_sequence_number_and_return(event) yield _increment_sequence_number_and_return(event)
# Stream tool call outputs # Stream tool call outputs
for event in self._emit_tool_action_events(ctx, state): for event in emit_tool_action_events(ctx, state, self.tool_server):
yield _increment_sequence_number_and_return(event) yield _increment_sequence_number_and_return(event)
async def responses_stream_generator( async def responses_stream_generator(
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Streaming SSE event builders for the Responses API.
Pure functions that translate streaming state + delta data into
OpenAI Response API SSE events. Used by the streaming event
processors in serving.py.
"""
import json
from dataclasses import dataclass
from typing import Final
from openai.types.responses import (
ResponseCodeInterpreterCallCodeDeltaEvent,
ResponseCodeInterpreterCallCodeDoneEvent,
ResponseCodeInterpreterCallCompletedEvent,
ResponseCodeInterpreterCallInProgressEvent,
ResponseCodeInterpreterCallInterpretingEvent,
ResponseCodeInterpreterToolCallParam,
ResponseContentPartAddedEvent,
ResponseContentPartDoneEvent,
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionCallArgumentsDoneEvent,
ResponseFunctionToolCall,
ResponseFunctionWebSearch,
ResponseMcpCallArgumentsDeltaEvent,
ResponseMcpCallArgumentsDoneEvent,
ResponseMcpCallCompletedEvent,
ResponseMcpCallInProgressEvent,
ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent,
ResponseOutputMessage,
ResponseOutputText,
ResponseReasoningItem,
ResponseReasoningTextDeltaEvent,
ResponseReasoningTextDoneEvent,
ResponseTextDeltaEvent,
ResponseTextDoneEvent,
ResponseWebSearchCallCompletedEvent,
ResponseWebSearchCallInProgressEvent,
ResponseWebSearchCallSearchingEvent,
response_function_web_search,
)
from openai.types.responses.response_output_item import McpCall
from openai.types.responses.response_reasoning_item import (
Content as ResponseReasoningTextContent,
)
from vllm.entrypoints.mcp.tool_server import ToolServer
from vllm.entrypoints.openai.responses.context import StreamingHarmonyContext
from vllm.entrypoints.openai.responses.protocol import (
ResponseReasoningPartAddedEvent,
ResponseReasoningPartDoneEvent,
StreamingResponsesResponse,
)
from vllm.utils import random_uuid
TOOL_NAME_TO_MCP_SERVER_LABEL: Final[dict[str, str]] = {
"python": "code_interpreter",
"container": "container",
"browser": "web_search_preview",
}
@dataclass
class HarmonyStreamingState:
"""Mutable state for harmony streaming event processing."""
current_content_index: int = -1
current_output_index: int = 0
current_item_id: str = ""
sent_output_item_added: bool = False
is_first_function_call_delta: bool = False
def reset_for_new_item(self) -> None:
"""Reset state when expecting a new output item."""
self.current_output_index += 1
self.sent_output_item_added = False
self.is_first_function_call_delta = False
def is_mcp_tool_by_namespace(recipient: str | None) -> bool:
"""
Determine if a tool call is an MCP tool based on recipient prefix.
- Tools starting with "functions." are function calls
- Everything else is an MCP tool
"""
if recipient is None:
return False
# Function calls have "functions." prefix
# Everything else is an MCP tool
return not recipient.startswith("functions.")
def emit_function_call_done_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when a function call completes."""
function_name = previous_item.recipient[len("functions.") :]
events: list[StreamingResponsesResponse] = []
events.append(
ResponseFunctionCallArgumentsDoneEvent(
type="response.function_call_arguments.done",
arguments=previous_item.content[0].text,
name=function_name,
item_id=state.current_item_id,
output_index=state.current_output_index,
sequence_number=-1,
)
)
function_call_item = ResponseFunctionToolCall(
type="function_call",
arguments=previous_item.content[0].text,
name=function_name,
item_id=state.current_item_id,
output_index=state.current_output_index,
sequence_number=-1,
call_id=f"fc_{random_uuid()}",
status="completed",
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=function_call_item,
)
)
return events
def emit_mcp_call_done_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when an MCP tool call completes."""
server_label = TOOL_NAME_TO_MCP_SERVER_LABEL.get(
previous_item.recipient, previous_item.recipient
)
events: list[StreamingResponsesResponse] = []
events.append(
ResponseMcpCallArgumentsDoneEvent(
type="response.mcp_call_arguments.done",
arguments=previous_item.content[0].text,
name=previous_item.recipient,
item_id=state.current_item_id,
output_index=state.current_output_index,
sequence_number=-1,
)
)
events.append(
ResponseMcpCallCompletedEvent(
type="response.mcp_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
arguments=previous_item.content[0].text,
name=previous_item.recipient,
id=state.current_item_id,
server_label=server_label,
status="completed",
),
)
)
return events
def emit_reasoning_done_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when a reasoning (analysis) item completes."""
content = ResponseReasoningTextContent(
text=previous_item.content[0].text,
type="reasoning_text",
)
reasoning_item = ResponseReasoningItem(
type="reasoning",
content=[content],
status="completed",
id=state.current_item_id,
summary=[],
)
events: list[StreamingResponsesResponse] = []
events.append(
ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
item_id=state.current_item_id,
sequence_number=-1,
output_index=state.current_output_index,
content_index=state.current_content_index,
text=previous_item.content[0].text,
)
)
events.append(
ResponseReasoningPartDoneEvent(
type="response.reasoning_part.done",
sequence_number=-1,
item_id=state.current_item_id,
output_index=state.current_output_index,
content_index=state.current_content_index,
part=content,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=reasoning_item,
)
)
return events
def emit_text_output_done_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when a final text output item completes."""
text_content = ResponseOutputText(
type="output_text",
text=previous_item.content[0].text,
annotations=[],
)
events: list[StreamingResponsesResponse] = []
events.append(
ResponseTextDoneEvent(
type="response.output_text.done",
sequence_number=-1,
output_index=state.current_output_index,
content_index=state.current_content_index,
text=previous_item.content[0].text,
logprobs=[],
item_id=state.current_item_id,
)
)
events.append(
ResponseContentPartDoneEvent(
type="response.content_part.done",
sequence_number=-1,
item_id=state.current_item_id,
output_index=state.current_output_index,
content_index=state.current_content_index,
part=text_content,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseOutputMessage(
id=state.current_item_id,
type="message",
role="assistant",
content=[text_content],
status="completed",
),
)
)
return events
def emit_previous_item_done_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit done events for the previous item when expecting a new start."""
if previous_item.recipient is not None:
# Deal with tool call
if previous_item.recipient.startswith("functions."):
return emit_function_call_done_events(previous_item, state)
elif (
is_mcp_tool_by_namespace(previous_item.recipient)
and state.current_item_id is not None
and state.current_item_id.startswith("mcp_")
):
return emit_mcp_call_done_events(previous_item, state)
elif previous_item.channel == "analysis":
return emit_reasoning_done_events(previous_item, state)
elif previous_item.channel == "final":
return emit_text_output_done_events(previous_item, state)
return []
def emit_final_channel_delta_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for final channel text delta streaming."""
events: list[StreamingResponsesResponse] = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"msg_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseOutputMessage(
id=state.current_item_id,
type="message",
role="assistant",
content=[],
status="in_progress",
),
)
)
state.current_content_index += 1
events.append(
ResponseContentPartAddedEvent(
type="response.content_part.added",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
content_index=state.current_content_index,
part=ResponseOutputText(
type="output_text",
text="",
annotations=[],
logprobs=[],
),
)
)
events.append(
ResponseTextDeltaEvent(
type="response.output_text.delta",
sequence_number=-1,
content_index=state.current_content_index,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
# TODO, use logprobs from ctx.last_request_output
logprobs=[],
)
)
return events
def emit_analysis_channel_delta_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for analysis channel reasoning delta streaming."""
events: list[StreamingResponsesResponse] = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"msg_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseReasoningItem(
type="reasoning",
id=state.current_item_id,
summary=[],
status="in_progress",
),
)
)
state.current_content_index += 1
events.append(
ResponseReasoningPartAddedEvent(
type="response.reasoning_part.added",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
content_index=state.current_content_index,
part=ResponseReasoningTextContent(
text="",
type="reasoning_text",
),
)
)
events.append(
ResponseReasoningTextDeltaEvent(
type="response.reasoning_text.delta",
item_id=state.current_item_id,
output_index=state.current_output_index,
content_index=state.current_content_index,
delta=ctx.last_content_delta,
sequence_number=-1,
)
)
return events
def emit_mcp_tool_delta_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
recipient: str,
) -> list[StreamingResponsesResponse]:
"""Emit events for MCP tool delta streaming."""
server_label = TOOL_NAME_TO_MCP_SERVER_LABEL.get(recipient, recipient)
events: list[StreamingResponsesResponse] = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"mcp_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=recipient,
arguments="",
server_label=server_label,
status="in_progress",
),
)
)
events.append(
ResponseMcpCallInProgressEvent(
type="response.mcp_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseMcpCallArgumentsDeltaEvent(
type="response.mcp_call_arguments.delta",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
)
)
return events
def emit_code_interpreter_delta_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for code interpreter delta streaming."""
events: list[StreamingResponsesResponse] = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"tool_{random_uuid()}"
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseCodeInterpreterToolCallParam(
type="code_interpreter_call",
id=state.current_item_id,
code=None,
container_id="auto",
outputs=None,
status="in_progress",
),
)
)
events.append(
ResponseCodeInterpreterCallInProgressEvent(
type="response.code_interpreter_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseCodeInterpreterCallCodeDeltaEvent(
type="response.code_interpreter_call_code.delta",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
)
)
return events
def emit_mcp_prefix_delta_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for MCP prefix (mcp.*) delta streaming."""
events: list[StreamingResponsesResponse] = []
if not state.sent_output_item_added:
state.sent_output_item_added = True
state.current_item_id = f"mcp_{random_uuid()}"
mcp_name = ctx.parser.current_recipient[len("mcp.") :]
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=mcp_name,
arguments="",
server_label=mcp_name,
status="in_progress",
),
)
)
events.append(
ResponseMcpCallInProgressEvent(
type="response.mcp_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseMcpCallArgumentsDeltaEvent(
type="response.mcp_call_arguments.delta",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.last_content_delta,
)
)
return events
def emit_function_call_delta_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for developer function calls on commentary channel."""
if not (
ctx.parser.current_channel == "commentary"
and ctx.parser.current_recipient
and ctx.parser.current_recipient.startswith("functions.")
):
return []
events: list[StreamingResponsesResponse] = []
if state.is_first_function_call_delta is False:
state.is_first_function_call_delta = True
fc_name = ctx.parser.current_recipient[len("functions.") :]
state.current_item_id = f"fc_{random_uuid()}"
tool_call_item = ResponseFunctionToolCall(
name=fc_name,
type="function_call",
id=state.current_item_id,
call_id=f"call_{random_uuid()}",
arguments="",
status="in_progress",
)
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=tool_call_item,
)
)
# Always emit the delta (including on first call)
events.append(
ResponseFunctionCallArgumentsDeltaEvent(
item_id=state.current_item_id,
delta=ctx.last_content_delta,
output_index=state.current_output_index,
sequence_number=-1,
type="response.function_call_arguments.delta",
)
)
return events
def emit_content_delta_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for content delta streaming based on channel type."""
if not ctx.last_content_delta:
return []
if ctx.parser.current_channel == "final" and ctx.parser.current_recipient is None:
return emit_final_channel_delta_events(ctx, state)
elif (
ctx.parser.current_channel == "analysis"
and ctx.parser.current_recipient is None
):
return emit_analysis_channel_delta_events(ctx, state)
# built-in tools will be triggered on the analysis channel
# However, occasionally built-in tools will
# still be output to commentary.
elif (
ctx.parser.current_channel == "commentary"
or ctx.parser.current_channel == "analysis"
) and ctx.parser.current_recipient is not None:
recipient = ctx.parser.current_recipient
# Check for function calls first - they have their own event handling
if recipient.startswith("functions."):
return emit_function_call_delta_events(ctx, state)
if is_mcp_tool_by_namespace(recipient):
return emit_mcp_tool_delta_events(ctx, state, recipient)
else:
return emit_code_interpreter_delta_events(ctx, state)
elif (
(
ctx.parser.current_channel == "commentary"
or ctx.parser.current_channel == "analysis"
)
and ctx.parser.current_recipient is not None
and ctx.parser.current_recipient.startswith("mcp.")
):
return emit_mcp_prefix_delta_events(ctx, state)
return []
def emit_browser_tool_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for browser tool calls (web search)."""
function_name = previous_item.recipient[len("browser.") :]
parsed_args = json.loads(previous_item.content[0].text)
action = None
if function_name == "search":
action = response_function_web_search.ActionSearch(
type="search",
query=parsed_args["query"],
)
elif function_name == "open":
action = response_function_web_search.ActionOpenPage(
type="open_page",
# TODO: translate to url
url=f"cursor:{parsed_args.get('cursor', '')}",
)
elif function_name == "find":
action = response_function_web_search.ActionFind(
type="find",
pattern=parsed_args["pattern"],
# TODO: translate to url
url=f"cursor:{parsed_args.get('cursor', '')}",
)
else:
raise ValueError(f"Unknown function name: {function_name}")
state.current_item_id = f"tool_{random_uuid()}"
events: list[StreamingResponsesResponse] = []
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
sequence_number=-1,
output_index=state.current_output_index,
item=response_function_web_search.ResponseFunctionWebSearch(
# TODO: generate a unique id for web search call
type="web_search_call",
id=state.current_item_id,
action=action,
status="in_progress",
),
)
)
events.append(
ResponseWebSearchCallInProgressEvent(
type="response.web_search_call.in_progress",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseWebSearchCallSearchingEvent(
type="response.web_search_call.searching",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
# enqueue
events.append(
ResponseWebSearchCallCompletedEvent(
type="response.web_search_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseFunctionWebSearch(
type="web_search_call",
id=state.current_item_id,
action=action,
status="completed",
),
)
)
return events
def emit_mcp_tool_completion_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when an MCP tool completes during assistant action turn."""
recipient = previous_item.recipient
server_label = TOOL_NAME_TO_MCP_SERVER_LABEL.get(recipient, recipient)
events: list[StreamingResponsesResponse] = []
events.append(
ResponseMcpCallArgumentsDoneEvent(
type="response.mcp_call_arguments.done",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
arguments=previous_item.content[0].text,
name=recipient,
)
)
events.append(
ResponseMcpCallCompletedEvent(
type="response.mcp_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=recipient,
arguments=previous_item.content[0].text,
server_label=server_label,
status="completed",
),
)
)
return events
def emit_code_interpreter_completion_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when code interpreter completes."""
events: list[StreamingResponsesResponse] = []
events.append(
ResponseCodeInterpreterCallCodeDoneEvent(
type="response.code_interpreter_call_code.done",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
code=previous_item.content[0].text,
)
)
events.append(
ResponseCodeInterpreterCallInterpretingEvent(
type="response.code_interpreter_call.interpreting",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseCodeInterpreterCallCompletedEvent(
type="response.code_interpreter_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=ResponseCodeInterpreterToolCallParam(
type="code_interpreter_call",
id=state.current_item_id,
code=previous_item.content[0].text,
container_id="auto",
outputs=[],
status="completed",
),
)
)
return events
def emit_mcp_prefix_completion_events(
previous_item,
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events when an MCP prefix tool (mcp.*) completes."""
mcp_name = previous_item.recipient[len("mcp.") :]
events: list[StreamingResponsesResponse] = []
events.append(
ResponseMcpCallArgumentsDoneEvent(
type="response.mcp_call_arguments.done",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
arguments=previous_item.content[0].text,
name=mcp_name,
)
)
events.append(
ResponseMcpCallCompletedEvent(
type="response.mcp_call.completed",
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
)
)
events.append(
ResponseOutputItemDoneEvent(
type="response.output_item.done",
sequence_number=-1,
output_index=state.current_output_index,
item=McpCall(
type="mcp_call",
id=state.current_item_id,
name=mcp_name,
arguments=previous_item.content[0].text,
server_label=mcp_name,
status="completed",
),
)
)
return events
def emit_tool_action_events(
ctx: StreamingHarmonyContext,
state: HarmonyStreamingState,
tool_server: ToolServer | None,
) -> list[StreamingResponsesResponse]:
"""Emit events for tool action turn."""
if not ctx.is_assistant_action_turn() or len(ctx.parser.messages) == 0:
return []
events: list[StreamingResponsesResponse] = []
previous_item = ctx.parser.messages[-1]
# Handle browser tool
if (
tool_server is not None
and tool_server.has_tool("browser")
and previous_item.recipient is not None
and previous_item.recipient.startswith("browser.")
):
events.extend(emit_browser_tool_events(previous_item, state))
# Handle tool completion
if (
tool_server is not None
and previous_item.recipient is not None
and state.current_item_id is not None
and state.sent_output_item_added
):
recipient = previous_item.recipient
# Handle MCP prefix tool completion first
if recipient.startswith("mcp."):
events.extend(emit_mcp_prefix_completion_events(previous_item, state))
else:
# Handle other MCP tool and code interpreter completion
is_mcp_tool = is_mcp_tool_by_namespace(
recipient
) and state.current_item_id.startswith("mcp_")
if is_mcp_tool:
events.extend(emit_mcp_tool_completion_events(previous_item, state))
else:
events.extend(
emit_code_interpreter_completion_events(previous_item, state)
)
return events
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment