api_server.py 30.6 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
import asyncio
4
5
import importlib
import inspect
6
import multiprocessing
7
import multiprocessing.forkserver as forkserver
8
import os
9
import signal
10
import socket
11
import tempfile
12
import warnings
13
from argparse import Namespace
14
from collections.abc import AsyncIterator
15
from contextlib import asynccontextmanager, suppress
16
from typing import Any
17

18
import uvloop
19
from fastapi import FastAPI, HTTPException
Zhuohan Li's avatar
Zhuohan Li committed
20
21
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
22
from starlette.datastructures import State
Zhuohan Li's avatar
Zhuohan Li committed
23

24
import vllm.envs as envs
25
from vllm.config import ModelConfig, VllmConfig
Woosuk Kwon's avatar
Woosuk Kwon committed
26
from vllm.engine.arg_utils import AsyncEngineArgs
27
from vllm.engine.protocol import EngineClient
28
from vllm.entrypoints.chat_utils import load_chat_template
29
from vllm.entrypoints.launcher import serve_http
30
from vllm.entrypoints.logger import RequestLogger
31
from vllm.entrypoints.openai.cli_args import make_arg_parser, validate_parsed_serve_args
32
from vllm.entrypoints.openai.engine.protocol import GenerationError
33
from vllm.entrypoints.openai.models.protocol import BaseModelPath
34
35
from vllm.entrypoints.openai.models.serving import OpenAIServingModels
from vllm.entrypoints.openai.server_utils import (
36
37
    engine_error_handler,
    exception_handler,
38
    generation_error_handler,
39
40
41
42
43
    get_uvicorn_log_config,
    http_exception_handler,
    lifespan,
    log_response,
    validation_exception_handler,
44
)
45
from vllm.entrypoints.sagemaker.api_router import sagemaker_standards_bootstrap
46
47
48
from vllm.entrypoints.serve.elastic_ep.middleware import (
    ScalingMiddleware,
)
49
from vllm.entrypoints.serve.render.serving import OpenAIServingRender
50
from vllm.entrypoints.serve.tokenize.serving import OpenAIServingTokenization
51
52
53
from vllm.entrypoints.utils import (
    cli_env_setup,
    log_non_default_args,
54
    log_version_and_model,
55
    process_lora_modules,
56
)
57
from vllm.logger import init_logger
58
from vllm.reasoning import ReasoningParserManager
59
from vllm.tasks import POOLING_TASKS, SupportedTask
60
from vllm.tool_parsers import ToolParserManager
61
from vllm.tracing import instrument
yhu422's avatar
yhu422 committed
62
from vllm.usage.usage_lib import UsageContext
Cyrus Leung's avatar
Cyrus Leung committed
63
from vllm.utils.argparse_utils import FlexibleArgumentParser
64
from vllm.utils.network_utils import is_valid_ipv6_address
65
from vllm.utils.system_utils import decorate_logs, set_ulimit
66
from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError
67
from vllm.version import __version__ as VLLM_VERSION
Zhuohan Li's avatar
Zhuohan Li committed
68

69
# Annotation only: no value is bound to this name here.
# NOTE(review): presumably assigned elsewhere when Prometheus multiprocess
# metrics are configured -- confirm against the code that sets it.
prometheus_multiproc_dir: tempfile.TemporaryDirectory
70

71
# The logger name is spelled out instead of derived from __name__; see
# https://github.com/vllm-project/vllm/pull/4765 for the rationale.
72
logger = init_logger("vllm.entrypoints.openai.api_server")
73

74
75
# Task set used by build_app() and init_app_state() when the caller does not
# pass `supported_tasks`; both emit a DeprecationWarning in that case.
_FALLBACK_SUPPORTED_TASKS: tuple[SupportedTask, ...] = ("generate",)

76

77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Chunk size and fan-out used when streaming files through the page cache.
_PREFETCH_BLOCK_SIZE = 16 * 1024 * 1024  # 16 MB
_PREFETCH_MAX_WORKERS = 8


def _resolve_prefetch_dir(model_ref: str, revision, download_dir) -> str | None:
    """Locate the local directory that holds the model files.

    ``model_ref`` is returned unchanged when it is an existing directory.
    Otherwise it is treated as a hub repo id and looked up in the local
    ModelScope or Hugging Face cache with ``local_files_only=True``;
    ``revision`` and ``download_dir`` are forwarded unchanged to that lookup.

    Returns ``None`` when the lookup raises (e.g. the repo is not cached yet
    or the id is unknown).
    """
    if os.path.isdir(model_ref):
        return model_ref

    try:
        if envs.VLLM_USE_MODELSCOPE:
            from modelscope.hub.snapshot_download import (
                snapshot_download as ms_snapshot_download,
            )

            return ms_snapshot_download(
                model_id=model_ref,
                revision=revision,
                cache_dir=download_dir,
                local_files_only=True,
            )

        from huggingface_hub import snapshot_download as hf_snapshot_download

        return hf_snapshot_download(
            repo_id=model_ref,
            revision=revision,
            cache_dir=download_dir,
            allow_patterns=[
                "*.safetensors",
                "*.bin",
                "*.json",
                "*tokenizer*",
            ],
            local_files_only=True,
        )
    except Exception:
        # Best-effort: leave a debug breadcrumb instead of failing startup.
        logger.debug(
            "Parent-side weight prefetch skipped: cannot resolve %s locally",
            model_ref,
            exc_info=True,
        )
        return None


