test_vllm.py 17.9 KB
Newer Older
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
# SPDX-License-Identifier: Apache-2.0

4
"""
Test Execution Times (Last Run: 2026-01-09):
- test_request_migration_vllm_aggregated: ~95s
- test_request_migration_vllm_prefill: N/A
- test_request_migration_vllm_kv_transfer: N/A
- test_request_migration_vllm_decode: ~115s
"""

12
import json
13
14
15
16
17
18
19
import logging
import os
import shutil

import pytest

from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
20
from tests.utils.managed_process import ManagedProcess
21
from tests.utils.payloads import check_models_api
22
from tests.utils.port_utils import allocate_port, deallocate_port
23

24
25
# Customized utils for migration tests
from .utils import DynamoFrontendProcess, run_migration_test
26
27
28

logger = logging.getLogger(__name__)

# Marks applied to every test in this module. The parametrize marks stack, so
# each test runs once per combination of the values listed below.
pytestmark = [
    pytest.mark.fault_tolerance,
    pytest.mark.vllm,
    pytest.mark.gpu_1,
    pytest.mark.e2e,
    pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
    # > 0 to verify migration succeeds, 0 to verify the request fails.
    pytest.mark.parametrize(
        "migration_limit", [3, 0], ids=["migration_enabled", "migration_disabled"]
    ),
    # Max sequence length for migration state tracking; the ids name the
    # scenario each value is meant to exercise.
    pytest.mark.parametrize(
        "migration_max_seq_len",
        [
            pytest.param(None, id="max_seq_len_disabled"),
            pytest.param(1_000_000, id="max_seq_len_not_exceeded"),
            pytest.param(1, id="max_seq_len_exceeded"),
        ],
    ),
    # True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM).
    pytest.mark.parametrize(
        "immediate_kill", [True, False], ids=["worker_failure", "graceful_shutdown"]
    ),
    # Only the chat API is exercised; the completion API variant is skipped.
    pytest.mark.parametrize(
        "request_api",
        [
            pytest.param("chat"),
            pytest.param(
                "completion",
                marks=pytest.mark.skip(reason="Behavior unverified yet"),
            ),
        ],
    ),
    # Only streaming responses are exercised; the unary variant is skipped.
    pytest.mark.parametrize(
        "stream",
        [
            pytest.param(True, id="stream"),
            pytest.param(
                False,
                id="unary",
                marks=pytest.mark.skip(reason="Behavior unverified yet"),
            ),
        ],
    ),
    # indirect=True routes the value through the `request_plane` fixture
    # instead of passing it to the test function directly.
    pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
]

73
74

