"tests/models/multimodal/pooling/test_llava_next.py" did not exist on "75404d041be0d6e656b59cbbea23520d47d37b66"
api_server.py 28.4 KB
Newer Older
1
import asyncio
2
import atexit
3
4
import importlib
import inspect
5
import multiprocessing
6
import os
7
import re
8
import signal
9
import socket
10
import tempfile
11
import uuid
12
from argparse import Namespace
13
from contextlib import asynccontextmanager
14
from functools import partial
15
from http import HTTPStatus
16
from typing import AsyncIterator, Optional, Set, Tuple
17

18
import uvloop
19
from fastapi import APIRouter, FastAPI, Request
Zhuohan Li's avatar
Zhuohan Li committed
20
21
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
22
from fastapi.responses import JSONResponse, Response, StreamingResponse
23
from starlette.datastructures import State
24
from starlette.routing import Mount
25
from typing_extensions import assert_never
Zhuohan Li's avatar
Zhuohan Li committed
26

27
import vllm.envs as envs
28
from vllm.config import ModelConfig
Woosuk Kwon's avatar
Woosuk Kwon committed
29
from vllm.engine.arg_utils import AsyncEngineArgs
30
from vllm.engine.async_llm_engine import AsyncLLMEngine  # type: ignore
31
32
33
from vllm.engine.multiprocessing.client import MQLLMEngineClient
from vllm.engine.multiprocessing.engine import run_mp_engine
from vllm.engine.protocol import EngineClient
34
from vllm.entrypoints.chat_utils import load_chat_template
35
from vllm.entrypoints.launcher import serve_http
36
from vllm.entrypoints.logger import RequestLogger
37
38
from vllm.entrypoints.openai.cli_args import (make_arg_parser,
                                              validate_parsed_serve_args)
39
40
# yapf conflicts with isort for this block
# yapf: disable
41
from vllm.entrypoints.openai.protocol import (ChatCompletionRequest,
42
                                              ChatCompletionResponse,
43
                                              CompletionRequest,
44
                                              CompletionResponse,
45
46
                                              DetokenizeRequest,
                                              DetokenizeResponse,
47
                                              EmbeddingRequest,
48
49
50
                                              EmbeddingResponse,
                                              EmbeddingResponseData,
                                              ErrorResponse,
51
                                              LoadLoraAdapterRequest,
52
                                              PoolingRequest, PoolingResponse,
53
                                              ScoreRequest, ScoreResponse,
54
                                              TokenizeRequest,
55
56
57
                                              TokenizeResponse,
                                              UnloadLoraAdapterRequest)
# yapf: enable
58
59
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
60
from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
61
62
63
from vllm.entrypoints.openai.serving_engine import OpenAIServing
from vllm.entrypoints.openai.serving_models import (BaseModelPath,
                                                    OpenAIServingModels)
64
from vllm.entrypoints.openai.serving_pooling import OpenAIServingPooling
65
from vllm.entrypoints.openai.serving_score import OpenAIServingScores
66
67
from vllm.entrypoints.openai.serving_tokenization import (
    OpenAIServingTokenization)
68
from vllm.entrypoints.openai.tool_parsers import ToolParserManager
69
from vllm.entrypoints.utils import with_cancellation
70
from vllm.logger import init_logger
yhu422's avatar
yhu422 committed
71
from vllm.usage.usage_lib import UsageContext
72
from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
73
                        is_valid_ipv6_address, set_ulimit)
74
from vllm.version import __version__ as VLLM_VERSION
Zhuohan Li's avatar
Zhuohan Li committed
75

76
# Keep-alive timeout passed to the HTTP server when run_server() starts it.
TIMEOUT_KEEP_ALIVE = 5  # seconds
Zhuohan Li's avatar
Zhuohan Li committed
77

78
# Assigned in build_async_engine_client_from_engine_args() when
# PROMETHEUS_MULTIPROC_DIR is not already present in the environment.
prometheus_multiproc_dir: tempfile.TemporaryDirectory
79