def _collect_prefetch_paths(candidate_dir: str) -> list[str]:
    """Return the files in ``candidate_dir`` worth warming, without duplicates.

    Weight shards (``*.safetensors``, ``*.bin``) come first, followed by the
    small tokenizer/config sidecars (``*.json``, ``*tokenizer*``). The
    patterns overlap (``tokenizer.json`` matches both sidecar patterns), so
    the result is de-duplicated while keeping first-seen order.
    """
    import glob

    # Escape the directory so characters such as "[" in a model path are
    # matched literally instead of being parsed as glob syntax.
    base = glob.escape(candidate_dir)

    def matching(*patterns: str) -> list[str]:
        found: list[str] = []
        for pattern in patterns:
            found.extend(glob.glob(os.path.join(base, pattern)))
        return sorted(found)

    shards = matching("*.safetensors", "*.bin")
    sidecars = matching("*.json", "*tokenizer*")

    unique_paths = dict.fromkeys(shards + sidecars)
    # "*tokenizer*" can also match directories; only regular files are read.
    return [path for path in unique_paths if os.path.isfile(path)]


def _read_file_once(path: str) -> None:
    """Stream ``path`` from start to end so the kernel caches its pages.

    The bytes are discarded. One buffer is reused for the whole file instead
    of allocating a fresh 16 MB ``bytes`` object per chunk. Errors are logged
    at debug level and otherwise ignored.
    """
    try:
        scratch = bytearray(_PREFETCH_BLOCK_SIZE)
        with open(path, "rb", buffering=0) as raw:
            while raw.readinto(scratch):
                pass
    except Exception:
        logger.debug("Parent-side prefetch failed for %s", path, exc_info=True)


def _prefetch_into_page_cache(model_ref: str, revision, download_dir) -> None:
    """Thread body: resolve the model directory and read its files once."""
    try:
        candidate_dir = _resolve_prefetch_dir(model_ref, revision, download_dir)
        if not candidate_dir or not os.path.isdir(candidate_dir):
            return

        paths = _collect_prefetch_paths(candidate_dir)
        if not paths:
            return

        logger.debug(
            "Parent-side weight prefetch starting for %d files in %s",
            len(paths),
            candidate_dir,
        )

        from concurrent.futures import ThreadPoolExecutor

        # Bounded fan-out; leaving the ``with`` block waits for every read.
        with ThreadPoolExecutor(max_workers=_PREFETCH_MAX_WORKERS) as pool:
            for _ in pool.map(_read_file_once, paths):
                pass
    except Exception:
        # Honour the best-effort contract: never let an unexpected error
        # surface as an unhandled-thread traceback.
        logger.debug("Parent-side weight prefetch aborted", exc_info=True)


def _startup_prefetch_weights(vllm_config: "VllmConfig") -> None:
    """Start warming the OS page cache with the model files in the background.

    A daemon thread resolves the model directory (local path, or the local
    Hugging Face / ModelScope cache for a repo id), collects the weight
    shards plus tokenizer/config sidecars, and reads each file once. All
    resolution, globbing and reading happens inside that thread, so the
    caller returns immediately after starting it.

    Best-effort: every failure is logged at debug level and otherwise
    ignored, so the regular weight-loading path is unaffected.

    Args:
        vllm_config: Engine configuration. Only ``model_config.model``,
            ``model_config.revision`` and ``load_config.download_dir`` are
            read, and they are copied out before the thread starts so the
            thread does not keep the whole config alive.
    """
    import threading

    threading.Thread(
        target=_prefetch_into_page_cache,
        args=(
            vllm_config.model_config.model,
            vllm_config.model_config.revision,
            vllm_config.load_config.download_dir,
        ),
        daemon=True,
        name="vllm-parent-weight-prefetch",
    ).start()