class DynamoWorkerProcess(ManagedProcess):
    """Process manager for Dynamo worker with vLLM backend

    Supports both aggregated mode (single worker) and disaggregated mode
    (separate prefill and decode workers).

    A system port is allocated when the worker is constructed. It is released
    again either when construction fails or when the context manager exits,
    after the parent class has finished its own ``__exit__`` handling.

    Args:
        request: pytest request fixture
        worker_id: Unique identifier for the worker (e.g., "worker1", "prefill1").
            Its last character is used as the final digit of the KV event and
            NIXL side channel ports, so it must differ between workers that
            run at the same time.
        frontend_port: Port where the frontend is running
        is_prefill: None for aggregated mode, True for prefill worker, False for decode worker
    """

    def __init__(
        self,
        request,
        worker_id: str,
        frontend_port: int,
        is_prefill: bool | None = None,
    ):
        self.worker_id = worker_id
        self.system_port = allocate_port(9100)

        try:
            command = [
                "python3",
                "-m",
                "dynamo.vllm",
                "--model",
                FAULT_TOLERANCE_MODEL_NAME,
                "--enforce-eager",
                "--max-model-len",
                "8192",  # input + output tokens
                "--max-num-seqs",
                "1",  # number of requests at a time
                "--num-gpu-blocks-override",  # limit total KV cache allocation
                "512",  # 8192 tokens x 1 context / 16 tokens per block = 512 blocks
                "--gpu-memory-utilization",
                "0.15",  # avoid assertion error on vLLM available memory checks
            ]
            if is_prefill is True:
                command.extend(["--disaggregation-mode", "prefill"])
            elif is_prefill is False:
                command.extend(["--disaggregation-mode", "decode"])

            # Aggregated mode and prefill workers publish KV events
            if is_prefill is not False:
                kv_event_port = (
                    f"2008{worker_id[-1]}"  # TODO: use dynamic port allocation
                )
                command.extend(
                    [
                        "--kv-events-config",
                        json.dumps(
                            {
                                "publisher": "zmq",
                                "topic": "kv-events",
                                "endpoint": f"tcp://*:{kv_event_port}",
                                "enable_kv_cache_events": True,
                            }
                        ),
                    ]
                )

            # Set environment variables
            env = os.environ.copy()
            env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")

            # All workers need unique NIXL side channel ports for KV transfer
            env[
                "VLLM_NIXL_SIDE_CHANNEL_PORT"
            ] = f"560{worker_id[-1]}"  # TODO: use dynamic port allocation

            env["DYN_LOG"] = "debug"
            # Disable canary health check - these tests expect full control over requests
            # sent to the workers where canary health check intermittently sends dummy
            # requests to workers interfering with the test process which may cause
            # intermittent failures
            env["DYN_HEALTH_CHECK_ENABLED"] = "false"
            env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
            env["DYN_SYSTEM_PORT"] = str(self.system_port)
            env["DYN_HTTP_PORT"] = str(frontend_port)

            # Disable backend shutdown grace period for all migration tests
            env["DYN_GRACEFUL_SHUTDOWN_GRACE_PERIOD_SECS"] = "0"

            # Configure health check based on worker type
            health_check_urls = [
                (f"http://localhost:{self.system_port}/health", self.is_ready)
            ]
            if is_prefill is None or is_prefill is False:
                # aggregated or decode
                health_check_urls.append(
                    (f"http://localhost:{frontend_port}/v1/models", check_models_api)
                )

            # TODO: Have the managed process take a command name explicitly to distinguish
            #       between processes started with the same command.
            log_dir = f"{request.node.name}_{worker_id}"

            # Clean up any existing log directory from previous runs
            try:
                shutil.rmtree(log_dir)
                logger.info(f"Cleaned up existing log directory: {log_dir}")
            except FileNotFoundError:
                # Directory doesn't exist, which is fine
                pass

            super().__init__(
                command=command,
                env=env,
                health_check_urls=health_check_urls,
                timeout=300,
                display_output=True,
                terminate_all_matching_process_names=False,
                stragglers=["VLLM::EngineCore"],
                straggler_commands=["-m dynamo.vllm"],
                log_dir=log_dir,
                display_name=worker_id,
            )
        except Exception:
            # __exit__ never runs for an object whose construction failed, so
            # the port allocated above would otherwise stay reserved.
            self._release_system_port()
            raise

    def _release_system_port(self) -> None:
        """Return the system port to the allocator, logging instead of raising."""
        try:
            # system_port is always allocated in __init__
            deallocate_port(self.system_port)
        except Exception as e:
            logger.warning(f"Failed to release vLLM worker port: {e}")

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Run the parent's exit handling, then release the allocated port.

        The port is released in a ``finally`` block so that it is handed back
        only after the parent has finished with the process, and is still
        handed back if the parent's ``__exit__`` raises.

        Returns:
            Whatever the parent class's ``__exit__`` returns.
        """
        try:
            return super().__exit__(exc_type, exc_val, exc_tb)
        finally:
            self._release_system_port()

    def is_ready(self, response) -> bool:
        """Check the health of the worker process

        Args:
            response: HTTP response from the worker's ``/health`` endpoint; only
                its ``json()`` method is used.

        Returns:
            True when the decoded body has ``"status" == "ready"``; False when
            the status differs or the body is not valid JSON.
        """
        try:
            data = response.json()
            if data.get("status") == "ready":
                logger.info(f"{self.worker_id} status is ready")
                return True
            logger.warning(
                f"{self.worker_id} status is not ready: {data.get('status')}"
            )
        except ValueError:
            logger.warning(f"{self.worker_id} health response is not valid JSON")
        return False


217
@pytest.mark.timeout(290)  # 3x average
@pytest.mark.post_merge
def test_request_migration_vllm_aggregated(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    migration_max_seq_len,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for aggregated worker request migration.

    Setup: 1 frontend + 2 aggregated workers

    The ``runtime_services_dynamic_ports``, ``set_ucx_tls_no_mm`` and
    ``predownload_models`` fixtures are requested but not referenced in the
    test body.

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        migration_max_seq_len: Max sequence length for migration state tracking
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """

    # Step 1: Start the frontend
    with DynamoFrontendProcess(
        request,
        migration_limit=migration_limit,
        migration_max_seq_len=migration_max_seq_len,
    ) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start 2 workers
        with DynamoWorkerProcess(request, "worker1", frontend.frontend_port) as worker1:
            logger.info(f"Worker 1 PID: {worker1.get_pid()}")

            with DynamoWorkerProcess(
                request,
                "worker2",
                frontend.frontend_port,
            ) as worker2:
                logger.info(f"Worker 2 PID: {worker2.get_pid()}")

                # Step 3: Run migration test
                run_migration_test(
                    frontend,
                    worker1,
                    worker2,
                    receiving_pattern="Decode Request ID: ",
                    migration_limit=migration_limit,
                    migration_max_seq_len=migration_max_seq_len,
                    immediate_kill=immediate_kill,
                    use_chat_completion=(request_api == "chat"),
                    stream=stream,
                )


