"csrc/quantization/fp8/fp8_marlin.cu" did not exist on "7038e8b80303bf6128acbe508dec910183a1be56"
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Timing notes (measured locally):
# - GPU-1 subset (`-m "gpu_1 and not gpu_2"`): 130.43s total for 3 tests.
# These tests load a real model and can be slow/flaky when GPU resources are contended,
# so we set explicit pytest timeouts to fail fast on hangs (see per-test markers below).
import json
import logging
import os
from typing import Any, Dict, Optional

import pytest

from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
    run_indexers_sync_test,
    run_router_decisions_test,
)
from tests.router.helper import generate_random_suffix
from tests.utils.constants import DefaultPort
from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_ports, deallocate_ports

# Model served by every vLLM worker that this module launches.
MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Markers applied to every test collected from this module.
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.router,
    pytest.mark.vllm,
    pytest.mark.model(MODEL_NAME),
]

# NOTE(review): SPEEDUP_RATIO is not referenced by the code visible in this
# module; confirm whether anything still consumes it.
SPEEDUP_RATIO = 10.0
# KV cache block size handed both to vLLM (--block-size) and to the router tests.
BLOCK_SIZE = 16

# Shared vLLM configuration for all tests
# gpu_memory_utilization limits actual VRAM allocation (required for multi-worker on same GPU)
VLLM_ARGS: Dict[str, Any] = {
    "block_size": BLOCK_SIZE,
    "model": MODEL_NAME,
    "gpu_memory_utilization": 0.4,  # Limit VRAM allocation per worker
    "max_model_len": 1024,  # Limit context length to reduce KV cache size
    "enforce_eager": True,  # Disable CUDA graphs for faster startup & lower memory
}

# Same configuration without "block_size", so the workers are launched without
# an explicit --block-size flag. Derived from VLLM_ARGS so the two dicts cannot
# drift apart.
VLLM_ARGS_NO_BLOCK_SIZE: Dict[str, Any] = {
    key: value for key, value in VLLM_ARGS.items() if key != "block_size"
}

