test_router_e2e_with_vllm.py 14.1 KB
Newer Older
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
# SPDX-License-Identifier: Apache-2.0
3
4
5
6
7

# Timing notes (measured locally):
# - GPU-1 subset (`-m "gpu_1 and not gpu_2"`): 130.43s total for 3 tests.
# These tests load a real model and can be slow/flaky when GPU resources are contended,
# so we set explicit pytest timeouts to fail fast on hangs (see per-test markers below).
8
import json
9
10
11
12
13
14
import logging
import os
from typing import Any, Dict, Optional

import pytest

15
16
17
18
19
from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
    run_indexers_sync_test,
    run_router_decisions_test,
20
)
21
from tests.router.helper import generate_random_suffix
22
from tests.utils.constants import DefaultPort
23
from tests.utils.managed_process import ManagedProcess
24
from tests.utils.port_utils import allocate_ports, deallocate_ports
25
26
27
28

logger = logging.getLogger(__name__)

MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
29
30
31

pytestmark = [
    pytest.mark.e2e,
32
    pytest.mark.router,
33
34
35
    pytest.mark.vllm,
    pytest.mark.model(MODEL_NAME),
]
36
37
38
SPEEDUP_RATIO = 10.0
BLOCK_SIZE = 16

39
40
41
42
43
44
45
46
47
48
# Shared vLLM configuration for all tests
# gpu_memory_utilization limits actual VRAM allocation (required for multi-worker on same GPU)
VLLM_ARGS: Dict[str, Any] = {
    "block_size": BLOCK_SIZE,
    "model": MODEL_NAME,
    "gpu_memory_utilization": 0.4,  # Limit VRAM allocation per worker
    "max_model_len": 1024,  # Limit context length to reduce KV cache size
    "enforce_eager": True,  # Disable CUDA graphs for faster startup & lower memory
}

49

50
class VLLMProcess(ManagedEngineProcessMixin):
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
    """Manages vLLM workers using dynamo.vllm (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real vLLM workers.
    The key difference: dynamo.vllm automatically handles:
    - HTTP API serving
    - KV cache event publishing (ZMQ → NATS bridge)
    - Integration with dynamo.frontend router
    """

    def __init__(
        self,
        request,
        vllm_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        data_parallel_size: Optional[int] = None,
67
68
        request_plane: str = "tcp",
        store_backend: str = "etcd",
69
        durable_kv_events: bool = False,
70
71
72
73
74
75
76
77
    ):
        """Initialize vLLM workers with dynamo integration.

        Args:
            request: pytest request fixture for log directory
            vllm_args: Configuration dict with keys:
                - block_size: KV cache block size (default: 16)
                - model: Model name/path (default: TinyLlama-1.1B)
78
79
                - gpu_memory_utilization: Fraction of GPU memory to allocate (optional)
                - num_gpu_blocks_override: Cap on number of KV cache blocks (optional)
80
                - max_model_len: Maximum sequence length (optional)
81
                - enforce_eager: Disable CUDA graphs (default: False)
82
            num_workers: Number of vLLM worker processes
83
            single_gpu: If True, all workers share GPU 0
84
            data_parallel_size: If set, enables data parallelism with this many ranks (num_workers must equal data_parallel_size)
85
86
            request_plane: Request plane to use ("nats", "tcp", or "http"). Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
87
            durable_kv_events: If True, use JetStream for durable KV events. Defaults to False (NATS Core mode).
88
89
90
91
92
93
94
        """
        # Generate unique namespace for isolation
        namespace_suffix = generate_random_suffix()
        self.namespace = f"test-namespace-{namespace_suffix}"
        self.component_name = "backend"
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
95
        self.data_parallel_size = data_parallel_size
96
        self.worker_processes = []
97
        self.store_backend = store_backend
98

99
100
101
102
103
104
105
106
107
108
109
        # Dynamically allocate unique system, KV event, and NIXL side-channel
        # ports (one of each per worker) to avoid conflicts in parallel test runs.
        self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        self._kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        self._nixl_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(
            lambda: deallocate_ports(
                self._system_ports + self._kv_event_ports + self._nixl_ports
            )
        )

110
111
112
113
114
        if vllm_args is None:
            vllm_args = {}

        block_size = vllm_args.get("block_size", BLOCK_SIZE)
        model = vllm_args.get("model", MODEL_NAME)