80
# Cannot use __name__ (https://github.com/vllm-project/vllm/pull/4765)
81
logger = init_logger('vllm.entrypoints.openai.api_server')
82

83
# Background tasks started in lifespan(); each task is removed from the set
# by its done-callback once it finishes.
_running_tasks: Set[asyncio.Task] = set()
84

85

86
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook.

    When ``app.state.log_stats`` is truthy, a background task asks the
    engine client to log stats every 10 seconds for as long as the app
    runs. The task is cancelled on shutdown, and ``app.state`` is deleted
    afterwards in every case.
    """
    try:
        stats_task = None
        if app.state.log_stats:
            client: EngineClient = app.state.engine_client

            async def _log_stats_forever():
                while True:
                    await asyncio.sleep(10.)
                    await client.do_log_stats()

            stats_task = asyncio.create_task(_log_stats_forever())
            # Hold a reference until the task is done.
            _running_tasks.add(stats_task)
            stats_task.add_done_callback(_running_tasks.remove)

        try:
            yield
        finally:
            if stats_task is not None:
                stats_task.cancel()
    finally:
        # Ensure app state including engine ref is gc'd
        del app.state
110
111


112
@asynccontextmanager
async def build_async_engine_client(
        args: Namespace) -> AsyncIterator[EngineClient]:
    """Yield an engine client built from parsed CLI arguments.

    The CLI namespace is converted to ``AsyncEngineArgs`` and the work is
    delegated to ``build_async_engine_client_from_engine_args``, whose
    context manager handles shutdown and cleanup on error/exit.
    """
    client_cm = build_async_engine_client_from_engine_args(
        AsyncEngineArgs.from_cli_args(args),
        args.disable_frontend_multiprocessing,
    )
    async with client_cm as client:
        yield client


@asynccontextmanager
async def build_async_engine_client_from_engine_args(
    engine_args: AsyncEngineArgs,
    disable_frontend_multiprocessing: bool = False,
) -> AsyncIterator[EngineClient]:
    """
    Create EngineClient, either:
        - in-process using the AsyncLLMEngine Directly
        - multiprocess using AsyncLLMEngine RPC

    Yields the client. Failures while creating it are raised to the
    caller; in the multiprocess case the spawned engine process is always
    terminated on exit, including when client construction fails.

    Args:
        engine_args: Engine arguments used to build the engine/client.
        disable_frontend_multiprocessing: Force the in-process engine.

    Raises:
        RuntimeError: If the engine process dies before the client could
            complete its setup handshake.
    """

    # AsyncLLMEngine.
    if (MQLLMEngineClient.is_unsupported_config(engine_args)
            or envs.VLLM_USE_V1 or disable_frontend_multiprocessing):

        engine_client: Optional[EngineClient] = None
        try:
            engine_client = AsyncLLMEngine.from_engine_args(
                engine_args=engine_args,
                usage_context=UsageContext.OPENAI_API_SERVER)
            yield engine_client
        finally:
            if engine_client and hasattr(engine_client, "shutdown"):
                engine_client.shutdown()

    # MQLLMEngine.
    else:
        if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
            # Make TemporaryDirectory for prometheus multiprocessing
            # Note: global TemporaryDirectory will be automatically
            #   cleaned up upon exit.
            global prometheus_multiproc_dir
            prometheus_multiproc_dir = tempfile.TemporaryDirectory()
            os.environ[
                "PROMETHEUS_MULTIPROC_DIR"] = prometheus_multiproc_dir.name
        else:
            logger.warning(
                "Found PROMETHEUS_MULTIPROC_DIR was set by user. "
                "This directory must be wiped between vLLM runs or "
                "you will find inaccurate metrics. Unset the variable "
                "and vLLM will properly handle cleanup.")

        # Select random path for IPC.
        ipc_path = get_open_zmq_ipc_path()
        logger.debug("Multiprocessing frontend to use %s for IPC Path.",
                     ipc_path)

        # Start RPCServer in separate process (holds the LLMEngine).
        # the current process might have CUDA context,
        # so we need to spawn a new process
        context = multiprocessing.get_context("spawn")

        # The Process can raise an exception during startup, which may
        # not actually result in an exitcode being reported. As a result
        # we use a shared variable to communicate the information.
        engine_alive = multiprocessing.Value('b', True, lock=False)
        engine_process = context.Process(target=run_mp_engine,
                                         args=(engine_args,
                                               UsageContext.OPENAI_API_SERVER,
                                               ipc_path, engine_alive))
        engine_process.start()
        engine_pid = engine_process.pid
        assert engine_pid is not None, "Engine process failed to start."
        logger.info("Started engine process with PID %d", engine_pid)

        def _cleanup_ipc_path():
            socket_path = ipc_path.replace("ipc://", "")
            if os.path.exists(socket_path):
                os.remove(socket_path)

        # Ensure we clean up the local IPC socket file on exit.
        atexit.register(_cleanup_ipc_path)

        # From here on the engine process is running, so everything that
        # can raise lives inside the try block: a failure while building
        # the config or the client must still tear the process down.
        mq_engine_client = None
        try:
            # Build RPCClient, which conforms to EngineClient Protocol.
            engine_config = engine_args.create_engine_config()
            build_client = partial(MQLLMEngineClient, ipc_path, engine_config,
                                   engine_pid)
            mq_engine_client = await asyncio.get_running_loop(
            ).run_in_executor(None, build_client)

            while True:
                try:
                    await mq_engine_client.setup()
                    break
                except TimeoutError:
                    if (not engine_process.is_alive()
                            or not engine_alive.value):
                        raise RuntimeError(
                            "Engine process failed to start. See stack "
                            "trace for the root cause.") from None

            yield mq_engine_client  # type: ignore[misc]
        finally:
            # Ensure rpc server process was terminated
            engine_process.terminate()

            # Close all open connections to the backend
            if mq_engine_client is not None:
                mq_engine_client.close()

            # Wait for engine process to join
            engine_process.join(4)
            if engine_process.exitcode is None:
                # Kill if it has not stopped within the join timeout above
                engine_process.kill()

            # Lazy import for prometheus multiprocessing.
            # We need to set PROMETHEUS_MULTIPROC_DIR environment variable
            # before prometheus_client is imported.
            # See https://prometheus.github.io/client_python/multiprocess/
            from prometheus_client import multiprocess
            multiprocess.mark_process_dead(engine_process.pid)
238

239

Ethan Xu's avatar
Ethan Xu committed
240
# Endpoints are registered on this router; build_app() includes it in the
# FastAPI application.
router = APIRouter()
Zhuohan Li's avatar
Zhuohan Li committed
241

242

243
def mount_metrics(app: FastAPI):
    """Attach the Prometheus ``/metrics`` ASGI app to ``app``.

    If ``PROMETHEUS_MULTIPROC_DIR`` is set, metrics are served from a
    fresh registry backed by a multiprocess collector; otherwise the
    default ASGI metrics app is used.
    """
    # Lazy import for prometheus multiprocessing.
    # We need to set PROMETHEUS_MULTIPROC_DIR environment variable
    # before prometheus_client is imported.
    # See https://prometheus.github.io/client_python/multiprocess/
    from prometheus_client import (CollectorRegistry, make_asgi_app,
                                   multiprocess)

    multiproc_dir = os.getenv("PROMETHEUS_MULTIPROC_DIR", None)
    if multiproc_dir is None:
        metrics_app = make_asgi_app()
    else:
        logger.debug("vLLM to use %s as PROMETHEUS_MULTIPROC_DIR",
                     multiproc_dir)
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
        metrics_app = make_asgi_app(registry=registry)

    # Add prometheus asgi middleware to route /metrics requests
    metrics_route = Mount("/metrics", metrics_app)

    # Workaround for 307 Redirect for /metrics
    metrics_route.path_regex = re.compile("^/metrics(?P<path>.*)$")
    app.routes.append(metrics_route)
267
268


269
270
271
272
273
def base(request: Request) -> OpenAIServing:
    """Return a generic serving handler for the current app.

    The tokenization handler is reused rather than creating a new one.
    """
    shared_handler = tokenization(request)
    return shared_handler


274
275
276
277
def models(request: Request) -> OpenAIServingModels:
    """Return the models handler stored on the app state."""
    app_state = request.app.state
    return app_state.openai_serving_models


278
def chat(request: Request) -> Optional[OpenAIServingChat]:
    """Return the chat handler stored on the app state (may be None)."""
    app_state = request.app.state
    return app_state.openai_serving_chat


282
def completion(request: Request) -> Optional[OpenAIServingCompletion]:
    """Return the completion handler stored on the app state (may be None)."""
    app_state = request.app.state
    return app_state.openai_serving_completion


286
287
288
289
def pooling(request: Request) -> Optional[OpenAIServingPooling]:
    """Return the pooling handler stored on the app state (may be None)."""
    app_state = request.app.state
    return app_state.openai_serving_pooling


290
291
def embedding(request: Request) -> Optional[OpenAIServingEmbedding]:
    """Return the embedding handler stored on the app state (may be None)."""
    app_state = request.app.state
    return app_state.openai_serving_embedding
292
293


294
295
296
297
def score(request: Request) -> Optional[OpenAIServingScores]:
    """Return the score handler stored on the app state (may be None)."""
    app_state = request.app.state
    return app_state.openai_serving_scores


298
299
def tokenization(request: Request) -> OpenAIServingTokenization:
    """Return the tokenization handler stored on the app state."""
    app_state = request.app.state
    return app_state.openai_serving_tokenization
300
301


302
def engine_client(request: Request) -> EngineClient:
    """Return the engine client stored on the app state."""
    app_state = request.app.state
    return app_state.engine_client


Ethan Xu's avatar
Ethan Xu committed
306
@router.get("/health")
async def health(raw_request: Request) -> Response:
    """Health check.

    Awaits the engine client's health check and replies with an empty
    200 response; any exception from the check propagates.
    """
    client = engine_client(raw_request)
    await client.check_health()
    return Response(status_code=200)


Ethan Xu's avatar
Ethan Xu committed
313
@router.post("/tokenize")
@with_cancellation
async def tokenize(request: TokenizeRequest, raw_request: Request):
    """Tokenize the request via the tokenization handler.

    Error results are returned as JSON with the error's own status code;
    successful results are returned as JSON.
    """
    tok_handler = tokenization(raw_request)
    result = await tok_handler.create_tokenize(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, TokenizeResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)

327

Ethan Xu's avatar
Ethan Xu committed
328
@router.post("/detokenize")
@with_cancellation
async def detokenize(request: DetokenizeRequest, raw_request: Request):
    """Detokenize the request via the tokenization handler.

    Error results are returned as JSON with the error's own status code;
    successful results are returned as JSON.
    """
    tok_handler = tokenization(raw_request)
    result = await tok_handler.create_detokenize(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, DetokenizeResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)

342

Ethan Xu's avatar
Ethan Xu committed
343
@router.get("/v1/models")
async def show_available_models(raw_request: Request):
    """Return the model list reported by the models handler as JSON."""
    model_list = await models(raw_request).show_available_models()
    payload = model_list.model_dump()
    return JSONResponse(content=payload)
Zhuohan Li's avatar
Zhuohan Li committed
349
350


Ethan Xu's avatar
Ethan Xu committed
351
@router.get("/version")
async def show_version():
    """Return the vLLM version as ``{"version": ...}``."""
    return JSONResponse(content={"version": VLLM_VERSION})


Ethan Xu's avatar
Ethan Xu committed
357
@router.post("/v1/chat/completions")
@with_cancellation
async def create_chat_completion(request: ChatCompletionRequest,
                                 raw_request: Request):
    """Serve a chat completion request.

    Returns an error response when no chat handler is configured.
    Otherwise the handler's result is returned as JSON (error or full
    response) or, for any other result, streamed as server-sent events.
    """
    chat_handler = chat(raw_request)
    if chat_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Chat Completions API")

    result = await chat_handler.create_chat_completion(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, ChatCompletionResponse):
        return JSONResponse(content=result.model_dump())

    # Neither an error nor a complete response: stream it to the client.
    return StreamingResponse(content=result, media_type="text/event-stream")

377

Ethan Xu's avatar
Ethan Xu committed
378
@router.post("/v1/completions")
@with_cancellation
async def create_completion(request: CompletionRequest, raw_request: Request):
    """Serve a text completion request.

    Returns an error response when no completion handler is configured.
    Otherwise the handler's result is returned as JSON (error or full
    response) or, for any other result, streamed as server-sent events.
    """
    completion_handler = completion(raw_request)
    if completion_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Completions API")

    result = await completion_handler.create_completion(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, CompletionResponse):
        return JSONResponse(content=result.model_dump())

    # Neither an error nor a complete response: stream it to the client.
    return StreamingResponse(content=result, media_type="text/event-stream")

Zhuohan Li's avatar
Zhuohan Li committed
395

Ethan Xu's avatar
Ethan Xu committed
396
@router.post("/v1/embeddings")
@with_cancellation
async def create_embedding(request: EmbeddingRequest, raw_request: Request):
    """Serve an embeddings request.

    Uses the embedding handler when one is configured. Otherwise falls
    back to the pooling handler (with a deprecation warning) and repackages
    its response as an embedding response. If neither handler exists, an
    error response is returned.
    """
    embed_handler = embedding(raw_request)
    if embed_handler is not None:
        result = await embed_handler.create_embedding(request, raw_request)
    else:
        pooling_handler = pooling(raw_request)
        if pooling_handler is None:
            return base(raw_request).create_error_response(
                message="The model does not support Embeddings API")

        logger.warning(
            "Embeddings API will become exclusive to embedding models "
            "in a future release. To return the hidden states directly, "
            "use the Pooling API (`/pooling`) instead.")

        pooled = await pooling_handler.create_pooling(request, raw_request)
        if isinstance(pooled, PoolingResponse):
            # Re-wrap each pooled item as an embedding entry.
            embedding_items = [
                EmbeddingResponseData(
                    index=item.index,
                    embedding=item.data,  # type: ignore
                ) for item in pooled.data
            ]
            result = EmbeddingResponse(
                id=pooled.id,
                object=pooled.object,
                created=pooled.created,
                model=pooled.model,
                data=embedding_items,
                usage=pooled.usage,
            )
        else:
            result = pooled

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, EmbeddingResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)

439

440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
@router.post("/pooling")
@with_cancellation
async def create_pooling(request: PoolingRequest, raw_request: Request):
    """Serve a pooling request.

    Returns an error response when no pooling handler is configured;
    otherwise the handler's result is returned as JSON.
    """
    pooling_handler = pooling(raw_request)
    if pooling_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Pooling API")

    result = await pooling_handler.create_pooling(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, PoolingResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)


458
@router.post("/score")
@with_cancellation
async def create_score(request: ScoreRequest, raw_request: Request):
    """Serve a score request.

    Returns an error response when no score handler is configured;
    otherwise the handler's result is returned as JSON.
    """
    score_handler = score(raw_request)
    if score_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Score API")

    result = await score_handler.create_score(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, ScoreResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)


476
@router.post("/v1/score")
@with_cancellation
async def create_score_v1(request: ScoreRequest, raw_request: Request):
    """Legacy ``/v1/score`` route: warn, then delegate to ``create_score``."""
    logger.warning(
        "To indicate that Score API is not part of standard OpenAI API, we "
        "have moved it to `/score`. Please update your client accordingly.")

    response = await create_score(request, raw_request)
    return response


486
487
488
489
490
491
# Profiler control routes are only registered when a profiler directory
# is configured through the environment.
if envs.VLLM_TORCH_PROFILER_DIR:
    logger.warning(
        "Torch Profiler is enabled in the API server. This should ONLY be "
        "used for local development!")

    @router.post("/start_profile")
    async def start_profile(raw_request: Request):
        """Ask the engine client to start profiling; reply 200 when done."""
        logger.info("Starting profiler...")
        client = engine_client(raw_request)
        await client.start_profile()
        logger.info("Profiler started.")
        return Response(status_code=200)

    @router.post("/stop_profile")
    async def stop_profile(raw_request: Request):
        """Ask the engine client to stop profiling; reply 200 when done."""
        logger.info("Stopping profiler...")
        client = engine_client(raw_request)
        await client.stop_profile()
        logger.info("Profiler stopped.")
        return Response(status_code=200)


506
507
508
509
510
511
# Runtime LoRA load/unload routes are only registered when explicitly
# allowed through the environment.
if envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING:
    logger.warning(
        "Lora dynamic loading & unloading is enabled in the API server. "
        "This should ONLY be used for local development!")

    @router.post("/v1/load_lora_adapter")
    async def load_lora_adapter(request: LoadLoraAdapterRequest,
                                raw_request: Request):
        """Load a LoRA adapter through the models handler.

        Errors are returned as JSON with the error's status code; any
        other result is sent back as the body of a 200 response.
        """
        outcome = await models(raw_request).load_lora_adapter(request)
        if not isinstance(outcome, ErrorResponse):
            return Response(status_code=200, content=outcome)

        return JSONResponse(content=outcome.model_dump(),
                            status_code=outcome.code)

    @router.post("/v1/unload_lora_adapter")
    async def unload_lora_adapter(request: UnloadLoraAdapterRequest,
                                  raw_request: Request):
        """Unload a LoRA adapter through the models handler.

        Errors are returned as JSON with the error's status code; any
        other result is sent back as the body of a 200 response.
        """
        outcome = await models(raw_request).unload_lora_adapter(request)
        if not isinstance(outcome, ErrorResponse):
            return Response(status_code=200, content=outcome)

        return JSONResponse(content=outcome.model_dump(),
                            status_code=outcome.code)


534
def build_app(args: Namespace) -> FastAPI:
    """Build the FastAPI application from parsed CLI arguments.

    Sets up, in order: the app itself (optionally without the OpenAPI/docs
    endpoints), the API router and root path, the metrics mount, CORS, a
    400 handler for request-validation errors, optional bearer-token
    authentication for ``/v1`` paths, optional ``X-Request-Id`` headers,
    and any user-supplied middleware.

    Args:
        args: Parsed CLI namespace.

    Returns:
        The configured application.

    Raises:
        ValueError: If an entry in ``args.middleware`` resolves to
            something that is neither a class nor a coroutine function.
    """
    if args.disable_fastapi_docs:
        app = FastAPI(openapi_url=None,
                      docs_url=None,
                      redoc_url=None,
                      lifespan=lifespan)
    else:
        app = FastAPI(lifespan=lifespan)
    app.include_router(router)
    app.root_path = args.root_path

    mount_metrics(app)

    app.add_middleware(
        CORSMiddleware,
        allow_origins=args.allowed_origins,
        allow_credentials=args.allow_credentials,
        allow_methods=args.allowed_methods,
        allow_headers=args.allowed_headers,
    )

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(_, exc):
        err = ErrorResponse(message=str(exc),
                            type="BadRequestError",
                            code=HTTPStatus.BAD_REQUEST)
        return JSONResponse(err.model_dump(),
                            status_code=HTTPStatus.BAD_REQUEST)

    if token := envs.VLLM_API_KEY or args.api_key:
        # Imported locally so this block does not depend on a new
        # module-level import.
        import secrets

        # Compare as bytes: compare_digest rejects non-ASCII str inputs.
        expected_auth = ("Bearer " + token).encode("utf-8")

        @app.middleware("http")
        async def authentication(request: Request, call_next):
            if request.method == "OPTIONS":
                return await call_next(request)
            url_path = request.url.path
            if app.root_path and url_path.startswith(app.root_path):
                url_path = url_path[len(app.root_path):]
            if not url_path.startswith("/v1"):
                return await call_next(request)
            # A missing header is treated as an empty credential, which
            # can never equal the expected "Bearer <token>" value.
            provided_auth = request.headers.get("Authorization") or ""
            # Constant-time comparison so response timing does not reveal
            # how much of the key matched.
            if not secrets.compare_digest(provided_auth.encode("utf-8"),
                                          expected_auth):
                return JSONResponse(content={"error": "Unauthorized"},
                                    status_code=401)
            return await call_next(request)

    if args.enable_request_id_headers:
        logger.warning(
            "CAUTION: Enabling X-Request-Id headers in the API Server. "
            "This can harm performance at high QPS.")

        @app.middleware("http")
        async def add_request_id(request: Request, call_next):
            # Echo the caller's id when provided, else generate one.
            request_id = request.headers.get(
                "X-Request-Id") or uuid.uuid4().hex
            response = await call_next(request)
            response.headers["X-Request-Id"] = request_id
            return response

    for middleware in args.middleware:
        # Each entry is a dotted path "package.module.object".
        module_path, object_name = middleware.rsplit(".", 1)
        imported = getattr(importlib.import_module(module_path), object_name)
        if inspect.isclass(imported):
            app.add_middleware(imported)
        elif inspect.iscoroutinefunction(imported):
            app.middleware("http")(imported)
        else:
            raise ValueError(f"Invalid middleware {middleware}. "
                             f"Must be a function or a class.")

    return app


606
def init_app_state(
    engine_client: EngineClient,
    model_config: ModelConfig,
    state: State,
    args: Namespace,
) -> None:
    """Populate ``state`` with the engine client and serving handlers.

    The models and tokenization handlers are always created. The chat and
    completion handlers exist only for ``runner_type == "generate"``, the
    pooling handler only for ``runner_type == "pooling"``, the embedding
    handler only for ``task == "embed"`` and the score handler only for
    ``task == "score"``; the others are set to ``None``.
    """
    served_model_names = (args.served_model_name
                          if args.served_model_name is not None else
                          [args.model])

    request_logger = (None if args.disable_log_requests else RequestLogger(
        max_log_len=args.max_log_len))

    base_model_paths = []
    for served_name in served_model_names:
        base_model_paths.append(
            BaseModelPath(name=served_name, model_path=args.model))

    state.engine_client = engine_client
    state.log_stats = not args.disable_log_stats

    resolved_chat_template = load_chat_template(args.chat_template)
    logger.info("Using supplied chat template:\n%s", resolved_chat_template)

    serving_models = OpenAIServingModels(
        model_config=model_config,
        base_model_paths=base_model_paths,
        lora_modules=args.lora_modules,
        prompt_adapters=args.prompt_adapters,
    )
    state.openai_serving_models = serving_models
    # TODO: The chat template is now broken for lora adapters :(

    if model_config.runner_type == "generate":
        state.openai_serving_chat = OpenAIServingChat(
            engine_client,
            model_config,
            serving_models,
            args.response_role,
            request_logger=request_logger,
            chat_template=resolved_chat_template,
            chat_template_content_format=args.chat_template_content_format,
            return_tokens_as_token_ids=args.return_tokens_as_token_ids,
            enable_auto_tools=args.enable_auto_tool_choice,
            tool_parser=args.tool_call_parser,
            enable_prompt_tokens_details=args.enable_prompt_tokens_details,
        )
    else:
        state.openai_serving_chat = None

    if model_config.runner_type == "generate":
        state.openai_serving_completion = OpenAIServingCompletion(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger,
            return_tokens_as_token_ids=args.return_tokens_as_token_ids,
        )
    else:
        state.openai_serving_completion = None

    if model_config.runner_type == "pooling":
        state.openai_serving_pooling = OpenAIServingPooling(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger,
            chat_template=resolved_chat_template,
            chat_template_content_format=args.chat_template_content_format,
        )
    else:
        state.openai_serving_pooling = None

    if model_config.task == "embed":
        state.openai_serving_embedding = OpenAIServingEmbedding(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger,
            chat_template=resolved_chat_template,
            chat_template_content_format=args.chat_template_content_format,
        )
    else:
        state.openai_serving_embedding = None

    if model_config.task == "score":
        state.openai_serving_scores = OpenAIServingScores(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger)
    else:
        state.openai_serving_scores = None

    state.openai_serving_tokenization = OpenAIServingTokenization(
        engine_client,
        model_config,
        serving_models,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
    )
690
691


692
693
694
695
696
697
698
699
700
701
702
703
def create_server_socket(addr: Tuple[str, int]) -> socket.socket:
    """Create a TCP socket bound to ``addr``.

    The address family is IPv6 when the host part is a valid IPv6 address
    and IPv4 otherwise. ``SO_REUSEADDR`` is set before binding.

    Args:
        addr: ``(host, port)`` pair to bind to.

    Returns:
        The bound (not yet listening) socket; the caller must close it.

    Raises:
        OSError: If configuring or binding the socket fails. The socket is
            closed before the error propagates.
    """
    family = socket.AF_INET
    if is_valid_ipv6_address(addr[0]):
        family = socket.AF_INET6

    sock = socket.socket(family=family, type=socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(addr)
    except BaseException:
        # Do not leak the file descriptor when the port cannot be bound.
        sock.close()
        raise

    return sock


704
async def run_server(args, **uvicorn_kwargs) -> None:
    """Run the API server until it shuts down.

    Validates the tool-parser options, binds the listening port early,
    builds the engine client and the app, serves HTTP, and finally waits
    for server shutdown after the engine context has been exited. The
    early-bound socket is closed on every exit path.

    Args:
        args: Parsed CLI namespace.
        **uvicorn_kwargs: Extra keyword arguments forwarded to
            ``serve_http``.

    Raises:
        KeyError: If auto tool choice is enabled with an unknown parser.
        KeyboardInterrupt: If SIGTERM arrives while the handler installed
            here is active.
    """
    logger.info("vLLM API server version %s", VLLM_VERSION)
    logger.info("args: %s", args)

    if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
        ToolParserManager.import_tool_parser(args.tool_parser_plugin)

    valid_tool_parsers = ToolParserManager.tool_parsers.keys()
    if args.enable_auto_tool_choice \
        and args.tool_call_parser not in valid_tool_parsers:
        raise KeyError(f"invalid tool call parser: {args.tool_call_parser} "
                       f"(chose from {{ {','.join(valid_tool_parsers)} }})")

    # workaround to make sure that we bind the port before the engine is set up.
    # This avoids race conditions with ray.
    # see https://github.com/vllm-project/vllm/issues/8204
    sock_addr = (args.host or "", args.port)
    sock = create_server_socket(sock_addr)

    # Everything after the bind runs under try/finally so the socket is
    # released even when engine startup or serving fails.
    try:
        # workaround to avoid footguns where uvicorn drops requests with too
        # many concurrent requests active
        set_ulimit()

        def signal_handler(*_) -> None:
            # Interrupt server on sigterm while initializing
            raise KeyboardInterrupt("terminated")

        signal.signal(signal.SIGTERM, signal_handler)

        async with build_async_engine_client(args) as engine_client:
            app = build_app(args)

            model_config = await engine_client.get_model_config()
            init_app_state(engine_client, model_config, app.state, args)

            shutdown_task = await serve_http(
                app,
                host=args.host,
                port=args.port,
                log_level=args.uvicorn_log_level,
                timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
                ssl_keyfile=args.ssl_keyfile,
                ssl_certfile=args.ssl_certfile,
                ssl_ca_certs=args.ssl_ca_certs,
                ssl_cert_reqs=args.ssl_cert_reqs,
                **uvicorn_kwargs,
            )

        # NB: Await server shutdown only after the backend context is exited
        await shutdown_task
    finally:
        sock.close()

Ethan Xu's avatar
Ethan Xu committed
757
758
759
760
761
762
763
764

if __name__ == "__main__":
    # NOTE(simon):
    # This section should be in sync with vllm/scripts.py for CLI entrypoints.
    # Build the CLI parser, parse and validate the arguments, then run the
    # server on a uvloop event loop.
    parser = make_arg_parser(
        FlexibleArgumentParser(
            description="vLLM OpenAI-Compatible RESTful API server."))
    args = parser.parse_args()
    validate_parsed_serve_args(args)

    uvloop.run(run_server(args))