192
@asynccontextmanager
async def build_async_engine_client(
    args: Namespace,
    *,
    usage_context: UsageContext = UsageContext.OPENAI_API_SERVER,
    client_config: dict[str, Any] | None = None,
) -> AsyncIterator[EngineClient]:
    """Yield an engine client built from parsed CLI arguments.

    When ``VLLM_WORKER_MULTIPROC_METHOD`` is ``forkserver``, the fork server
    is started first with ``vllm.v1.engine.async_llm`` preloaded. The CLI
    namespace is then converted to ``AsyncEngineArgs`` and the actual client
    lifecycle is delegated to
    :func:`build_async_engine_client_from_engine_args`.

    Args:
        args: Parsed command-line namespace.
        usage_context: Usage context forwarded to the engine.
        client_config: Optional per-API-process settings; ``client_count`` and
            ``client_index`` are copied onto the engine args when present.
    """
    use_forkserver = os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") == "forkserver"
    if use_forkserver:
        # The executor is expected to be mp; heavy modules are imported once
        # in the forkserver process so children inherit them.
        logger.debug("Setup forkserver with pre-imports")
        try:
            multiprocessing.set_start_method("forkserver", force=False)
        except RuntimeError:
            # The CLI entry's async prewarm (vllm/entrypoints/cli/main.py)
            # may have set the start method already; a second call is fine.
            pass
        multiprocessing.set_forkserver_preload(["vllm.v1.engine.async_llm"])
        forkserver.ensure_running()
        logger.debug("Forkserver setup complete!")

    engine_args = AsyncEngineArgs.from_cli_args(args)
    if client_config:
        engine_args._api_process_count = client_config.get("client_count", 1)
        engine_args._api_process_rank = client_config.get("client_index", 0)

    # The nested context manager owns the client and shuts it down on
    # error or exit.
    client_cm = build_async_engine_client_from_engine_args(
        engine_args,
        usage_context=usage_context,
        client_config=client_config,
    )
    async with client_cm as client:
        yield client


@asynccontextmanager
async def build_async_engine_client_from_engine_args(
    engine_args: AsyncEngineArgs,
    *,
    usage_context: UsageContext = UsageContext.OPENAI_API_SERVER,
    client_config: dict[str, Any] | None = None,
) -> AsyncIterator[EngineClient]:
    """Create an ``AsyncLLM`` client, yield it, and shut it down on exit.

    Steps, in order:

    1. Build the ``VllmConfig`` from ``engine_args``.
    2. Unless ``client_config`` carries an ``input_address``, start the
       background page-cache prefetch of the model files.
    3. Instantiate ``AsyncLLM`` and reset its multimodal cache.
    4. Yield the client; ``shutdown()`` runs in ``finally`` if it was created.

    Exceptions raised while creating the client propagate to the caller.

    Args:
        engine_args: Engine arguments used to build the config.
        usage_context: Usage context forwarded to the config and the engine.
        client_config: Optional mapping. ``client_count`` / ``client_index``
            are extracted from a copy; the remaining entries are passed to the
            engine as ``client_addresses``. The caller's dict is not mutated.
    """
    # Create the EngineConfig (determines if we can use V1).
    vllm_config = engine_args.create_engine_config(usage_context=usage_context)

    # [startup] Warm the OS page cache with the model files from this parent
    # process so the engine finds them cached when it loads weights later.
    #
    # API-only workers attached to an already-running EngineCore
    # (multi-API-server / disaggregated setups) never load weights and would
    # only compete with the engine's own reads. An `input_address` entry in
    # client_config is the current marker for such a worker.
    #
    # The prefetch is best-effort and performs its I/O in a background thread.
    attaches_to_running_engine = bool(
        client_config and client_config.get("input_address")
    )
    if not attaches_to_running_engine:
        _startup_prefetch_weights(vllm_config)

    from vllm.v1.engine.async_llm import AsyncLLM

    async_llm: AsyncLLM | None = None

    # Work on a copy so the caller's client_config stays untouched.
    addresses = dict(client_config) if client_config else {}
    client_count = addresses.pop("client_count", 1)
    client_index = addresses.pop("client_index", 0)

    try:
        async_llm = AsyncLLM.from_vllm_config(
            vllm_config=vllm_config,
            usage_context=usage_context,
            enable_log_requests=engine_args.enable_log_requests,
            aggregate_engine_logging=engine_args.aggregate_engine_logging,
            disable_log_stats=engine_args.disable_log_stats,
            client_addresses=addresses,
            client_count=client_count,
            client_index=client_index,
        )
        assert async_llm is not None

        # Don't keep the dummy data in memory
        await async_llm.reset_mm_cache()

        yield async_llm
    finally:
        if async_llm:
            async_llm.shutdown()
295
296


