# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Test Execution Times (Last Run: 2025-12-09):
- test_request_cancellation_vllm_aggregated: ~55s (gpu_1)
- test_request_cancellation_vllm_decode_cancel: ~53s (gpu_2)
- test_request_cancellation_vllm_prefill_cancel: ~53s (gpu_2)
- Total: 161.65s (0:02:41)
"""

import json
import logging
import os
import shutil

import pytest

from tests.fault_tolerance.cancellation.utils import (
    DynamoFrontendProcess,
    poll_for_pattern,
    read_streaming_responses,
    send_cancellable_request,
    verify_frontend_cancellation_metrics,
    verify_runtime_cancellation_metrics,
)
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import check_health_generate, check_models_api
from tests.utils.port_utils import allocate_port, deallocate_port

logger = logging.getLogger(__name__)

# Markers applied to every test in this module. ``request_plane`` is
# parametrized indirectly (its value is routed through the ``request_plane``
# fixture), so each test runs once with "nats" and once with "tcp".
pytestmark = [
    pytest.mark.fault_tolerance,
    pytest.mark.vllm,
    pytest.mark.e2e,
    pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
    pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
]


class DynamoWorkerProcess(ManagedProcess):
    """Process manager for a Dynamo worker with the vLLM backend.

    ``is_prefill`` selects the worker role:

    * ``True``  -- disaggregated prefill worker
    * ``False`` -- disaggregated decode worker
    * ``None``  -- aggregated worker (no ``--disaggregation-mode`` flag)

    Each instance allocates its own system port in ``__init__`` and releases
    it in ``__exit__`` (or immediately, if construction fails).
    """

    def __init__(
        self,
        request,
        frontend_port: int,
        is_prefill: bool | None = None,
    ):
        """Build the command line, environment and health checks for a worker.

        Args:
            request: pytest ``request`` fixture. Supplies the ``request_plane``
                fixture value and the test node name used for the log directory.
            frontend_port: HTTP port of the Dynamo frontend; exported as
                ``DYN_HTTP_PORT`` and used by the frontend health checks.
            is_prefill: ``True`` for prefill, ``False`` for decode, ``None`` for
                an aggregated worker.
        """
        if is_prefill is True:
            disaggregation_mode, worker_type = "prefill", "prefill_worker"
        elif is_prefill is False:
            disaggregation_mode, worker_type = "decode", "decode_worker"
        else:
            disaggregation_mode, worker_type = None, "worker"

        command = [
            "python3",
            "-m",
            "dynamo.vllm",
            "--model",
            FAULT_TOLERANCE_MODEL_NAME,
            "--enforce-eager",
            "--gpu-memory-utilization",
            "0.45",
            "--max-model-len",
            "16384",
        ]
        if disaggregation_mode is not None:
            # Both disaggregated roles use the same NIXL KV transfer config.
            command.extend(
                [
                    "--disaggregation-mode",
                    disaggregation_mode,
                    "--kv-transfer-config",
                    '{"kv_connector":"NixlConnector","kv_role":"kv_both"}',
                ]
            )
        if is_prefill is True:
            # KV events use a fixed ZMQ endpoint, so only the prefill worker
            # publishes them to avoid a port conflict with the decode worker.
            command.extend(
                [
                    "--kv-events-config",
                    json.dumps(
                        {
                            "publisher": "zmq",
                            "topic": "kv-events",
                            "endpoint": "tcp://*:20082",
                            "enable_kv_cache_events": True,
                        }
                    ),
                ]
            )

        log_dir = f"{request.node.name}_{worker_type}"

        # Clean up any existing log directory from previous runs
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass  # Directory doesn't exist, which is fine
        else:
            logger.info(f"Cleaned up existing log directory: {log_dir}")

        # Allocate the system port only once the steps above are done, and
        # release it if anything below fails: a `with` statement never calls
        # __exit__ when the constructor raises, so the port would otherwise leak.
        system_port = allocate_port(9100)
        self.system_port = system_port
        self.frontend_port = frontend_port
        self.is_prefill = is_prefill
        try:
            # Every role waits for its own /health endpoint to report "ready".
            health_check_urls = [
                (f"http://localhost:{system_port}/health", self.is_ready)
            ]
            if is_prefill is not True:
                # Decode and aggregated workers also wait until the frontend
                # lists the model and its generate health check passes.
                health_check_urls += [
                    (f"http://localhost:{frontend_port}/v1/models", check_models_api),
                    (f"http://localhost:{frontend_port}/health", check_health_generate),
                ]

            env = os.environ.copy()
            env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
            env["DYN_LOG"] = "debug"
            # Disable canary health check - these tests expect full control over
            # requests sent to the workers; the canary intermittently sends dummy
            # requests that interfere with the test and cause flaky failures.
            env["DYN_HEALTH_CHECK_ENABLED"] = "false"
            env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
            env["DYN_SYSTEM_PORT"] = str(system_port)
            env["DYN_HTTP_PORT"] = str(frontend_port)
            if is_prefill is True:
                # Fixed NIXL side-channel port; set only on the prefill worker to
                # avoid a conflict with the decode worker.
                # TODO: use dynamic port allocation
                env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = "5601"

            super().__init__(
                command=command,
                env=env,
                health_check_urls=health_check_urls,
                timeout=300,
                display_output=True,
                terminate_all_matching_process_names=False,
                # Ensure any orphaned vLLM engine cores or child helpers are cleaned up
                stragglers=[
                    "VLLM::EngineCore",
                ],
                straggler_commands=[
                    "-m dynamo.vllm",
                ],
                log_dir=log_dir,
            )
        except BaseException:
            deallocate_port(system_port)
            raise

    def get_pid(self):
        """Get the PID of the worker process, or None if it is not running."""
        return self.proc.pid if self.proc else None

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the worker, then release its allocated system port.

        The port is released only after the parent has torn the process down,
        so it cannot be handed to another process while this worker still
        holds it. Failures to release are logged, not raised.
        """
        try:
            return super().__exit__(exc_type, exc_val, exc_tb)
        finally:
            try:
                # system_port is always allocated in __init__
                deallocate_port(self.system_port)
            except Exception as e:
                logger.warning(f"Failed to release vLLM worker port: {e}")

    def is_ready(self, response) -> bool:
        """Health-check callback: True iff the JSON body has status "ready".

        Logs a warning (and returns False) when the status is anything else or
        the body is not valid JSON.
        """
        worker_type = "Prefill worker" if self.is_prefill else "Worker"
        try:
            data = response.json()
        except ValueError:
            logger.warning(f"{worker_type} health response is not valid JSON")
            return False
        status = data.get("status")
        if status == "ready":
            logger.info(f"{worker_type} status is ready")
            return True
        logger.warning(f"{worker_type} status is not ready: {status}")
        return False