115
116
        gpu_memory_utilization = vllm_args.get("gpu_memory_utilization")
        num_gpu_blocks_override = vllm_args.get("num_gpu_blocks_override")
117
        max_model_len = vllm_args.get("max_model_len")
118
        enforce_eager = vllm_args.get("enforce_eager", False)
119
120
121
122
123
124
125
126

        self.model_name = model

        # Create vLLM worker processes
        # Matches test.sh behavior:
        # - When data_parallel_size is set, launch one process per DP rank
        # - Each process gets --data-parallel-rank and --data-parallel-size
        # - Each process runs on its own GPU via CUDA_VISIBLE_DEVICES
127
        # - --kv-transfer-config enables KV cache transfer between ranks
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156

        for worker_idx in range(num_workers):
            # Calculate GPU device for this process
            if single_gpu:
                # Force all processes to GPU 0 (for single-GPU testing)
                gpu_device = "0"
            elif data_parallel_size is not None:
                # Worker sees dp_rank GPUs (each DP rank gets its own GPU)
                worker_start_gpu = worker_idx * data_parallel_size
                gpu_device = ",".join(
                    str(i)
                    for i in range(
                        worker_start_gpu, worker_start_gpu + data_parallel_size
                    )
                )
            else:
                # No DP; worker sees one GPU
                gpu_device = str(worker_idx)

            command = [
                "python3",
                "-m",
                "dynamo.vllm",
                "--model",
                model,
                "--block-size",
                str(block_size),
            ]

157
158
159
160
161
162
163
164
165
166
            # Disable CUDA graphs for faster startup & lower memory
            if enforce_eager:
                command.append("--enforce-eager")

            # Limit VRAM allocation (required for multi-worker on same GPU)
            if gpu_memory_utilization is not None:
                command.extend(
                    ["--gpu-memory-utilization", str(gpu_memory_utilization)]
                )

167
168
169
170
            # Add optional max_model_len if specified
            if max_model_len is not None:
                command.extend(["--max-model-len", str(max_model_len)])

171
172
173
174
175
176
            # Cap block count for predictable KV cache behavior
            if num_gpu_blocks_override is not None:
                command.extend(
                    ["--num-gpu-blocks-override", str(num_gpu_blocks_override)]
                )

177
178
179
180
181
182
183
184
185
            if data_parallel_size is not None:
                # Add DP configuration for external load balancing
                # See: https://docs.vllm.ai/en/v0.10.0/serving/data_parallel_deployment.html#external-load-balancing
                command.extend(
                    [
                        "--data-parallel-size",
                        str(data_parallel_size),
                        # "--data-parallel-address", "127.0.0.1",  # Required for DP coordination
                        # "--data-parallel-rpc-port", "13345",  # RPC port for DP coordination
186
                        # "--kv-transfer-config", '{"kv_connector":"NixlConnector","kv_role":"kv_both"}',  # Required for KV transfer between DP ranks
187
188
189
                    ]
                )

190
191
192
193
            # Use --durable-kv-events to enable JetStream mode (local indexer disabled)
            if durable_kv_events:
                command.append("--durable-kv-events")

194
195
196
197
198
            # Ports are dynamically allocated for xdist-safe parallel execution.
            system_port = self._system_ports[worker_idx]
            kv_event_port = self._kv_event_ports[worker_idx]
            nixl_port = self._nixl_ports[worker_idx]

199
200
201
202
203
204
205
206
207
208
209
            # Pass KV events config explicitly via CLI
            kv_events_cfg = json.dumps(
                {
                    "publisher": "zmq",
                    "topic": "kv-events",
                    "endpoint": f"tcp://*:{kv_event_port}",
                    "enable_kv_cache_events": True,
                }
            )
            command.extend(["--kv-events-config", kv_events_cfg])

210
            env = os.environ.copy()  # Copy parent environment
211
212
213
214
            env_vars = {
                "CUDA_VISIBLE_DEVICES": gpu_device,
                "DYN_NAMESPACE": self.namespace,
                "DYN_REQUEST_PLANE": request_plane,
215
216
                "DYN_SYSTEM_PORT": str(system_port),
                "VLLM_NIXL_SIDE_CHANNEL_PORT": str(nixl_port),
217
218
219
220
221
222
223
224
                "PYTHONHASHSEED": "0",  # for deterministic event id's
            }

            # Add DYN_FILE_KV if using file storage backend
            if self.store_backend == "file" and "DYN_FILE_KV" in os.environ:
                env_vars["DYN_FILE_KV"] = os.environ["DYN_FILE_KV"]

            env.update(env_vars)