297
def build_app(
    args: Namespace,
    supported_tasks: tuple["SupportedTask", ...] | None = None,
    model_config: ModelConfig | None = None,
) -> FastAPI:
    """Assemble the FastAPI application for the API server.

    Routers are attached according to ``supported_tasks``. Then CORS, the
    exception handlers, optional authentication / request-id middleware, the
    scaling middleware, optional realtime metrics and response logging, and
    any user-supplied middleware from ``args.middleware`` are installed, in
    that order. The app is finally passed through
    ``sagemaker_standards_bootstrap``.

    Args:
        args: Parsed command-line namespace; also stored on ``app.state.args``.
        supported_tasks: Tasks the server should expose. Omitting it is
            deprecated and falls back to ``("generate",)`` with a warning.
        model_config: Forwarded to the SageMaker and pooling routers.

    Returns:
        The configured application.

    Raises:
        ValueError: If an entry of ``args.middleware`` resolves to something
            that is neither a class nor a coroutine function.
    """
    if supported_tasks is None:
        warnings.warn(
            "The 'supported_tasks' parameter was not provided to "
            "build_app and will be required in a future version. "
            "Defaulting to ('generate',).",
            DeprecationWarning,
            stacklevel=2,
        )
        supported_tasks = _FALLBACK_SUPPORTED_TASKS

    serves_generate = "generate" in supported_tasks
    serves_realtime = "realtime" in supported_tasks

    # Docs endpoints: fully disabled, or only the online UIs disabled.
    fastapi_kwargs: dict[str, Any] = {"lifespan": lifespan}
    if args.disable_fastapi_docs:
        fastapi_kwargs.update(openapi_url=None, docs_url=None, redoc_url=None)
    elif args.enable_offline_docs:
        fastapi_kwargs.update(docs_url=None, redoc_url=None)
    app = FastAPI(**fastapi_kwargs)
    app.state.args = args

    # ---- Routers (registration order is preserved) ----
    from vllm.entrypoints.serve import register_vllm_serve_api_routers

    register_vllm_serve_api_routers(app)

    from vllm.entrypoints.openai.models.api_router import (
        attach_router as register_models_api_router,
    )

    register_models_api_router(app)

    from vllm.entrypoints.sagemaker.api_router import (
        attach_router as register_sagemaker_api_router,
    )

    register_sagemaker_api_router(app, supported_tasks, model_config)

    if serves_generate:
        from vllm.entrypoints.openai.generate.api_router import (
            register_generate_api_routers,
        )

        register_generate_api_routers(app)

        from vllm.entrypoints.serve.disagg.api_router import (
            attach_router as attach_disagg_router,
        )

        attach_disagg_router(app)

        from vllm.entrypoints.serve.rlhf.api_router import (
            attach_router as attach_rlhf_router,
        )

        attach_rlhf_router(app)

        from vllm.entrypoints.serve.elastic_ep.api_router import (
            attach_router as elastic_ep_attach_router,
        )

        elastic_ep_attach_router(app)

        from vllm.entrypoints.openai.generative_scoring.api_router import (
            register_generative_scoring_api_router,
        )

        register_generative_scoring_api_router(app)

    if serves_generate or "render" in supported_tasks:
        from vllm.entrypoints.serve.render.api_router import (
            attach_router as attach_render_router,
        )

        attach_render_router(app)

    if "transcription" in supported_tasks:
        from vllm.entrypoints.openai.speech_to_text.api_router import (
            attach_router as register_speech_to_text_api_router,
        )

        register_speech_to_text_api_router(app)

    if serves_realtime:
        from vllm.entrypoints.openai.realtime.api_router import (
            attach_router as register_realtime_api_router,
        )

        register_realtime_api_router(app)

    if any(task in POOLING_TASKS for task in supported_tasks):
        from vllm.entrypoints.pooling.factories import register_pooling_api_routers

        register_pooling_api_routers(app, supported_tasks, model_config)

    app.root_path = args.root_path

    # ---- Middleware and exception handlers ----
    app.add_middleware(
        CORSMiddleware,
        allow_origins=args.allowed_origins,
        allow_credentials=args.allow_credentials,
        allow_methods=args.allowed_methods,
        allow_headers=args.allowed_headers,
    )

    handler_table = (
        (HTTPException, http_exception_handler),
        (RequestValidationError, validation_exception_handler),
        (EngineGenerateError, engine_error_handler),
        (EngineDeadError, engine_error_handler),
        (GenerationError, generation_error_handler),
        (Exception, exception_handler),
    )
    for exc_type, handler in handler_table:
        app.exception_handler(exc_type)(handler)

    # Ensure --api-key option from CLI takes precedence over VLLM_API_KEY
    configured_keys = args.api_key or [envs.VLLM_API_KEY]
    api_tokens = [key for key in configured_keys if key]
    if api_tokens:
        from vllm.entrypoints.openai.server_utils import AuthenticationMiddleware

        app.add_middleware(AuthenticationMiddleware, tokens=api_tokens)

    if args.enable_request_id_headers:
        from vllm.entrypoints.openai.server_utils import XRequestIdMiddleware

        app.add_middleware(XRequestIdMiddleware)

    # Add scaling middleware to check for scaling state
    app.add_middleware(ScalingMiddleware)

    if serves_realtime:
        # Add WebSocket metrics middleware
        from vllm.entrypoints.openai.realtime.metrics import (
            WebSocketMetricsMiddleware,
        )

        app.add_middleware(WebSocketMetricsMiddleware)

    if envs.VLLM_DEBUG_LOG_API_SERVER_RESPONSE:
        logger.warning(
            "CAUTION: Enabling log response in the API Server. "
            "This can include sensitive information and should be "
            "avoided in production."
        )
        app.middleware("http")(log_response)

    # User-supplied middleware, given as dotted "module.object" paths.
    for dotted_path in args.middleware:
        module_path, object_name = dotted_path.rsplit(".", 1)
        target = getattr(importlib.import_module(module_path), object_name)
        if inspect.isclass(target):
            app.add_middleware(target)  # type: ignore[arg-type]
            continue
        if inspect.iscoroutinefunction(target):
            app.middleware("http")(target)
            continue
        raise ValueError(
            f"Invalid middleware {dotted_path}. Must be a function or a class."
        )

    return sagemaker_standards_bootstrap(app)