@pytest.mark.timeout(110)  # ~2x the ~55s average runtime
@pytest.mark.post_merge
@pytest.mark.gpu_1
def test_request_cancellation_vllm_aggregated(
    request, runtime_services_dynamic_ports, predownload_models
):
    """
    End-to-end test for request cancellation functionality in aggregated mode.

    This test verifies that when a request is cancelled by the client,
    the system properly handles the cancellation and cleans up resources
    on the worker side in aggregated (single worker) mode. Tests three scenarios:
    1. Completion request
    2. Chat completion request (non-streaming)
    3. Chat completion request (streaming)

    For each scenario it checks that the worker logs an abort for the same
    request ID it started decoding, that the frontend logs the Kill control
    message, and that the cancellation metrics are updated.

    Timing (Last Run: 2025-12-09): ~55s total
    - Engine initialization: ~15s
    - Testing 3 scenarios: ~38s (~12s each)
    - Teardown: ~2s
    """

    # Step 1: Start the frontend (allocates its own frontend_port)
    with DynamoFrontendProcess(request) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start a single worker (allocates its own system_port)
        with DynamoWorkerProcess(request, frontend.frontend_port) as worker:
            logger.info(f"Worker PID: {worker.get_pid()}")

            # Step 3: Test request cancellation with polling approach.
            # Log offsets carry over between scenarios so each poll only scans
            # output produced after the previous scenario's matches.
            frontend_log_offset, worker_log_offset = 0, 0

            test_scenarios = [
                ("completion", "Completion request cancellation"),
                ("chat_completion", "Chat completion request cancellation"),
                (
                    "chat_completion_stream",
                    "Chat completion stream request cancellation",
                ),
            ]

            for idx, (request_type, description) in enumerate(test_scenarios):
                logger.info(f"Testing {description.lower()}...")

                # Send the request (non-blocking)
                cancellable_req = send_cancellable_request(
                    frontend.frontend_port, request_type
                )

                # Poll for "Decode Request ID" pattern (vLLM v2 pattern)
                request_id, worker_log_offset = poll_for_pattern(
                    process=worker,
                    pattern="Decode Request ID: ",
                    log_offset=worker_log_offset,
                    match_type="contains",
                )

                # For streaming, read 5 responses before cancelling
                if request_type == "chat_completion_stream":
                    read_streaming_responses(cancellable_req, expected_count=5)

                # Now cancel the request
                cancellable_req.cancel()
                logger.info(f"Cancelled request ID: {request_id}")

                # Poll for "Aborted Request ID" with matching ID
                _, worker_log_offset = poll_for_pattern(
                    process=worker,
                    pattern=f"Aborted Request ID: {request_id}",
                    log_offset=worker_log_offset,
                )

                # Verify frontend log has kill message
                _, frontend_log_offset = poll_for_pattern(
                    process=frontend,
                    pattern="issued control message Kill to sender",
                    log_offset=frontend_log_offset,
                )

                logger.info(f"{description} detected successfully")

                # Verify cancellation metrics after each scenario. The frontend
                # check is scoped to request_type (each type is sent once),
                # while the worker's runtime count accumulates across scenarios.
                verify_frontend_cancellation_metrics(
                    frontend_port=frontend.frontend_port,
                    request_type=request_type,
                    expected_count=1,
                )
                verify_runtime_cancellation_metrics(
                    worker_system_port=worker.system_port,
                    expected_count=idx + 1,
                )