225
226
227
228
229
230
231
232
233
234

            # Create managed process for the worker
            process = ManagedProcess(
                command=command,
                env=env,
                timeout=120,  # Allow time for model loading
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
235
                terminate_all_matching_process_names=False,
236
237
238
239
240
            )
            self.worker_processes.append(process)
            if data_parallel_size is not None:
                logger.info(
                    f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
241
                    f"(gpu_mem={gpu_memory_utilization}, system_port={system_port}) "
242
243
244
245
246
                    f"with endpoint: {self.endpoint}"
                )
            else:
                logger.info(
                    f"Created vLLM worker {worker_idx} on GPU {gpu_device} "
247
                    f"(gpu_mem={gpu_memory_utilization}, system_port={system_port}) "
248
249
250
                    f"with endpoint: {self.endpoint}"
                )

251
252
253
    process_name = "vLLM worker"
    cleanup_name = "vLLM worker resources"
    init_delay_reason = "initialize NIXL before starting next worker"
254
255


256
@pytest.mark.pre_merge
257
@pytest.mark.gpu_1
258
@pytest.mark.timeout(150)  # ~3x average (~43s/test), rounded up
259
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
260
def test_vllm_kv_router_basic(
261
262
263
264
265
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
266
):
267
268
269
270
271
272
273
    run_basic_router_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        num_workers=2,
        single_gpu=True,
        request=request,
274
        request_plane=request_plane,
275
276
277
        block_size=BLOCK_SIZE,
        model_name=MODEL_NAME,
    )
278
279


280
@pytest.mark.pre_merge
281
@pytest.mark.gpu_1
282
@pytest.mark.timeout(150)  # ~3x average (~43s/test), rounded up
283
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
284
def test_router_decisions_vllm_multiple_workers(
285
286
287
288
289
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
290
):
291
292
293
294
295
    run_router_decisions_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        request=request,
296
        request_plane=request_plane,
297
298
299
300
301
302
303
        model_name=MODEL_NAME,
        block_size=BLOCK_SIZE,
        component_name="backend",
        num_workers=2,
        single_gpu=True,
        test_dp_rank=False,
    )
304
305
306


@pytest.mark.gpu_2
307
@pytest.mark.nightly
308
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
309
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
310
def test_router_decisions_vllm_dp(
311
312
313
314
315
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
316
):
317
318
319
320
321
322
323
    """Validate KV cache prefix reuse with vLLM by sending progressive requests with overlapping prefixes.
    Same flow as test_router_decisions_vllm_multiple_workers; force first request to (worker_id, dp_rank=1).
    Dump events from router and verify:
        * All but one (worker_id, dp_rank) should have no events (due to prefix reuse)
        * The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
        * All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)
    """
324
325
326
327
328
    run_router_decisions_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        request=request,
329
        request_plane=request_plane,
330
331
332
333
334
335
336
337
        model_name=MODEL_NAME,
        block_size=BLOCK_SIZE,
        component_name="backend",
        num_workers=1,
        single_gpu=False,
        test_dp_rank=True,
        extra_process_kwargs={"data_parallel_size": 2},
    )
338

339
340
341

@pytest.mark.pre_merge
@pytest.mark.gpu_1
342
@pytest.mark.timeout(150)  # ~3x average (~43s/test), rounded up
343
@pytest.mark.parametrize(
344
    "store_backend,durable_kv_events,request_plane",
345
    [
346
        ("etcd", False, "tcp"),
347
    ],
348
349
    ids=["nats_core"],
    indirect=["durable_kv_events", "request_plane"],
350
)
351
def test_vllm_indexers_sync(
352
353
354
355
356
357
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
358
    durable_kv_events,
359
    request_plane,
360
):
361
362
363
364
365
366
    run_indexers_sync_test(
        engine_process_cls=VLLMProcess,
        engine_args_name="vllm_args",
        engine_args=VLLM_ARGS,
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
367
368
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
369
370
371
372
373
        request_plane=request_plane,
        block_size=BLOCK_SIZE,
        model_name=MODEL_NAME,
        num_workers=2,
    )