457
async def init_app_state(
    engine_client: EngineClient,
    state: State,
    args: Namespace,
    supported_tasks: tuple["SupportedTask", ...] | None = None,
) -> None:
    """Populate ``state`` with the serving objects backed by ``engine_client``.

    Always sets up the model registry (including static LoRAs), the render
    handler and the tokenization handler. Task-specific state (generate,
    generative scoring, transcription, realtime, pooling) is initialised only
    for the tasks listed in ``supported_tasks``.

    Args:
        engine_client: Client whose config, model config and renderer are used.
        state: Application state object to fill in.
        args: Parsed command-line namespace.
        supported_tasks: Tasks to initialise. Omitting it is deprecated and
            falls back to ``("generate",)`` with a warning.
    """
    vllm_config = engine_client.vllm_config

    if supported_tasks is None:
        warnings.warn(
            "The 'supported_tasks' parameter was not provided to "
            "init_app_state and will be required in a future version. "
            "Please pass 'supported_tasks' explicitly.",
            DeprecationWarning,
            stacklevel=2,
        )
        supported_tasks = _FALLBACK_SUPPORTED_TASKS

    served_model_names = (
        args.served_model_name
        if args.served_model_name is not None
        else [args.model]
    )
    request_logger = (
        RequestLogger(max_log_len=args.max_log_len)
        if args.enable_log_requests
        else None
    )
    base_model_paths = [
        BaseModelPath(name=served_name, model_path=args.model)
        for served_name in served_model_names
    ]

    state.engine_client = engine_client
    state.log_stats = not args.disable_log_stats
    state.vllm_config = vllm_config
    state.args = args
    resolved_chat_template = load_chat_template(args.chat_template)

    # Merge default_mm_loras into the static lora_modules
    lora_config = vllm_config.lora_config
    default_mm_loras = {} if lora_config is None else lora_config.default_mm_loras
    lora_modules = process_lora_modules(args.lora_modules, default_mm_loras)

    serving_models = OpenAIServingModels(
        engine_client=engine_client,
        base_model_paths=base_model_paths,
        lora_modules=lora_modules,
    )
    state.openai_serving_models = serving_models
    await serving_models.init_static_loras()

    serving_render = OpenAIServingRender(
        model_config=engine_client.model_config,
        renderer=engine_client.renderer,
        model_registry=serving_models.registry,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
        trust_request_chat_template=args.trust_request_chat_template,
        enable_auto_tools=args.enable_auto_tool_choice,
        exclude_tools_when_tool_choice_none=args.exclude_tools_when_tool_choice_none,
        tool_parser=args.tool_call_parser,
        reasoning_parser=args.structured_outputs_config.reasoning_parser,
        default_chat_template_kwargs=args.default_chat_template_kwargs,
        log_error_stack=args.log_error_stack,
    )
    state.openai_serving_render = serving_render

    state.openai_serving_tokenization = OpenAIServingTokenization(
        engine_client,
        serving_models,
        serving_render,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
        default_chat_template_kwargs=args.default_chat_template_kwargs,
        trust_request_chat_template=args.trust_request_chat_template,
    )

    # Task-specific state, in the same order as the routers are registered.
    if "generate" in supported_tasks:
        from vllm.entrypoints.openai.generate.api_router import init_generate_state

        await init_generate_state(
            engine_client, state, args, request_logger, supported_tasks
        )

        from vllm.entrypoints.openai.generative_scoring.api_router import (
            init_generative_scoring_state,
        )

        await init_generative_scoring_state(engine_client, state, args, request_logger)

    if "transcription" in supported_tasks:
        from vllm.entrypoints.openai.speech_to_text.api_router import (
            init_transcription_state,
        )

        init_transcription_state(
            engine_client, state, args, request_logger, supported_tasks
        )

    if "realtime" in supported_tasks:
        from vllm.entrypoints.openai.realtime.api_router import init_realtime_state

        init_realtime_state(engine_client, state, args, request_logger, supported_tasks)

    if any(task in POOLING_TASKS for task in supported_tasks):
        from vllm.entrypoints.pooling.factories import init_pooling_state

        init_pooling_state(engine_client, state, args, request_logger, supported_tasks)

    state.enable_server_load_tracking = args.enable_server_load_tracking
    state.server_load_metrics = 0