@pytest.mark.skip(reason="Nightly CI failure: https://linear.app/nvidia/issue/DYN-2606")
@pytest.mark.timeout(150)  # ~3x average
@pytest.mark.nightly
@pytest.mark.gpu_2
def test_request_cancellation_vllm_decode_cancel(
    request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models
):
    """
    End-to-end test for request cancellation during decode phase.

    This test verifies that when a request is cancelled by the client during the decode phase,
    the system properly handles the cancellation and cleans up resources
    on the decode worker side in a disaggregated setup.

    Timing (Last Run: 2025-12-09): ~53s total (requires 2 GPUs)
    - Engine initialization: ~23s (decode + prefill workers)
    - Testing stream cancellation during decode: ~28s
    - Teardown: ~2s
    """

    # Step 1: Start the frontend (allocates its own frontend_port)
    with DynamoFrontendProcess(request) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start the prefill worker (allocates its own system_port)
        with DynamoWorkerProcess(
            request, frontend.frontend_port, is_prefill=True
        ) as prefill_worker:
            logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")

            # Step 3: Start the decode worker (allocates its own system_port)
            with DynamoWorkerProcess(
                request, frontend.frontend_port, is_prefill=False
            ) as decode_worker:
                logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")

                # Step 4: Test request cancellation for streaming scenario
                logger.info(
                    "Testing chat completion stream request cancellation in decode worker (decode phase)..."
                )

                # Send streaming request (non-blocking)
                cancellable_req = send_cancellable_request(
                    frontend.frontend_port, "chat_completion_stream"
                )

                # Poll for "Decode Request ID" pattern in decode worker (vLLM v2 pattern)
                request_id, decode_log_offset = poll_for_pattern(
                    process=decode_worker,
                    pattern="Decode Request ID: ",
                    match_type="contains",
                )

                # Verify same request ID reached prefill worker (as "Prefill Request ID")
                poll_for_pattern(
                    process=prefill_worker,
                    pattern=f"Prefill Request ID: {request_id}",
                )

                # Read 5 streaming responses so the cancel lands in the decode phase
                read_streaming_responses(cancellable_req, expected_count=5)

                # Now cancel the request
                cancellable_req.cancel()
                logger.info(f"Cancelled request ID: {request_id}")

                # Poll for "Aborted Request ID" in decode worker, only scanning
                # log output after the "Decode Request ID" match
                poll_for_pattern(
                    process=decode_worker,
                    pattern=f"Aborted Request ID: {request_id}",
                    log_offset=decode_log_offset,
                )

                # Verify frontend log has kill message
                poll_for_pattern(
                    process=frontend,
                    pattern="issued control message Kill to sender",
                )

                logger.info(
                    "Chat completion stream cancellation in decode phase detected successfully"
                )

                # Verify cancellation metrics: the cancel is counted on the
                # decode worker only, not on the prefill worker.
                verify_frontend_cancellation_metrics(
                    frontend_port=frontend.frontend_port,
                    request_type="chat_completion_stream",
                    expected_count=1,
                )
                verify_runtime_cancellation_metrics(
                    worker_system_port=decode_worker.system_port,
                    expected_count=1,
                )
                verify_runtime_cancellation_metrics(
                    worker_system_port=prefill_worker.system_port,
                    expected_count=0,
                    component="prefill",
                )