274
@pytest.mark.skip(reason="Prefill migration not yet supported")
@pytest.mark.timeout(350)  # 3x average
@pytest.mark.nightly
def test_request_migration_vllm_prefill(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    migration_max_seq_len,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for prefill worker request migration in disaggregated mode.

    Setup: 1 decode worker + 2 prefill workers

    The ``runtime_services_dynamic_ports``, ``set_ucx_tls_no_mm`` and
    ``predownload_models`` fixtures are requested but not referenced in the
    test body.

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        migration_max_seq_len: Max sequence length for migration state tracking
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """

    # Step 1: Start the frontend
    with DynamoFrontendProcess(
        request,
        migration_limit=migration_limit,
        migration_max_seq_len=migration_max_seq_len,
    ) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start decode worker first (required for prefill workers to connect)
        with DynamoWorkerProcess(
            request,
            "worker0",
            frontend.frontend_port,
            is_prefill=False,
        ) as decode_worker:
            logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")

            # Step 3: Start 2 prefill workers
            with DynamoWorkerProcess(
                request,
                "worker1",
                frontend.frontend_port,
                is_prefill=True,
            ) as prefill1:
                logger.info(f"Prefill Worker 1 PID: {prefill1.get_pid()}")

                with DynamoWorkerProcess(
                    request,
                    "worker2",
                    frontend.frontend_port,
                    is_prefill=True,
                ) as prefill2:
                    logger.info(f"Prefill Worker 2 PID: {prefill2.get_pid()}")

                    # Step 4: Run migration test
                    run_migration_test(
                        frontend,
                        prefill1,
                        prefill2,
                        receiving_pattern="Prefill Request ID: ",
                        migration_limit=migration_limit,
                        migration_max_seq_len=migration_max_seq_len,
                        immediate_kill=immediate_kill,
                        use_chat_completion=(request_api == "chat"),
                        stream=stream,
                        use_long_prompt=True,
                    )
347

348

349
@pytest.mark.skip(
    reason=(
        "Migration reuses the same request_id for vLLM, but the prefill worker's "
        "KV cache still holds the request due to delay_free_blocks in disaggregated mode. "
        "With chat completions API, prefix cache hits on chat template tokens cause "
        "an assertion error in vLLM's KV cache manager (save_new_computed_blocks expects "
        "no new computed blocks for existing requests)."
    ),
)
@pytest.mark.timeout(350)  # 3x average
@pytest.mark.nightly
def test_request_migration_vllm_kv_transfer(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    migration_max_seq_len,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for request migration during KV transfer in disaggregated mode.

    Setup: 1 prefill worker + 2 decode workers

    The ``runtime_services_dynamic_ports``, ``set_ucx_tls_no_mm`` and
    ``predownload_models`` fixtures are requested but not referenced in the
    test body.

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        migration_max_seq_len: Max sequence length for migration state tracking
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """

    # Step 1: Start the frontend
    with DynamoFrontendProcess(
        request,
        migration_limit=migration_limit,
        migration_max_seq_len=migration_max_seq_len,
    ) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start prefill worker first
        with DynamoWorkerProcess(
            request,
            "worker0",
            frontend.frontend_port,
            is_prefill=True,
        ) as prefill_worker:
            logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")

            # Step 3: Start 2 decode workers
            with DynamoWorkerProcess(
                request,
                "worker1",
                frontend.frontend_port,
                is_prefill=False,
            ) as decode1:
                logger.info(f"Decode Worker 1 PID: {decode1.get_pid()}")

                with DynamoWorkerProcess(
                    request,
                    "worker2",
                    frontend.frontend_port,
                    is_prefill=False,
                ) as decode2:
                    logger.info(f"Decode Worker 2 PID: {decode2.get_pid()}")

                    # Step 4: Run migration test
                    run_migration_test(
                        frontend,
                        decode1,
                        decode2,
                        receiving_pattern="Decode Request ID: ",
                        migration_limit=migration_limit,
                        migration_max_seq_len=migration_max_seq_len,
                        immediate_kill=immediate_kill,
                        use_chat_completion=(request_api == "chat"),
                        stream=stream,
                        use_long_prompt=True,
                    )


432
@pytest.mark.skip(
    reason=(
        "Migration reuses the same request_id for vLLM, but the prefill worker's "
        "KV cache still holds the request due to delay_free_blocks in disaggregated mode. "
        "With chat completions API, prefix cache hits on chat template tokens cause "
        "an assertion error in vLLM's KV cache manager (save_new_computed_blocks expects "
        "no new computed blocks for existing requests)."
    ),
)
@pytest.mark.timeout(350)  # 3x average
@pytest.mark.nightly
def test_request_migration_vllm_decode(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    migration_max_seq_len,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for decode worker request migration in disaggregated mode.

    Setup: 1 prefill worker + 2 decode workers

    The ``runtime_services_dynamic_ports``, ``set_ucx_tls_no_mm`` and
    ``predownload_models`` fixtures are requested but not referenced in the
    test body.

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        migration_max_seq_len: Max sequence length for migration state tracking
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """
    # Guard for the non-streaming case in addition to the module-level skip on
    # the "unary" parameter.
    if not stream:
        pytest.skip(
            "Decode test requires streaming to wait for response before stopping worker"
        )

    # Step 1: Start the frontend
    with DynamoFrontendProcess(
        request,
        migration_limit=migration_limit,
        migration_max_seq_len=migration_max_seq_len,
    ) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start prefill worker first
        with DynamoWorkerProcess(
            request,
            "worker0",
            frontend.frontend_port,
            is_prefill=True,
        ) as prefill_worker:
            logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")

            # Step 3: Start 2 decode workers
            with DynamoWorkerProcess(
                request,
                "worker1",
                frontend.frontend_port,
                is_prefill=False,
            ) as decode1:
                logger.info(f"Decode Worker 1 PID: {decode1.get_pid()}")

                with DynamoWorkerProcess(
                    request,
                    "worker2",
                    frontend.frontend_port,
                    is_prefill=False,
                ) as decode2:
                    logger.info(f"Decode Worker 2 PID: {decode2.get_pid()}")

                    # Step 4: Run migration test
                    run_migration_test(
                        frontend,
                        decode1,
                        decode2,
                        receiving_pattern="Decode Request ID: ",
                        migration_limit=migration_limit,
                        migration_max_seq_len=migration_max_seq_len,
                        immediate_kill=immediate_kill,
                        use_chat_completion=(request_api == "chat"),
                        stream=stream,
                        wait_for_new_response_before_stop=True,
                    )