571

572
573
574
575
576
577
578
579
580
async def init_render_app_state(
    vllm_config: VllmConfig,
    state: State,
    args: Namespace,
) -> None:
    """Populate ``state`` for a render-only server that has no engine.

    In contrast to :func:`init_app_state`, no
    :class:`~vllm.engine.protocol.EngineClient` is involved: the renderer is
    created straight from the :class:`~vllm.config.VllmConfig`, and
    ``state.engine_client`` is set to ``None``.

    Args:
        vllm_config: Configuration used to build the model registry and the
            renderer.
        state: Application state object to fill in.
        args: Parsed command-line namespace.
    """
    from vllm.entrypoints.chat_utils import load_chat_template
    from vllm.entrypoints.openai.models.serving import OpenAIModelRegistry
    from vllm.entrypoints.serve.render.serving import OpenAIServingRender
    from vllm.renderers import renderer_from_config

    served_model_names = args.served_model_name or [args.model]
    base_model_paths = [
        BaseModelPath(name=served_name, model_path=args.model)
        for served_name in served_model_names
    ]
    model_registry = OpenAIModelRegistry(
        model_config=vllm_config.model_config,
        base_model_paths=base_model_paths,
    )

    request_logger = (
        RequestLogger(max_log_len=args.max_log_len)
        if args.enable_log_requests
        else None
    )

    renderer = renderer_from_config(vllm_config)
    resolved_chat_template = load_chat_template(args.chat_template)

    serving_render = OpenAIServingRender(
        model_config=vllm_config.model_config,
        renderer=renderer,
        model_registry=model_registry,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
        trust_request_chat_template=args.trust_request_chat_template,
        enable_auto_tools=args.enable_auto_tool_choice,
        exclude_tools_when_tool_choice_none=args.exclude_tools_when_tool_choice_none,
        tool_parser=args.tool_call_parser,
        reasoning_parser=args.structured_outputs_config.reasoning_parser,
        default_chat_template_kwargs=args.default_chat_template_kwargs,
        log_error_stack=args.log_error_stack,
    )
    state.openai_serving_render = serving_render

    state.openai_serving_models = model_registry

    # Tokenization is served by the render handler (no engine required).
    state.openai_serving_tokenization = serving_render

    state.vllm_config = vllm_config
    # There is no engine to poll, so stats logging is turned off.
    state.log_stats = False
    state.engine_client = None
    state.args = args
    state.enable_server_load_tracking = False
    state.server_load_metrics = 0


