launcher.py 4.66 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3

4
5
import asyncio
import signal
6
import socket
7
from typing import Any
8
9

import uvicorn
10
from fastapi import FastAPI
11

12
from vllm import envs
13
from vllm.engine.protocol import EngineClient
14
15
16
17
from vllm.entrypoints.constants import (
    H11_MAX_HEADER_COUNT_DEFAULT,
    H11_MAX_INCOMPLETE_EVENT_SIZE_DEFAULT,
)
18
from vllm.entrypoints.ssl import SSLCertRefresher
19
from vllm.logger import init_logger
20
from vllm.utils.network_utils import find_process_using_port
21
22
23
24

# Module-level logger, created through vLLM's init_logger with this module's
# name; used by serve_http for route listing and shutdown messages.
logger = init_logger(__name__)


25
26
async def serve_http(
    app: FastAPI,
    sock: socket.socket | None,
    enable_ssl_refresh: bool = False,
    **uvicorn_kwargs: Any,
):
    """
    Start a FastAPI app using Uvicorn, with support for custom Uvicorn config
    options.  Supports http header limits via h11_max_incomplete_event_size and
    h11_max_header_count.

    Args:
        app: The FastAPI application to serve. ``app.state.engine_client``
            is read to start the watchdog, and ``app.state.server`` is set
            to the created ``uvicorn.Server``.
        sock: Optional socket handed to ``server.serve``; when falsy, no
            sockets are passed.
        enable_ssl_refresh: When true, an ``SSLCertRefresher`` is created
            from the loaded config's SSL context and key/cert/CA paths.
        **uvicorn_kwargs: Forwarded to ``uvicorn.Config`` after
            ``h11_max_incomplete_event_size`` and ``h11_max_header_count``
            have been popped. ``port`` is optional; it is only used for a
            diagnostic message on cancellation.

    Returns:
        An un-awaited coroutine: a no-op coroutine when the server task
        finishes on its own, or ``server.shutdown()`` when the server task
        was cancelled (e.g. by SIGINT/SIGTERM).
    """
    logger.info("Available routes are:")
    # Routes that declare HTTP methods.
    for route in app.routes:
        methods = getattr(route, "methods", None)
        path = getattr(route, "path", None)

        if methods is None or path is None:
            continue

        logger.info("Route: %s, Methods: %s", path, ", ".join(methods))

    # Routes without HTTP methods but with an endpoint callable.
    for route in app.routes:
        endpoint = getattr(route, "endpoint", None)
        methods = getattr(route, "methods", None)
        path = getattr(route, "path", None)

        if endpoint is None or path is None or methods is not None:
            continue

        logger.info("Route: %s, Endpoint: %s", path, endpoint.__name__)

    # Extract header limit options if present
    h11_max_incomplete_event_size = uvicorn_kwargs.pop(
        "h11_max_incomplete_event_size", None
    )
    h11_max_header_count = uvicorn_kwargs.pop("h11_max_header_count", None)

    # Set safe defaults if not provided
    if h11_max_incomplete_event_size is None:
        h11_max_incomplete_event_size = H11_MAX_INCOMPLETE_EVENT_SIZE_DEFAULT
    if h11_max_header_count is None:
        h11_max_header_count = H11_MAX_HEADER_COUNT_DEFAULT

    config = uvicorn.Config(app, **uvicorn_kwargs)
    # Set header limits
    config.h11_max_incomplete_event_size = h11_max_incomplete_event_size
    config.h11_max_header_count = h11_max_header_count
    config.load()
    server = uvicorn.Server(config)
    app.state.server = server

    loop = asyncio.get_running_loop()

    watchdog_task = loop.create_task(watchdog_loop(server, app.state.engine_client))
    server_task = loop.create_task(server.serve(sockets=[sock] if sock else None))

    ssl_cert_refresher = (
        None
        if not enable_ssl_refresh
        else SSLCertRefresher(
            ssl_context=config.ssl,
            key_path=config.ssl_keyfile,
            cert_path=config.ssl_certfile,
            ca_path=config.ssl_ca_certs,
        )
    )

    def signal_handler() -> None:
        # prevents the uvicorn signal handler to exit early
        server_task.cancel()
        watchdog_task.cancel()
        if ssl_cert_refresher:
            ssl_cert_refresher.stop()

    async def dummy_shutdown() -> None:
        pass

    loop.add_signal_handler(signal.SIGINT, signal_handler)
    loop.add_signal_handler(signal.SIGTERM, signal_handler)

    try:
        await server_task
        return dummy_shutdown()
    except asyncio.CancelledError:
        # "port" is an optional keyword (the caller may rely on a pre-bound
        # socket or on uvicorn's default); a missing key must not replace the
        # shutdown coroutine with a KeyError.
        port = uvicorn_kwargs.get("port")
        process = find_process_using_port(port) if port is not None else None
        if process is not None:
            logger.warning(
                "port %s is used by process %s launched with command:\n%s",
                port,
                process,
                " ".join(process.cmdline()),
            )
        logger.info("Shutting down FastAPI HTTP server.")
        return server.shutdown()
    finally:
        # Runs on every exit path so the watchdog never outlives the server.
        watchdog_task.cancel()


async def watchdog_loop(server: uvicorn.Server, engine: EngineClient):
    """
    Background watchdog: wake up at a fixed interval and check the engine
    for an error state, so that a shutdown can be triggered when an
    exception arises inside a StreamingResponse() generator.

    The loop never returns on its own; it ends only when its task is
    cancelled.
    """
    poll_interval_s = 5.0
    while True:
        # Sleep first, then check: the first check happens one interval
        # after the task starts.
        await asyncio.sleep(poll_interval_s)
        terminate_if_errored(server, engine)


def terminate_if_errored(server: uvicorn.Server, engine: EngineClient):
    """
    Ask the uvicorn server to exit when the engine has errored and is no
    longer running, unless VLLM_KEEP_ALIVE_ON_ENGINE_DEATH is set.

    See discussions here on shutting down a uvicorn server
    https://github.com/encode/uvicorn/discussions/1103
    The shutdown cannot be awaited here because the handler must first
    return to close the connection for this request, so only the
    ``should_exit`` flag is set.
    """
    engine_dead = engine.errored and not engine.is_running
    if envs.VLLM_KEEP_ALIVE_ON_ENGINE_DEATH:
        # Configured to keep serving even after the engine has died.
        return
    if engine_dead:
        server.should_exit = True