# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project


from collections.abc import AsyncGenerator
from http import HTTPStatus

8
from fastapi import APIRouter, Depends, FastAPI, Request
9
10
11
12
13
14
15
16
17
18
19
from fastapi.responses import JSONResponse, StreamingResponse

from vllm.entrypoints.openai.engine.protocol import ErrorResponse
from vllm.entrypoints.openai.responses.protocol import (
    ResponsesRequest,
    ResponsesResponse,
    StreamingResponsesResponse,
)
from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses
from vllm.entrypoints.openai.utils import validate_json_request
from vllm.entrypoints.utils import (
20
    load_aware_call,
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
    with_cancellation,
)
from vllm.logger import init_logger

# Module-level logger, created through init_logger with this module's name.
logger = init_logger(__name__)

# Router that collects the endpoints declared in this module;
# ``attach_router`` includes it into the FastAPI application.
router = APIRouter()


def responses(request: Request) -> OpenAIServingResponses | None:
    """Return the Responses API serving handler stored on the app state.

    Reads ``openai_serving_responses`` from ``request.app.state``. Per the
    return annotation the stored value may be ``None``; callers in this
    module check for that before using the handler.
    """
    app_state = request.app.state
    return app_state.openai_serving_responses


async def _convert_stream_to_sse_events(
    generator: AsyncGenerator[StreamingResponsesResponse, None],
) -> AsyncGenerator[str, None]:
    """Convert a stream of response events into SSE-formatted strings.

    Each incoming event becomes one frame made of an ``event: <type>`` line
    and a ``data: <json>`` line, terminated by a blank line. The JSON payload
    is produced by ``event.model_dump_json(indent=None, by_alias=True)``.

    Args:
        generator: Async generator yielding the streaming response events.

    Yields:
        One SSE-formatted string per event, in the order received.
    """
    async for event in generator:
        # Events that carry no ``type`` attribute are labelled "unknown".
        event_type = getattr(event, "type", "unknown")
        # https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
        payload = event.model_dump_json(indent=None, by_alias=True)
        yield f"event: {event_type}\ndata: {payload}\n\n"


@router.post(
    "/v1/responses",
    dependencies=[Depends(validate_json_request)],
    responses={
        HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
        HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
        HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
        HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
    },
)
@with_cancellation
@load_aware_call
async def create_responses(request: ResponsesRequest, raw_request: Request):
    # POST /v1/responses: delegate the request to the serving handler and
    # wrap whatever it returns in the matching HTTP response type.
    #
    # Documented with comments rather than a docstring because FastAPI
    # publishes a handler's docstring as the endpoint description in the
    # generated OpenAPI schema.
    #
    # Outcomes:
    #   * ErrorResponse     -> JSON body, HTTP status taken from error.code
    #   * ResponsesResponse -> JSON body, default status
    #   * anything else     -> treated as an event stream and sent as SSE
    serving = responses(raw_request)
    if serving is None:
        # No handler is configured on the app state. How this exception is
        # turned into an HTTP response is decided outside this module.
        raise NotImplementedError("The model does not support Responses API")

    result = await serving.create_responses(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(
            content=result.model_dump(mode="json", by_alias=True),
            status_code=result.error.code,
        )
    if isinstance(result, ResponsesResponse):
        return JSONResponse(content=result.model_dump(mode="json", by_alias=True))

    return StreamingResponse(
        content=_convert_stream_to_sse_events(result),
        media_type="text/event-stream",
    )


@router.get("/v1/responses/{response_id}")
@load_aware_call
async def retrieve_responses(
    response_id: str,
    raw_request: Request,
    starting_after: int | None = None,
    stream: bool | None = False,
):
    # GET /v1/responses/{response_id}: ask the serving handler for a
    # response and wrap the result in the matching HTTP response type.
    #
    # Documented with comments rather than a docstring because FastAPI
    # publishes a handler's docstring as the endpoint description in the
    # generated OpenAPI schema.
    #
    # ``response_id`` comes from the URL path. ``starting_after`` and
    # ``stream`` are forwarded to the handler unchanged; their exact meaning
    # is defined by the handler (NOTE(review): presumably query parameters
    # controlling stream resumption -- confirm against the serving class).
    #
    # Outcomes:
    #   * ErrorResponse     -> JSON body, HTTP status taken from error.code
    #   * ResponsesResponse -> JSON body, default status
    #   * anything else     -> treated as an event stream and sent as SSE
    serving = responses(raw_request)
    if serving is None:
        # No handler is configured on the app state. How this exception is
        # turned into an HTTP response is decided outside this module.
        raise NotImplementedError("The model does not support Responses API")

    result = await serving.retrieve_responses(
        response_id,
        starting_after=starting_after,
        stream=stream,
    )

    if isinstance(result, ErrorResponse):
        return JSONResponse(
            content=result.model_dump(mode="json", by_alias=True),
            status_code=result.error.code,
        )
    if isinstance(result, ResponsesResponse):
        return JSONResponse(content=result.model_dump(mode="json", by_alias=True))

    return StreamingResponse(
        content=_convert_stream_to_sse_events(result),
        media_type="text/event-stream",
    )


@router.post("/v1/responses/{response_id}/cancel")
@load_aware_call
async def cancel_responses(response_id: str, raw_request: Request):
    # POST /v1/responses/{response_id}/cancel: ask the serving handler to
    # cancel the response identified by ``response_id`` and return the
    # handler's result as JSON.
    #
    # Documented with comments rather than a docstring because FastAPI
    # publishes a handler's docstring as the endpoint description in the
    # generated OpenAPI schema.
    #
    # An ErrorResponse is returned with the HTTP status from its error.code;
    # any other result is serialized with the default status.
    serving = responses(raw_request)
    if serving is None:
        # No handler is configured on the app state. How this exception is
        # turned into an HTTP response is decided outside this module.
        raise NotImplementedError("The model does not support Responses API")

    result = await serving.cancel_responses(response_id)
    body = result.model_dump(mode="json", by_alias=True)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=body, status_code=result.error.code)
    return JSONResponse(content=body)


def attach_router(app: FastAPI):
    """Register this module's router, and with it its endpoints, on ``app``."""
    app.include_router(router=router)