636
def create_server_socket(addr: tuple[str, int]) -> socket.socket:
    """Create a TCP socket bound to ``addr``.

    The address family is ``AF_INET6`` when the host part is a valid IPv6
    address and ``AF_INET`` otherwise. ``SO_REUSEADDR`` and ``SO_REUSEPORT``
    are enabled before binding. The socket is returned bound but not
    listening.

    Args:
        addr: ``(host, port)`` pair to bind to.

    Returns:
        The bound socket; the caller owns it and must close it.

    Raises:
        OSError: If setting a socket option or binding fails. The socket is
            closed before the error propagates, so no descriptor is leaked.
    """
    family = socket.AF_INET6 if is_valid_ipv6_address(addr[0]) else socket.AF_INET

    sock = socket.socket(family=family, type=socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind(addr)
    except BaseException:
        # Release the descriptor on any failure, then re-raise unchanged.
        sock.close()
        raise

    return sock


649
650
651
652
653
654
def create_server_unix_socket(path: str) -> socket.socket:
    """Create a Unix-domain stream socket bound to ``path``.

    Args:
        path: Filesystem path of the socket to create.

    Returns:
        The bound socket (not yet listening); the caller owns it and must
        close it.

    Raises:
        OSError: If binding fails (for example the parent directory does not
            exist). The socket is closed before the error propagates, so no
            descriptor is leaked.
    """
    sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
    try:
        sock.bind(path)
    except BaseException:
        # Release the descriptor on any failure, then re-raise unchanged.
        sock.close()
        raise
    return sock


655
def validate_api_server_args(args):
    """Check that the requested tool-call and reasoning parsers are registered.

    The tool-call parser is only checked when ``args.enable_auto_tool_choice``
    is set. The reasoning parser is only checked when one is configured in
    ``args.structured_outputs_config``.

    Raises:
        KeyError: If a requested parser is not among the registered ones.
    """
    known_tool_parsers = ToolParserManager.list_registered()
    if (
        args.enable_auto_tool_choice
        and args.tool_call_parser not in known_tool_parsers
    ):
        tool_choices = ",".join(known_tool_parsers)
        raise KeyError(
            f"invalid tool call parser: {args.tool_call_parser} "
            f"(chose from {{ {tool_choices} }})"
        )

    known_reasoning_parsers = ReasoningParserManager.list_registered()
    reasoning_parser = args.structured_outputs_config.reasoning_parser
    if reasoning_parser and reasoning_parser not in known_reasoning_parsers:
        reasoning_choices = ",".join(known_reasoning_parsers)
        raise KeyError(
            f"invalid reasoning parser: {reasoning_parser} "
            f"(chose from {{ {reasoning_choices} }})"
        )
671

672

673
@instrument(span_name="API server setup")
def setup_server(args):
    """Prepare the process for serving and return the bound socket.

    Logs the version and non-default arguments, imports parser plugins,
    validates the arguments, binds the listening socket (Unix-domain when
    ``args.uds`` is set, TCP otherwise), raises the file-descriptor limit and
    installs a SIGTERM handler.

    Returns:
        A ``(listen_address, sock)`` tuple: a human-readable address string
        and the bound socket.
    """
    log_version_and_model(logger, VLLM_VERSION, args.model)
    log_non_default_args(args)

    tool_plugin = args.tool_parser_plugin
    if tool_plugin and len(tool_plugin) > 3:
        ToolParserManager.import_tool_parser(tool_plugin)

    reasoning_plugin = args.reasoning_parser_plugin
    if reasoning_plugin and len(reasoning_plugin) > 3:
        ReasoningParserManager.import_reasoning_parser(reasoning_plugin)

    validate_api_server_args(args)

    # Bind the port before the engine is set up; this avoids race conditions
    # with ray. See https://github.com/vllm-project/vllm/issues/8204
    uds_path = args.uds
    if uds_path:
        sock = create_server_unix_socket(uds_path)
    else:
        sock_addr = (args.host or "", args.port)
        sock = create_server_socket(sock_addr)

    # Avoid footguns where uvicorn drops requests when too many concurrent
    # requests are active.
    set_ulimit()

    def _interrupt_on_sigterm(*_) -> None:
        # Interrupt server on sigterm while initializing
        raise KeyboardInterrupt("terminated")

    signal.signal(signal.SIGTERM, _interrupt_on_sigterm)

    if uds_path:
        return f"unix:{uds_path}", sock

    host, port = sock_addr
    scheme = "https" if (args.ssl_keyfile and args.ssl_certfile) else "http"
    if is_valid_ipv6_address(host):
        host_part = f"[{host}]"
    else:
        host_part = host or "0.0.0.0"
    return f"{scheme}://{host_part}:{port}", sock


718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
def _apply_uvicorn_log_config(args: Namespace, uvicorn_kwargs: dict[str, Any]) -> None:
    """Store the uvicorn log config in ``uvicorn_kwargs`` when one is available.

    ``get_uvicorn_log_config(args)`` supplies the config (from file or with
    endpoint filter); nothing is written when it returns ``None``.
    """
    log_config = get_uvicorn_log_config(args)
    if log_config is not None:
        uvicorn_kwargs["log_config"] = log_config


async def _serve_app(
    app: FastAPI,
    listen_address: str,
    sock: socket.socket,
    args: Namespace,
    uvicorn_kwargs: dict[str, Any],
) -> asyncio.Task:
    """Log the listen address and start ``serve_http`` for ``app``.

    Shared by :func:`build_and_serve` and :func:`build_and_serve_renderer` so
    both pass exactly the same server options. ``uvicorn_kwargs`` is taken as
    a plain dict (not ``**kwargs``) so its keys cannot collide with this
    helper's own parameters.
    """
    logger.info("Starting vLLM server on %s", listen_address)

    return await serve_http(
        app,
        sock=sock,
        enable_ssl_refresh=args.enable_ssl_refresh,
        host=args.host,
        port=args.port,
        log_level=args.uvicorn_log_level,
        # NOTE: When the 'disable_uvicorn_access_log' value is True,
        # no access log will be output.
        access_log=not args.disable_uvicorn_access_log,
        timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE,
        ssl_keyfile=args.ssl_keyfile,
        ssl_certfile=args.ssl_certfile,
        ssl_ca_certs=args.ssl_ca_certs,
        ssl_cert_reqs=args.ssl_cert_reqs,
        ssl_ciphers=args.ssl_ciphers,
        h11_max_incomplete_event_size=args.h11_max_incomplete_event_size,
        h11_max_header_count=args.h11_max_header_count,
        **uvicorn_kwargs,
    )


async def build_and_serve(
    engine_client: EngineClient,
    listen_address: str,
    sock: socket.socket,
    args: Namespace,
    **uvicorn_kwargs,
) -> asyncio.Task:
    """Build FastAPI app, initialize state, and start serving.

    The supported tasks are queried from ``engine_client`` and determine
    which routers and state are set up.

    Args:
        engine_client: Engine client backing the server.
        listen_address: Address string used for logging only.
        sock: Already-bound socket to serve on.
        args: Parsed command-line namespace.
        **uvicorn_kwargs: Extra keyword arguments forwarded to ``serve_http``.

    Returns:
        The shutdown task for the caller to await.
    """
    # Resolve the uvicorn log config first, before touching the engine.
    _apply_uvicorn_log_config(args, uvicorn_kwargs)

    supported_tasks = await engine_client.get_supported_tasks()
    model_config = engine_client.model_config

    logger.info("Supported tasks: %s", supported_tasks)
    app = build_app(args, supported_tasks, model_config)
    await init_app_state(engine_client, app.state, args, supported_tasks)

    return await _serve_app(app, listen_address, sock, args, uvicorn_kwargs)


async def build_and_serve_renderer(
    vllm_config: VllmConfig,
    listen_address: str,
    sock: socket.socket,
    args: Namespace,
    **uvicorn_kwargs,
) -> asyncio.Task:
    """Build FastAPI app for a CPU-only render server, initialize state, and
    start serving.

    Only the ``render`` task is exposed and no engine client is created.

    Args:
        vllm_config: Configuration used to initialise the render state.
        listen_address: Address string used for logging only.
        sock: Already-bound socket to serve on.
        args: Parsed command-line namespace.
        **uvicorn_kwargs: Extra keyword arguments forwarded to ``serve_http``.

    Returns:
        The shutdown task for the caller to await.
    """
    _apply_uvicorn_log_config(args, uvicorn_kwargs)

    app = build_app(args, ("render",))
    await init_render_app_state(vllm_config, app.state, args)

    return await _serve_app(app, listen_address, sock, args, uvicorn_kwargs)


811
812
async def run_server(args, **uvicorn_kwargs) -> None:
    """Run a single-worker API server.

    Prefixes this process's log output with ``APIServer``, binds the
    listening socket via :func:`setup_server`, and hands both the socket and
    the arguments to :func:`run_server_worker`.
    """
    # Add process-specific prefix to stdout and stderr.
    decorate_logs("APIServer")

    address, server_sock = setup_server(args)
    await run_server_worker(address, server_sock, args, **uvicorn_kwargs)


821
822
823
async def run_server_worker(
    listen_address, sock, args, client_config=None, **uvicorn_kwargs
) -> None:
    """Run a single API server worker.

    Imports the configured parser plugins, starts the engine client, builds
    and serves the app, and waits for the server's shutdown task.

    Args:
        listen_address: Address string used for logging.
        sock: Already-bound listening socket. This function takes ownership
            and always closes it before returning or raising, including when
            start-up fails before the server begins serving.
        args: Parsed command-line namespace.
        client_config: Optional per-API-process settings forwarded to
            :func:`build_async_engine_client`.
        **uvicorn_kwargs: Extra keyword arguments forwarded to the server.
    """
    try:
        if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
            ToolParserManager.import_tool_parser(args.tool_parser_plugin)

        if args.reasoning_parser_plugin and len(args.reasoning_parser_plugin) > 3:
            ReasoningParserManager.import_reasoning_parser(
                args.reasoning_parser_plugin
            )

        async with build_async_engine_client(
            args,
            client_config=client_config,
        ) as engine_client:
            shutdown_task = await build_and_serve(
                engine_client, listen_address, sock, args, **uvicorn_kwargs
            )

        # NB: Await server shutdown only after the backend context is exited
        await shutdown_task
    finally:
        # Covers start-up failures too, not only the shutdown wait.
        sock.close()
844

Ethan Xu's avatar
Ethan Xu committed
845
846
847

if __name__ == "__main__":
    # NOTE(simon):
    # Keep this entry point in sync with vllm/entrypoints/cli/main.py, which
    # provides the CLI entrypoints.
    cli_env_setup()

    parser = make_arg_parser(
        FlexibleArgumentParser(
            description="vLLM OpenAI-Compatible RESTful API server."
        )
    )
    args = parser.parse_args()
    validate_parsed_serve_args(args)

    uvloop.run(run_server(args))