# SPDX-License-Identifier: Apache-2.0

import asyncio
import signal
import socket
from http import HTTPStatus
from typing import Any, Optional

import uvicorn
from fastapi import FastAPI, Request, Response

from vllm import envs
from vllm.engine.async_llm_engine import AsyncEngineDeadError
from vllm.engine.multiprocessing import MQEngineDeadError
from vllm.engine.protocol import EngineClient
from vllm.entrypoints.ssl import SSLCertRefresher
from vllm.logger import init_logger
from vllm.utils import find_process_using_port
from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError

logger = init_logger(__name__)


async def serve_http(app: FastAPI,
                     sock: Optional[socket.socket],
                     enable_ssl_refresh: bool = False,
                     **uvicorn_kwargs: Any):
    """
    Run ``app`` under uvicorn until the server exits or a signal arrives.

    Logs every route that exposes both ``methods`` and ``path``, builds a
    uvicorn server from ``uvicorn_kwargs``, registers the engine-error
    exception handlers, and starts two tasks on the running loop: the
    uvicorn server itself and a watchdog that polls the engine for a
    fatal error state. SIGINT and SIGTERM are rerouted to a local handler
    that cancels both tasks.

    Args:
        app: The FastAPI application to serve. ``app.state.engine_client``
            is read and handed to the watchdog.
        sock: Optional pre-bound socket. When falsy, ``sockets=None`` is
            passed to ``server.serve``.
        enable_ssl_refresh: When true, an ``SSLCertRefresher`` is created
            from the loaded uvicorn config's SSL settings.
        **uvicorn_kwargs: Forwarded unchanged to ``uvicorn.Config``.

    Returns:
        An un-awaited shutdown object, presumably awaited by the caller:
        a no-op coroutine when the server task finished on its own, or the
        result of ``server.shutdown()`` when the task was cancelled.
    """
    logger.info("Available routes are:")
    for route in app.routes:
        methods = getattr(route, "methods", None)
        path = getattr(route, "path", None)

        # Skip routes that do not carry both attributes.
        if methods is None or path is None:
            continue

        logger.info("Route: %s, Methods: %s", path, ', '.join(methods))

    config = uvicorn.Config(app, **uvicorn_kwargs)
    config.load()
    server = uvicorn.Server(config)
    _add_shutdown_handlers(app, server)

    loop = asyncio.get_running_loop()

    watchdog_task = loop.create_task(
        watchdog_loop(server, app.state.engine_client))
    server_task = loop.create_task(
        server.serve(sockets=[sock] if sock else None))

    # NOTE(review): the refresher is only stopped from the signal handler
    # below; confirm whether it should also be stopped on a normal exit.
    ssl_cert_refresher = None if not enable_ssl_refresh else SSLCertRefresher(
        ssl_context=config.ssl,
        key_path=config.ssl_keyfile,
        cert_path=config.ssl_certfile,
        ca_path=config.ssl_ca_certs)

    def signal_handler() -> None:
        # Prevents the uvicorn signal handler from exiting early: we cancel
        # our own tasks and let the except/finally below drive the shutdown.
        server_task.cancel()
        watchdog_task.cancel()
        if ssl_cert_refresher:
            ssl_cert_refresher.stop()

    async def dummy_shutdown() -> None:
        pass

    loop.add_signal_handler(signal.SIGINT, signal_handler)
    loop.add_signal_handler(signal.SIGTERM, signal_handler)

    try:
        await server_task
        # The server stopped by itself; hand back a no-op so both paths
        # return the same kind of object.
        return dummy_shutdown()
    except asyncio.CancelledError:
        # "port" is optional in uvicorn_kwargs (for example when a pre-bound
        # socket is supplied). Indexing it directly would raise KeyError
        # here and the shutdown object would never be returned.
        port = uvicorn_kwargs.get("port")
        if port is not None:
            process = find_process_using_port(port)
            if process is not None:
                logger.debug(
                    "port %s is used by process %s launched with command:\n%s",
                    port, process, " ".join(process.cmdline()))
        logger.info("Shutting down FastAPI HTTP server.")
        return server.shutdown()
    finally:
        # Always stop the watchdog, whichever way the server ended.
        watchdog_task.cancel()


async def watchdog_loop(server: uvicorn.Server, engine: EngineClient):
    """
    Background watchdog that periodically checks the engine for an
    error state. It is needed to trigger a server shutdown when an
    exception arises inside a StreamingResponse() generator, where the
    regular exception handlers are not invoked.

    The loop never returns on its own; it ends only when its task is
    cancelled.
    """
    poll_interval_s = 5.0
    while True:
        await asyncio.sleep(poll_interval_s)
        terminate_if_errored(server=server, engine=engine)


def terminate_if_errored(server: uvicorn.Server, engine: EngineClient):
    """
    Flag the uvicorn server for exit when the engine has errored and is
    no longer running, unless VLLM_KEEP_ALIVE_ON_ENGINE_DEATH is set.

    See discussions here on shutting down a uvicorn server
    https://github.com/encode/uvicorn/discussions/1103
    The shutdown cannot be awaited here: the calling handler must first
    return so that the connection for the current request is closed.
    Setting ``should_exit`` only requests the exit.
    """
    engine_is_dead = engine.errored and not engine.is_running
    keep_alive = envs.VLLM_KEEP_ALIVE_ON_ENGINE_DEATH
    if keep_alive or not engine_is_dead:
        return
    server.should_exit = True


def _add_shutdown_handlers(app: FastAPI, server: uvicorn.Server) -> None:
    """
    Register exception handlers on ``app`` that answer with HTTP 500 and
    flag ``server`` for exit when the engine is in a fatal error state.

    VLLM V1 AsyncLLM catches exceptions and returns
    only two types: EngineGenerateError and EngineDeadError.

    EngineGenerateError is raised by the per request generate()
    method. This error could be request specific (and therefore
    recoverable - e.g. if there is an error in input processing).

    EngineDeadError is raised by the background output_handler
    method. This error is global and therefore not recoverable.

    We register these @app.exception_handlers to return nice
    responses to the end user if they occur and shut down if needed.
    See https://fastapi.tiangolo.com/tutorial/handling-errors/
    for more details on how exception handlers work.

    If an exception is encountered in a StreamingResponse
    generator, the exception is not raised, since we already sent
    a 200 status. Rather, we send an error message as the next chunk.
    Since the exception is not raised, this means that the server
    will not automatically shut down. Instead, we use the watchdog
    background task to check for errored state.
    """

    # One handler is registered for every listed exception type by
    # stacking the decorator.
    @app.exception_handler(RuntimeError)
    @app.exception_handler(AsyncEngineDeadError)
    @app.exception_handler(MQEngineDeadError)
    @app.exception_handler(EngineDeadError)
    @app.exception_handler(EngineGenerateError)
    async def runtime_exception_handler(request: Request, __):
        # Only requests the exit; the response below is still returned so
        # the connection for this request can be closed first.
        terminate_if_errored(
            server=server,
            engine=request.app.state.engine_client,
        )

        return Response(status_code=HTTPStatus.INTERNAL_SERVER_ERROR)