# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

4
5
import asyncio
import signal
6
import socket
7
from functools import partial
8
from typing import Any
9
10

import uvicorn
11
from fastapi import FastAPI
12

13
from vllm import envs
14
from vllm.engine.protocol import EngineClient
15
16
17
18
from vllm.entrypoints.constants import (
    H11_MAX_HEADER_COUNT_DEFAULT,
    H11_MAX_INCOMPLETE_EVENT_SIZE_DEFAULT,
)
19
from vllm.entrypoints.ssl import SSLCertRefresher
20
from vllm.logger import init_logger
21
from vllm.utils.network_utils import find_process_using_port
22
23
24
25

logger = init_logger(__name__)


26
27
def _log_available_routes(app: FastAPI) -> None:
    """Log every route registered on ``app`` that exposes a path."""
    logger.info("Available routes are:")

    # Routes that declare HTTP methods.
    for route in app.routes:
        methods = getattr(route, "methods", None)
        path = getattr(route, "path", None)

        if methods is None or path is None:
            continue

        # Sort so the log line is stable from run to run.
        logger.info("Route: %s, Methods: %s", path, ", ".join(sorted(methods)))

    # Routes that have an endpoint callable but declare no HTTP methods.
    for route in app.routes:
        endpoint = getattr(route, "endpoint", None)
        methods = getattr(route, "methods", None)
        path = getattr(route, "path", None)

        if endpoint is None or path is None or methods is not None:
            continue

        # Not every callable has ``__name__`` (e.g. functools.partial objects
        # or callable instances), so fall back to its repr instead of raising.
        endpoint_name = getattr(endpoint, "__name__", repr(endpoint))
        logger.info("Route: %s, Endpoint: %s", path, endpoint_name)


async def serve_http(
    app: FastAPI,
    sock: socket.socket | None,
    enable_ssl_refresh: bool = False,
    **uvicorn_kwargs: Any,
):
    """
    Start a FastAPI app using Uvicorn, with support for custom Uvicorn config
    options.  Supports http header limits via h11_max_incomplete_event_size and
    h11_max_header_count.

    Args:
        app: The application to serve. ``app.state.engine_client`` is read
            here, and ``app.state.server`` is set to the created server.
        sock: Optional socket handed to the server; when falsy, the server is
            started with ``sockets=None``.
        enable_ssl_refresh: When true, an ``SSLCertRefresher`` is created from
            the SSL settings of the Uvicorn config.
        **uvicorn_kwargs: Forwarded to ``uvicorn.Config``, except for
            ``h11_max_incomplete_event_size`` and ``h11_max_header_count``,
            which are removed and applied to the config separately.

    Returns:
        An un-awaited shutdown object: a no-op coroutine when the server task
        finished on its own, or the result of ``server.shutdown()`` when the
        server task was cancelled. Presumably the caller awaits it.
    """
    _log_available_routes(app)

    # Pull the header limit options out of the kwargs; they are applied as
    # attributes on the config below rather than passed to its constructor.
    h11_max_incomplete_event_size = uvicorn_kwargs.pop(
        "h11_max_incomplete_event_size", None
    )
    h11_max_header_count = uvicorn_kwargs.pop("h11_max_header_count", None)

    # Set safe defaults if not provided
    if h11_max_incomplete_event_size is None:
        h11_max_incomplete_event_size = H11_MAX_INCOMPLETE_EVENT_SIZE_DEFAULT
    if h11_max_header_count is None:
        h11_max_header_count = H11_MAX_HEADER_COUNT_DEFAULT

    config = uvicorn.Config(app, **uvicorn_kwargs)
    # Set header limits
    config.h11_max_incomplete_event_size = h11_max_incomplete_event_size
    config.h11_max_header_count = h11_max_header_count
    config.load()
    server = uvicorn.Server(config)
    app.state.server = server

    loop = asyncio.get_running_loop()

    watchdog_task = loop.create_task(watchdog_loop(server, app.state.engine_client))
    server_task = loop.create_task(server.serve(sockets=[sock] if sock else None))

    ssl_cert_refresher = (
        None
        if not enable_ssl_refresh
        else SSLCertRefresher(
            ssl_context=config.ssl,
            key_path=config.ssl_keyfile,
            cert_path=config.ssl_certfile,
            ca_path=config.ssl_ca_certs,
        )
    )

    shutdown_event = asyncio.Event()

    def signal_handler() -> None:
        # Only flag the request here; the actual teardown is performed by
        # handle_shutdown() once it observes the event.
        shutdown_event.set()

    async def dummy_shutdown() -> None:
        # No-op returned when the server has already finished by itself.
        pass

    loop.add_signal_handler(signal.SIGINT, signal_handler)
    loop.add_signal_handler(signal.SIGTERM, signal_handler)

    async def handle_shutdown() -> None:
        await shutdown_event.wait()

        engine_client = app.state.engine_client
        timeout = engine_client.vllm_config.shutdown_timeout

        # engine_client.shutdown is called synchronously, so run it in the
        # loop's default executor instead of blocking the event loop.
        await loop.run_in_executor(
            None, partial(engine_client.shutdown, timeout=timeout)
        )

        # The engine is shut down first; only then is the HTTP side stopped.
        server.should_exit = True
        server_task.cancel()
        watchdog_task.cancel()
        if ssl_cert_refresher:
            ssl_cert_refresher.stop()

    shutdown_task = loop.create_task(handle_shutdown())

    try:
        await server_task
        return dummy_shutdown()
    except asyncio.CancelledError:
        # "port" may be absent (e.g. when only a socket was supplied); the
        # diagnostic below is best-effort and must not mask the shutdown.
        port = uvicorn_kwargs.get("port")
        process = find_process_using_port(port) if port is not None else None
        if process is not None:
            logger.warning(
                "port %s is used by process %s launched with command:\n%s",
                port,
                process,
                " ".join(process.cmdline()),
            )
        logger.info("Shutting down FastAPI HTTP server.")
        return server.shutdown()
    finally:
        # Whatever the exit path, stop the background helper tasks.
        shutdown_task.cancel()
        watchdog_task.cancel()


async def watchdog_loop(server: uvicorn.Server, engine: EngineClient):
    """
    Background watchdog task that periodically checks the engine for an
    error state by calling terminate_if_errored(). Needed to trigger
    shutdown if an exception arises in a StreamingResponse() generator.

    Runs until the task is cancelled.
    """
    poll_interval_s = 5.0
    while True:
        # Sleep first, then check, so each check is one interval apart.
        await asyncio.sleep(poll_interval_s)
        terminate_if_errored(server, engine)


def terminate_if_errored(server: uvicorn.Server, engine: EngineClient):
    """
    Flag the server for exit when the engine has errored and is no longer
    running, unless VLLM_KEEP_ALIVE_ON_ENGINE_DEATH is set.

    See discussions here on shutting down a uvicorn server
    https://github.com/encode/uvicorn/discussions/1103
    In this case we cannot await the server shutdown here
    because handler must first return to close the connection
    for this request.
    """
    engine_is_dead = engine.errored and not engine.is_running
    keep_alive = envs.VLLM_KEEP_ALIVE_ON_ENGINE_DEATH
    if keep_alive or not engine_is_dead:
        return
    # Only set the flag; the server is left to act on it.
    server.should_exit = True