test_response_api_simple.py 4.34 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project


import pytest
import pytest_asyncio
from openai import OpenAI

from ...utils import RemoteOpenAIServer

MODEL_NAME = "Qwen/Qwen3-8B"


@pytest.fixture(scope="module")
def server():
    """Yield a ``RemoteOpenAIServer`` for ``MODEL_NAME``, shared per module.

    The server is given the ``qwen3`` reasoning parser and a max model
    length of 5000, and its environment sets
    ``VLLM_ENABLE_RESPONSES_API_STORE=1``. The ``with`` block stays open
    while dependent tests run and is exited when the fixture is torn down.
    """
    cli_args = ["--reasoning-parser", "qwen3", "--max_model_len", "5000"]
    server_env = {
        "VLLM_ENABLE_RESPONSES_API_STORE": "1",
        # uncomment for tool calling
        # "PYTHON_EXECUTION_BACKEND": "dangerously_use_uv",
    }

    with RemoteOpenAIServer(
        MODEL_NAME, cli_args, env_dict=server_env
    ) as remote_server:
        yield remote_server


@pytest_asyncio.fixture
async def client(server):
    """Yield an async client obtained from the shared ``server`` fixture.

    The client is used as an async context manager, so it is exited once
    the requesting test has finished.
    """
    client_context = server.get_async_client()
    async with client_context as async_client:
        yield async_client


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_basic(client: OpenAI, model_name: str):
    """Smoke test: a plain-text request finishes with status ``completed``.

    ``client`` is the async client from the ``client`` fixture, hence the
    awaited call.
    """
    request = {
        "model": model_name,
        "input": "What is 13 * 24?",
    }
    response = await client.responses.create(**request)

    assert response is not None
    # Printed so the full response shows up in pytest's captured output.
    print("response: ", response)
    assert response.status == "completed"


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_enable_response_messages(client: OpenAI, model_name: str):
    """Check the raw message payloads returned with ``enable_response_messages``.

    With the flag passed through ``extra_body``, the first entry of both
    ``input_messages`` and ``output_messages`` must carry a ``message``
    string longer than 10 characters and a ``tokens`` list whose first
    element is an ``int``. The input entry must additionally be tagged
    ``raw_message_tokens``.
    """
    request = {
        "model": model_name,
        "input": "Hello?",
        "extra_body": {"enable_response_messages": True},
    }
    response = await client.responses.create(**request)
    assert response.status == "completed"

    # Input side: the first entry is tagged and carries text plus token ids.
    first_input = response.input_messages[0]
    assert first_input["type"] == "raw_message_tokens"
    assert type(first_input["message"]) is str
    assert len(first_input["message"]) > 10
    assert type(first_input["tokens"][0]) is int

    # Output side: same text/token checks (no type tag is asserted here).
    first_output = response.output_messages[0]
    assert type(first_output["message"]) is str
    assert len(first_output["message"]) > 10
    assert type(first_output["tokens"][0]) is int


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_reasoning_item(client: OpenAI, model_name: str):
    """Send a user message plus a reasoning item and check the output shape.

    The request runs at temperature 0.0. The response must be completed,
    with a ``reasoning`` item first and a ``message`` item second whose
    first content part holds string text.
    """
    user_message = {"type": "message", "content": "Hello.", "role": "user"}
    reasoning_item = {
        "type": "reasoning",
        "id": "lol",
        "content": [
            {
                "type": "reasoning_text",
                "text": "We need to respond: greeting.",
            }
        ],
        "summary": [],
    }

    response = await client.responses.create(
        model=model_name,
        input=[user_message, reasoning_item],
        temperature=0.0,
    )

    assert response is not None
    assert response.status == "completed"
    # make sure we get a reasoning and text output
    assert response.output[0].type == "reasoning"
    message_output = response.output[1]
    assert message_output.type == "message"
    assert type(message_output.content[0].text) is str
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_streaming_output_consistency(client: OpenAI, model_name: str):
    """Streamed text deltas must add up to the final ``output_text``.

    The request is made with ``stream=True`` and every event is collected.
    The ``delta`` values of all ``response.output_text.delta`` events are
    joined in arrival order and compared with ``output_text`` on the
    response carried by the last event, which must be of type
    ``response.completed``.
    """
    stream = await client.responses.create(
        model=model_name,
        input="Say hello in one sentence.",
        stream=True,
    )

    # Drain the stream completely before inspecting anything.
    events = [event async for event in stream]
    assert events

    # Rebuild the text from the incremental delta events only.
    delta_chunks = [
        event.delta for event in events if event.type == "response.output_text.delta"
    ]
    streaming_text = "".join(delta_chunks)

    # The stream has to end with the completion event.
    last_event = events[-1]
    assert last_event.type == "response.completed"
    final_response = last_event.response
    assert final_response.status == "completed"

    final_output_text = final_response.output_text

    # The final response must contain at least one output item.
    assert len(final_response.output) > 0

    # Streamed text and final text have to be identical.
    assert streaming_text == final_output_text, (
        "Streaming text does not match final output_text.\n"
        f"Streaming: {streaming_text!r}\n"
        f"Final: {final_output_text!r}"
    )