class VLLMProcess(ManagedEngineProcessMixin):
    """Manages vLLM workers using dynamo.vllm (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real vLLM workers.
    The key difference: dynamo.vllm automatically handles:
    - HTTP API serving
    - KV cache event publishing (ZMQ → NATS bridge)
    - Integration with dynamo.frontend router

    The constructor creates one ``ManagedProcess`` per worker and collects them
    in ``self.worker_processes``.
    NOTE(review): starting and stopping those processes is presumably handled
    by ``ManagedEngineProcessMixin`` -- confirm against the mixin.
    """

    # Descriptive labels; presumably read by ManagedEngineProcessMixin for its
    # log messages -- confirm against the mixin.
    process_name = "vLLM worker"
    cleanup_name = "vLLM worker resources"
    init_delay_reason = "initialize NIXL before starting next worker"

    def __init__(
        self,
        request,
        vllm_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        data_parallel_size: Optional[int] = None,
        request_plane: str = "tcp",
        store_backend: str = "etcd",
        durable_kv_events: bool = False,
    ):
        """Initialize vLLM workers with dynamo integration.

        Args:
            request: pytest request fixture; used to register the port cleanup
                finalizer and to name the log directory.
            vllm_args: Configuration dict with keys:
                - model: Model name/path (default: MODEL_NAME)
                - block_size: KV cache block size; ``--block-size`` is only
                  passed when this key is present (optional)
                - gpu_memory_utilization: Fraction of GPU memory to allocate (optional)
                - num_gpu_blocks_override: Cap on number of KV cache blocks (optional)
                - max_model_len: Maximum sequence length (optional)
                - enforce_eager: Disable CUDA graphs (default: False)
            num_workers: Number of vLLM worker processes
            single_gpu: If True, all workers share GPU 0
            data_parallel_size: If set, every worker process is launched with
                ``--data-parallel-size`` set to this value and, unless
                ``single_gpu`` is True, is given that many consecutive GPUs.
            request_plane: Request plane to use ("nats", "tcp", or "http"). Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
            durable_kv_events: If True, use JetStream for durable KV events. Defaults to False (NATS Core mode).
        """
        # Generate unique namespace for isolation
        namespace_suffix = generate_random_suffix()
        self.namespace = f"test-namespace-{namespace_suffix}"
        self.component_name = "backend"
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
        self.data_parallel_size = data_parallel_size
        self.worker_processes = []
        self.store_backend = store_backend

        # Dynamically allocate unique system, KV event, and NIXL side-channel
        # ports (one of each per worker) to avoid conflicts in parallel test runs.
        self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        self._kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        self._nixl_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(
            lambda: deallocate_ports(
                self._system_ports + self._kv_event_ports + self._nixl_ports
            )
        )

        if vllm_args is None:
            vllm_args = {}

        model = vllm_args.get("model", MODEL_NAME)
        gpu_memory_utilization = vllm_args.get("gpu_memory_utilization")
        num_gpu_blocks_override = vllm_args.get("num_gpu_blocks_override")
        max_model_len = vllm_args.get("max_model_len")
        enforce_eager = vllm_args.get("enforce_eager", False)

        self.model_name = model

        # CLI flags that are identical for every worker are assembled once here;
        # only the KV events config (which embeds a per-worker port) is appended
        # inside the loop below.
        base_command = ["python3", "-m", "dynamo.vllm", "--model", model]

        if "block_size" in vllm_args:
            base_command.extend(["--block-size", str(vllm_args["block_size"])])

        # Disable CUDA graphs for faster startup & lower memory
        if enforce_eager:
            base_command.append("--enforce-eager")

        # Limit VRAM allocation (required for multi-worker on same GPU)
        if gpu_memory_utilization is not None:
            base_command.extend(
                ["--gpu-memory-utilization", str(gpu_memory_utilization)]
            )

        # Add optional max_model_len if specified
        if max_model_len is not None:
            base_command.extend(["--max-model-len", str(max_model_len)])

        # Cap block count for predictable KV cache behavior
        if num_gpu_blocks_override is not None:
            base_command.extend(
                ["--num-gpu-blocks-override", str(num_gpu_blocks_override)]
            )

        if data_parallel_size is not None:
            # Add DP configuration for external load balancing
            # See: https://docs.vllm.ai/en/v0.10.0/serving/data_parallel_deployment.html#external-load-balancing
            # Only --data-parallel-size is passed. The following options are
            # intentionally left disabled:
            #   --data-parallel-address 127.0.0.1   (DP coordination)
            #   --data-parallel-rpc-port 13345      (RPC port for DP coordination)
            #   --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}'
            base_command.extend(["--data-parallel-size", str(data_parallel_size)])

        # Use --durable-kv-events to enable JetStream mode (local indexer disabled)
        if durable_kv_events:
            base_command.append("--durable-kv-events")

        # Create one managed vLLM process per worker.
        for worker_idx in range(num_workers):
            # Select the GPU(s) this process may see via CUDA_VISIBLE_DEVICES.
            if single_gpu:
                # Force all processes to GPU 0 (for single-GPU testing)
                gpu_device = "0"
            elif data_parallel_size is not None:
                # Each worker gets its own run of data_parallel_size GPUs.
                first_gpu = worker_idx * data_parallel_size
                gpu_device = ",".join(
                    str(gpu)
                    for gpu in range(first_gpu, first_gpu + data_parallel_size)
                )
            else:
                # No DP; worker sees one GPU
                gpu_device = str(worker_idx)

            # Ports are dynamically allocated for xdist-safe parallel execution.
            system_port = self._system_ports[worker_idx]
            kv_event_port = self._kv_event_ports[worker_idx]
            nixl_port = self._nixl_ports[worker_idx]

            # Pass KV events config explicitly via CLI
            kv_events_cfg = json.dumps(
                {
                    "publisher": "zmq",
                    "topic": "kv-events",
                    "endpoint": f"tcp://*:{kv_event_port}",
                    "enable_kv_cache_events": True,
                }
            )
            command = [*base_command, "--kv-events-config", kv_events_cfg]

            env = os.environ.copy()  # Copy parent environment
            env_vars = {
                "CUDA_VISIBLE_DEVICES": gpu_device,
                "DYN_NAMESPACE": self.namespace,
                "DYN_REQUEST_PLANE": request_plane,
                "DYN_SYSTEM_PORT": str(system_port),
                "VLLM_NIXL_SIDE_CHANNEL_PORT": str(nixl_port),
                "PYTHONHASHSEED": "0",  # for deterministic event id's
            }

            # Add DYN_FILE_KV if using file storage backend
            # (the value is already inherited through the environment copy
            # above; it is set explicitly to make the dependency visible).
            if self.store_backend == "file" and "DYN_FILE_KV" in os.environ:
                env_vars["DYN_FILE_KV"] = os.environ["DYN_FILE_KV"]

            env.update(env_vars)

            # Create managed process for the worker
            process = ManagedProcess(
                command=command,
                env=env,
                timeout=120,  # Allow time for model loading
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_all_matching_process_names=False,
            )
            self.worker_processes.append(process)
            if data_parallel_size is not None:
                logger.info(
                    f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
                    f"(gpu_mem={gpu_memory_utilization}, system_port={system_port}) "
                    f"with endpoint: {self.endpoint}"
                )
            else:
                logger.info(
                    f"Created vLLM worker {worker_idx} on GPU {gpu_device} "
                    f"(gpu_mem={gpu_memory_utilization}, system_port={system_port}) "
                    f"with endpoint: {self.endpoint}"
                )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.timeout(150)  # ~3x average (~43s/test), rounded up
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
def test_vllm_kv_router_basic(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared basic router scenario against two vLLM workers on one GPU.

    Both workers are launched from ``VLLM_ARGS`` (explicit ``--block-size``)
    with ``single_gpu=True``. The fixtures that are not passed on
    (``runtime_services_dynamic_ports``, ``predownload_models``,
    ``set_ucx_tls_no_mm``) are requested only for their setup side effects.
    """
    run_basic_router_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        num_workers=2,
        single_gpu=True,
        request=request,
        request_plane=request_plane,
        block_size=BLOCK_SIZE,
        model_name=MODEL_NAME,
    )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.timeout(150)  # ~3x average (~43s/test), rounded up
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
def test_vllm_kv_router_without_block_size_specified_in_vllm_args(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the basic router scenario with workers launched without ``--block-size``.

    The workers use ``VLLM_ARGS_NO_BLOCK_SIZE`` while the harness is still told
    ``block_size=BLOCK_SIZE``.
    NOTE(review): this presumably relies on the engine's default block size
    matching ``BLOCK_SIZE`` -- confirm.
    """
    run_basic_router_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS_NO_BLOCK_SIZE,
        num_workers=2,
        single_gpu=True,
        request=request,
        request_plane=request_plane,
        block_size=BLOCK_SIZE,
        model_name=MODEL_NAME,
    )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.timeout(150)  # ~3x average (~43s/test), rounded up
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
def test_router_decisions_vllm_multiple_workers(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared router-decisions scenario against two vLLM workers on one GPU.

    Data-parallel rank checks are disabled (``test_dp_rank=False``); the
    workers are plain, non-DP processes that all share GPU 0.
    """
    run_router_decisions_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=BLOCK_SIZE,
        component_name="backend",
        num_workers=2,
        single_gpu=True,
        test_dp_rank=False,
    )


@pytest.mark.gpu_2
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
def test_router_decisions_vllm_dp(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Validate KV cache prefix reuse with vLLM by sending progressive requests with overlapping prefixes.
    Same flow as test_router_decisions_vllm_multiple_workers; force first request to (worker_id, dp_rank=1).
    Dump events from router and verify:
        * All but one (worker_id, dp_rank) should have no events (due to prefix reuse)
        * The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
        * All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)

    A single worker process is launched with ``data_parallel_size=2`` and
    ``single_gpu=False``, so it is given two GPUs.
    """
    run_router_decisions_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=BLOCK_SIZE,
        component_name="backend",
        num_workers=1,
        single_gpu=False,
        test_dp_rank=True,
        extra_process_kwargs={"data_parallel_size": 2},
    )

@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.timeout(150)  # ~3x average (~43s/test), rounded up
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [
        ("etcd", False, "tcp"),
    ],
    ids=["nats_core"],
    indirect=["durable_kv_events", "request_plane"],
)
def test_vllm_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """Run the shared indexer-sync scenario with two vLLM workers.

    Parametrized with a single case (id ``nats_core``): etcd store backend,
    non-durable KV events and the tcp request plane. ``durable_kv_events`` and
    ``request_plane`` are resolved through fixtures (``indirect``), while
    ``store_backend`` is passed as a plain value.
    """
    run_indexers_sync_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
        request_plane=request_plane,
        block_size=BLOCK_SIZE,
        model_name=MODEL_NAME,
        num_workers=2,
    )