"tests/v1/spec_decode/test_acceptance_length.py" did not exist on "09194b90a5353bf51cabfeaa0ec93f139c2c4ac3"
startup.py 9.59 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Benchmark the cold and warm startup time of vLLM models.

This script measures total startup time (including model loading, compilation,
and cache operations) for both cold and warm scenarios:
- Cold startup: Fresh start with no caches (temporary cache directories)
- Warm startup: Using cached compilation and model info
"""

import argparse
import json
import multiprocessing
import os
import shutil
import tempfile
import time
from contextlib import contextmanager
19
from typing import Any, NamedTuple
20
21
22
23
24
25
26
27
28
29

import numpy as np
from tqdm import tqdm

from vllm.benchmarks.lib.utils import (
    convert_to_pytorch_benchmark_format,
    write_to_json,
)
from vllm.engine.arg_utils import EngineArgs

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
PERCENTAGES = [10, 25, 50, 75, 90, 99]


class MetricDesc(NamedTuple):
    """Descriptor for a metric to collect from each iteration."""

    iter_key: str  # key in the iteration result dict
    suffix: str  # result key suffix, e.g. "startup", "compilation"
    display_name: str


class MetricStats(NamedTuple):
    """Aggregated statistics for a single benchmark metric."""

    key: str  # e.g. "cold_startup", "warm_encoder_compilation"
    display_name: str
    values: list[float]
    avg: float
    percentiles: dict[int, float]


_BASE_METRICS = [
    MetricDesc("total_startup_time", "startup", "Startup time"),
    MetricDesc("compilation_time", "compilation", "Compilation time"),
]
_ENCODER_METRIC = MetricDesc(
    "encoder_compilation_time",
    "encoder_compilation",
    "Encoder compilation time",
)


def _compute_metric(
    phase: str,
    desc: MetricDesc,
    iterations: list[dict[str, float]],
) -> MetricStats:
    values = [m[desc.iter_key] for m in iterations]
    arr = np.array(values)
    return MetricStats(
        key=f"{phase}_{desc.suffix}",
        display_name=desc.display_name,
        values=values,
        avg=float(np.mean(arr)),
        percentiles=dict(zip(PERCENTAGES, np.percentile(arr, PERCENTAGES).tolist())),
    )


def _collect_phase_metrics(
    phase: str,
    iterations: list[dict[str, float]],
    has_encoder: bool,
) -> list[MetricStats]:
    metrics = [_compute_metric(phase, desc, iterations) for desc in _BASE_METRICS]
    if has_encoder:
        metrics.append(_compute_metric(phase, _ENCODER_METRIC, iterations))
    return metrics


def _print_phase(phase_name: str, metrics: list[MetricStats]) -> None:
    print(f"\n{phase_name}:")
    for m in metrics:
        print(f"Avg {m.display_name.lower()}: {m.avg:.2f} seconds")
    for m in metrics:
        print(f"{m.display_name} percentiles:")
        for pct, val in m.percentiles.items():
            print(f"  {pct}%: {val:.2f} seconds")


def _metric_to_json(m: MetricStats) -> dict[str, Any]:
    return {
        f"avg_{m.key}_time": m.avg,
        f"{m.key}_times": m.values,
        f"{m.key}_percentiles": m.percentiles,
    }

106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

@contextmanager
def cold_startup():
    """
    Context manager to measure cold startup time:
    1. Uses a temporary directory for vLLM cache to avoid any pollution
       between cold startup iterations.
    2. Uses inductor's fresh_cache to clear torch.compile caches.
    """
    from torch._inductor.utils import fresh_cache

    # Use temporary directory for caching to avoid any pollution between cold startups
    original_cache_root = os.environ.get("VLLM_CACHE_ROOT")
    temp_cache_dir = tempfile.mkdtemp(prefix="vllm_startup_bench_cold_")
    try:
        os.environ["VLLM_CACHE_ROOT"] = temp_cache_dir
        with fresh_cache():
            yield
    finally:
        # Clean up temporary cache directory
        shutil.rmtree(temp_cache_dir, ignore_errors=True)
        if original_cache_root:
            os.environ["VLLM_CACHE_ROOT"] = original_cache_root
        else:
            os.environ.pop("VLLM_CACHE_ROOT", None)


133
def run_startup_in_subprocess(engine_args, result_queue):
134
135
136
137
138
139
140
141
142
143
144
    """
    Run LLM startup in a subprocess and return timing metrics via a queue.
    This ensures complete isolation between iterations.
    """
    try:
        # Import inside the subprocess to avoid issues with forking
        from vllm import LLM

        # Measure total startup time
        start_time = time.perf_counter()

145
        llm = LLM.from_engine_args(engine_args)
146
147
148
149
150

        total_startup_time = time.perf_counter() - start_time

        # Extract compilation time if available
        compilation_time = 0.0
151
        encoder_compilation_time = 0.0
152
153
154
155
156
157
158
        if hasattr(llm.llm_engine, "vllm_config"):
            vllm_config = llm.llm_engine.vllm_config
            if (
                hasattr(vllm_config, "compilation_config")
                and vllm_config.compilation_config is not None
            ):
                compilation_time = vllm_config.compilation_config.compilation_time
159
160
161
                encoder_compilation_time = (
                    vllm_config.compilation_config.encoder_compilation_time
                )
162
163
164
165
166

        result_queue.put(
            {
                "total_startup_time": total_startup_time,
                "compilation_time": compilation_time,
167
                "encoder_compilation_time": encoder_compilation_time,
168
169
170
171
172
173
174
175
176
            }
        )

    except Exception as e:
        result_queue.put(None)
        result_queue.put(str(e))


def save_to_pytorch_benchmark_format(
177
    args: argparse.Namespace, metrics: list[MetricStats]
178
179
) -> None:
    base_name = os.path.splitext(args.output_json)[0]
180
181
182
183
184
185
186
187
    for m in metrics:
        records = convert_to_pytorch_benchmark_format(
            args=args,
            metrics={f"avg_{m.key}_time": [m.avg]},
            extra_info={
                f"{m.key}_times": m.values,
                f"{m.key}_percentiles": m.percentiles,
            },
188
        )
189
190
        if records:
            write_to_json(f"{base_name}.{m.key}.pytorch.json", records)
191
192
193
194
195
196


def add_cli_args(parser: argparse.ArgumentParser):
    parser.add_argument(
        "--num-iters-cold",
        type=int,
197
        default=3,
198
199
200
201
202
        help="Number of cold startup iterations.",
    )
    parser.add_argument(
        "--num-iters-warmup",
        type=int,
203
        default=1,
204
205
206
207
208
        help="Number of warmup iterations before benchmarking warm startups.",
    )
    parser.add_argument(
        "--num-iters-warm",
        type=int,
209
        default=3,
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
        help="Number of warm startup iterations.",
    )
    parser.add_argument(
        "--output-json",
        type=str,
        default=None,
        help="Path to save the startup time results in JSON format.",
    )

    parser = EngineArgs.add_cli_args(parser)
    return parser


def main(args: argparse.Namespace):
    # Set multiprocessing start method to 'spawn' for clean process isolation
    # This ensures each subprocess starts fresh without inheriting state
    multiprocessing.set_start_method("spawn", force=True)

    engine_args = EngineArgs.from_cli_args(args)

    def create_llm_and_measure_startup():
        """
        Create LLM instance in a subprocess and measure startup time.
        Returns timing metrics, using subprocess for complete isolation.
        """

        # Create a queue for inter-process communication
        result_queue = multiprocessing.Queue()
        process = multiprocessing.Process(
            target=run_startup_in_subprocess,
            args=(
241
                engine_args,
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
                result_queue,
            ),
        )
        process.start()
        process.join()

        if not result_queue.empty():
            result = result_queue.get()
            if result is None:
                if not result_queue.empty():
                    error_msg = result_queue.get()
                    raise RuntimeError(f"Subprocess failed: {error_msg}")
                else:
                    raise RuntimeError("Subprocess failed with unknown error")
            return result
        else:
            raise RuntimeError("Subprocess did not return a result")

    os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
    print("Setting VLLM_ENABLE_V1_MULTIPROCESSING=0 to collect startup metrics.\n")

263
    # Collect cold startup iterations
264
    print("Measuring cold startup time...\n")
265
    cold_iterations = []
266
267
    for i in tqdm(range(args.num_iters_cold), desc="Cold startup iterations"):
        with cold_startup():
268
            cold_iterations.append(create_llm_and_measure_startup())
269
270
271
272
273
274

    # Warmup for warm startup
    print("\nWarming up for warm startup measurement...\n")
    for _ in tqdm(range(args.num_iters_warmup), desc="Warmup iterations"):
        create_llm_and_measure_startup()

275
    # Collect warm startup iterations
276
    print("\nMeasuring warm startup time...\n")
277
    warm_iterations = []
278
    for i in tqdm(range(args.num_iters_warm), desc="Warm startup iterations"):
279
        warm_iterations.append(create_llm_and_measure_startup())
280

281
282
283
284
285
286
287
288
289
290
    # Determine if encoder compilation occurred in any iteration
    has_encoder = any(
        m["encoder_compilation_time"] > 0 for m in cold_iterations + warm_iterations
    )

    cold_metrics = _collect_phase_metrics("cold", cold_iterations, has_encoder)
    warm_metrics = _collect_phase_metrics("warm", warm_iterations, has_encoder)
    all_metrics = cold_metrics + warm_metrics

    # Print results
291
292
293
    print("\n" + "=" * 60)
    print("STARTUP TIME BENCHMARK RESULTS")
    print("=" * 60)
294
295
    _print_phase("COLD STARTUP", cold_metrics)
    _print_phase("WARM STARTUP", warm_metrics)
296
297
298
299
    print("=" * 60)

    # Output JSON results if specified
    if args.output_json:
300
301
302
        results: dict[str, Any] = {}
        for m in all_metrics:
            results.update(_metric_to_json(m))
303
304
        with open(args.output_json, "w") as f:
            json.dump(results, f, indent=4)
305
        save_to_pytorch_benchmark_format(args, all_metrics)