# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Utilities for checking endpoint readiness."""

import asyncio
import time

import aiohttp
from tqdm.asyncio import tqdm

from vllm.logger import init_logger

from .endpoint_request_func import RequestFunc, RequestFuncInput, RequestFuncOutput

# Module-level logger for endpoint-readiness messages.
logger = init_logger(__name__)


async def wait_for_endpoint(
    request_func: RequestFunc,
    test_input: RequestFuncInput,
    session: aiohttp.ClientSession,
    timeout_seconds: int = 600,
    retry_interval: int = 5,
) -> RequestFuncOutput:
    """
    Wait for an endpoint to become available before starting benchmarks.

    The endpoint is probed by awaiting ``request_func`` with ``test_input``
    until a probe reports success or ``timeout_seconds`` have elapsed.
    Attempts are spaced ``retry_interval`` seconds apart (never sleeping past
    the deadline). A tqdm progress bar shows elapsed and remaining wait time.

    Args:
        request_func: The async request function used to probe the endpoint
        test_input: The RequestFuncInput sent on every probe
        session: The aiohttp session passed through to ``request_func``
        timeout_seconds: Maximum time to wait in seconds (default: 10 minutes)
        retry_interval: Time between retries in seconds (default: 5 seconds)

    Returns:
        RequestFuncOutput: The first successful response. If the timeout
        expires first, the output of the last completed (failed) probe is
        returned, or ``RequestFuncOutput(success=False)`` if no probe ever
        completed. This function does not raise on timeout; callers must
        check ``output.success``.
    """
    deadline = time.perf_counter() + timeout_seconds
    output = RequestFuncOutput(success=False)
    print(f"Waiting for endpoint to become up in {timeout_seconds} seconds")

    # The context manager closes the bar on every exit path (return/break).
    with tqdm(
        total=timeout_seconds,
        bar_format="{desc} |{bar}| {elapsed} elapsed, {remaining} remaining",
        unit="s",
    ) as pbar:
        while True:
            # Advance the progress bar to the elapsed time, clamped to the
            # bar's total so the last update never overshoots it.
            remaining = deadline - time.perf_counter()
            elapsed = timeout_seconds - remaining
            pbar.update(min(elapsed, timeout_seconds) - pbar.n)
            pbar.refresh()
            if remaining <= 0:
                break

            # Ping the endpoint using request_func.
            # NOTE(review): a single probe is not cut off at the deadline; its
            # duration is bounded only by request_func/session timeouts.
            try:
                output = await request_func(
                    request_func_input=test_input, session=session
                )
            except aiohttp.ClientConnectorError:
                # Expected while the server is still starting up (e.g.
                # connection refused); keep retrying until the deadline.
                logger.debug("Endpoint refused connection; retrying.")
            else:
                if output.success:
                    return output
                # Only the last line of the error (typically the exception
                # message of a traceback) is logged, to keep warnings short.
                err_last_line = str(output.error).rstrip().rsplit("\n", 1)[-1]
                logger.warning("Endpoint is not ready. Error='%s'", err_last_line)

            # Recompute the time left rather than reusing ``remaining``: the
            # probe may have taken a while, and sleeping on the stale value
            # could overshoot the deadline by up to ``retry_interval``.
            sleep_duration = min(retry_interval, deadline - time.perf_counter())
            if sleep_duration > 0:
                await asyncio.sleep(sleep_duration)

    return output