@pytest.mark.timeout(150)  # ~3x average
@pytest.mark.nightly
@pytest.mark.gpu_2
def test_request_cancellation_vllm_prefill_cancel(
    request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models
):
    """
    End-to-end test for request cancellation during prefill phase.

    This test verifies that when a request is cancelled by the client during the prefill phase,
    the system properly handles the cancellation on the prefill worker and that the
    cancelled request never reaches the decode worker in a disaggregated setup.

    Timing (Last Run: 2025-12-09): ~53s total (requires 2 GPUs)
    - Engine initialization: ~23s (decode + prefill workers)
    - Testing cancellation during prefill: ~28s
    - Teardown: ~2s
    """

    # Step 1: Start the frontend (allocates its own frontend_port)
    with DynamoFrontendProcess(request) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start the prefill worker (allocates its own system_port)
        with DynamoWorkerProcess(
            request, frontend.frontend_port, is_prefill=True
        ) as prefill_worker:
            logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")

            # Step 3: Start the decode worker (allocates its own system_port)
            with DynamoWorkerProcess(
                request, frontend.frontend_port, is_prefill=False
            ) as decode_worker:
                logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")

                # Step 4: Test request cancellation during prefill phase
                # Note: With the new architecture, prefill routing happens in the frontend,
                # so the request goes directly to the prefill worker first
                logger.info(
                    "Testing completion request cancellation during prefill phase..."
                )

                # Send request with long prompt (non-blocking) so prefill takes
                # long enough to be cancelled mid-flight
                cancellable_req = send_cancellable_request(
                    frontend.frontend_port, "completion", use_long_prompt=True
                )

                # Poll for "Prefill Request ID" pattern in prefill worker (vLLM v2 pattern)
                # With new architecture, prefill is routed by frontend's internal router
                request_id, prefill_log_offset = poll_for_pattern(
                    process=prefill_worker,
                    pattern="Prefill Request ID: ",
                    match_type="contains",
                )

                # Cancel during prefill phase
                cancellable_req.cancel()
                logger.info(f"Cancelled request ID: {request_id} during prefill")

                # Poll for "Aborted Prefill Request ID" in prefill worker (where cancellation happens)
                poll_for_pattern(
                    process=prefill_worker,
                    pattern=f"Aborted Prefill Request ID: {request_id}",
                    log_offset=prefill_log_offset,
                )

                # Verify frontend log has kill message
                poll_for_pattern(
                    process=frontend,
                    pattern="issued control message Kill to sender",
                )

                # Verify decode worker never received the request: a short poll
                # for any "Request ID: " line must fail with "not found".
                pattern = "Request ID: "
                try:
                    poll_for_pattern(
                        process=decode_worker,
                        pattern=pattern,
                        max_wait_ms=10,
                        match_type="contains",
                    )
                except AssertionError as e:
                    # Expected path: poll_for_pattern raised because nothing
                    # matched. Any other assertion message is a real failure.
                    # NOTE(review): "2 iterations" is tied to max_wait_ms=10 and
                    # the helper's poll interval; keep them in sync.
                    assert str(e).startswith(
                        f"Failed to find '{pattern}' pattern after 2 iterations "
                    ), f"Unexpected error: {e}"
                else:
                    pytest.fail(
                        "Decode worker received request cancelled during prefill phase"
                    )

                logger.info(
                    "Completion request cancellation during prefill phase detected successfully"
                )

                # Verify cancellation metrics: the cancel is counted on the
                # prefill worker only, since decode never saw the request.
                verify_frontend_cancellation_metrics(
                    frontend_port=frontend.frontend_port,
                    request_type="completion",
                    expected_count=1,
                )
                verify_runtime_cancellation_metrics(
                    worker_system_port=decode_worker.system_port,
                    expected_count=0,
                )
                verify_runtime_cancellation_metrics(
                    worker_system_port=prefill_worker.system_port,
                    expected_count=1,
                    component="prefill",
                )