Unverified Commit 59535682 authored by Jacky's avatar Jacky Committed by GitHub
Browse files

test: Add request migration graceful shutdown E2E test (#3585)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
parent 8c8922b0
# Fault Tolerance Tests # Fault Tolerance Tests
This directory contains end-to-end tests for Dynamo's fault tolerance capabilities. ## Migration Tests
## Tests
### `test_request_migration.py`
Tests worker fault tolerance with migration support using the `test_request_migration_vllm` function. This test:
0. Downloads the DeepSeek-R1-Distill-Llama-8B model from HuggingFace if not already cached
1. Starts a Dynamo frontend using `python -m dynamo.frontend` with round-robin routing
2. Starts 2 workers sequentially using `python3 -m dynamo.vllm` with specific configuration:
- Model: `deepseek-ai/DeepSeek-R1-Distill-Llama-8B`
- `--enforce-eager`, `--gpu-memory-utilization 0.45`
- `--max-model-len 8192`, `--migration-limit 3`
3. Waits for both workers to be fully ready (health check returns "ready" status)
4. Sends a test request ("Who are you?", 100 tokens) to determine which worker handles requests
5. Determines primary/backup worker roles based on round-robin routing and log analysis
6. Sends a long completion request ("Tell me a long long long story about yourself?", 8000 tokens) in a separate thread
7. Waits 0.5 seconds, then kills the primary worker using SIGKILL process group termination
8. Verifies the request completes successfully despite the worker failure (with 240s timeout)
9. Checks that the frontend logs contain "Stream disconnected... recreating stream..." indicating migration occurred
### `test_request_cancellation.py`
Tests request cancellation functionality across multiple API endpoints and deployment configurations. Contains three test functions:
#### `test_request_cancellation_vllm`
Tests basic request cancellation with a single worker:
0. Downloads the DeepSeek-R1-Distill-Llama-8B model from HuggingFace if not already cached
1. Starts a Dynamo frontend using `python -m dynamo.frontend` with debug logging enabled
2. Starts a single worker using `python3 -m dynamo.vllm` with specific configuration:
- Model: `deepseek-ai/DeepSeek-R1-Distill-Llama-8B`
- `--enforce-eager`, `--gpu-memory-utilization 0.45`, `--max-model-len 8192`, `--migration-limit 3`
- Debug logging enabled on port 8081
3. Tests request cancellation across three scenarios:
- **Completion API**: `/v1/completions` endpoint cancellation
- **Chat Completion API (non-streaming)**: `/v1/chat/completions` endpoint cancellation
- **Chat Completion API (streaming)**: `/v1/chat/completions` with streaming cancellation
4. For each scenario:
- Sends a long request with 1-second timeout to trigger cancellation
- Validates that cancellation messages appear in both frontend and worker logs
- Uses incremental log offset tracking to avoid false positives from previous tests
5. Checks for specific cancellation patterns:
- Frontend log: "issued control message Kill to sender"
- Worker log: "Aborted Request ID: <request_id>" matching the "New Request ID: <request_id>"
#### `test_request_cancellation_vllm_decode`
Tests request cancellation during disaggregated decode phase:
0. Downloads the DeepSeek-R1-Distill-Llama-8B model from HuggingFace if not already cached
1. Starts a Dynamo frontend using `python -m dynamo.frontend` with debug logging enabled
2. Starts a prefill worker using `python3 -m dynamo.vllm --is-prefill-worker` on port 8082
3. Starts a decode worker using `python3 -m dynamo.vllm` on port 8081
4. Tests completion request cancellation in the disaggregated setup
5. Validates cancellation messages appear in prefill worker, decode worker, and frontend logs
6. Checks for specific patterns:
- Frontend log: "issued control message Kill to sender"
- Decode worker log: "Aborted Request ID: <request_id>"
- Prefill worker log: "New Prefill Request ID: <request_id>"
#### `test_request_cancellation_vllm_prefill`
Tests request cancellation during disaggregated prefill phase:
- (Skipped until request cancellation can cancel before receiving the first response)
## Prerequisites
- vLLM backend installed
- NATS and etcd services running (provided by `runtime_services` fixture)
- Access to DeepSeek-R1-Distill-Llama-8B model (automatically downloaded from HuggingFace)
- Sufficient GPU memory
## Running the Tests
To run the fault tolerance tests:
The migration directory contains tests for worker fault tolerance with migration support.
### Test Matrix
| Test | Shutdown Method | Migration Enabled | Expected Result | Verification |
|------|----------------|-------------------|-----------------|--------------|
| `test_request_migration_vllm_worker_failure` | SIGKILL (immediate) | Yes (default) | Request succeeds | "Stream disconnected... recreating stream..." in logs |
| `test_request_migration_vllm_graceful_shutdown` | SIGTERM (10s timeout) | Yes (default) | Request succeeds | "Stream disconnected... recreating stream..." in logs |
| `test_no_request_migration_vllm_worker_failure` | SIGKILL (immediate) | No (migration_limit=0) | Request fails (500) | "Migration limit exhausted" in logs |
| `test_no_request_migration_vllm_graceful_shutdown` | SIGTERM (10s timeout) | No (migration_limit=0) | Request fails (500) | "Migration limit exhausted" in logs |
### Common Test Flow
All migration tests follow this pattern:
1. Start a Dynamo frontend with round-robin routing
2. Start 2 vLLM workers sequentially
3. Send a long completion request (max_tokens=8192) in a separate daemon thread
4. Use parallel polling to determine which worker received the request (checks for "New Request ID:" in logs)
5. Terminate the worker processing the request (method varies by test)
6. Validate the request outcome (success or failure based on migration setting)
7. Verify migration behavior in frontend logs
**Run examples:**
```bash ```bash
# Run all fault tolerance tests # With migration enabled
pytest -m "e2e and vllm" /workspace/tests/fault_tolerance pytest tests/fault_tolerance/migration/test_vllm.py::test_request_migration_vllm_worker_failure -v -s
pytest tests/fault_tolerance/migration/test_vllm.py::test_request_migration_vllm_graceful_shutdown -v -s
# With migration disabled
pytest tests/fault_tolerance/migration/test_vllm.py::test_no_request_migration_vllm_worker_failure -v -s
pytest tests/fault_tolerance/migration/test_vllm.py::test_no_request_migration_vllm_graceful_shutdown -v -s
```
## Cancellation Tests
# Run specific test functions with debug logging The cancellation directory contains tests for request cancellation functionality across multiple
pytest /workspace/tests/fault_tolerance/test_request_migration.py::test_request_migration_vllm -v -s API endpoints, backends, and deployment configurations.
pytest /workspace/tests/fault_tolerance/test_request_cancellation.py::test_request_cancellation_vllm -v -s
pytest /workspace/tests/fault_tolerance/test_request_cancellation.py::test_request_cancellation_vllm_decode -v -s ### Test Overview by Backend
#### vLLM Cancellation Tests
| Test | Mode | Cancellation Phase | Request Type | Setup |
|------|------|-------------------|--------------|-------|
| `test_request_cancellation_vllm_aggregated` | Aggregated | During generation | 3 scenarios: completion, chat, streaming chat | 1 worker |
| `test_request_cancellation_vllm_decode_cancel` | Disaggregated | Remote decode | Streaming chat (5 responses read) | Prefill + Decode workers |
| `test_request_cancellation_vllm_remote_prefill_cancel` | Disaggregated | Remote prefill | Completion (long prompt) | Prefill + Decode workers |
**Run examples:**
```bash
pytest tests/fault_tolerance/cancellation/test_vllm.py::test_request_cancellation_vllm_aggregated -v -s
pytest tests/fault_tolerance/cancellation/test_vllm.py::test_request_cancellation_vllm_decode_cancel -v -s
pytest tests/fault_tolerance/cancellation/test_vllm.py::test_request_cancellation_vllm_remote_prefill_cancel -v -s
``` ```
## Test Markers #### TRT-LLM Cancellation Tests
- `@pytest.mark.e2e`: End-to-end test | Test | Mode | Strategy | Cancellation Phase | Request Type | Setup |
- `@pytest.mark.vllm`: Requires vLLM backend |------|------|----------|-------------------|--------------|-------|
- `@pytest.mark.gpu_1`: Requires single GPU access | `test_request_cancellation_trtllm_aggregated` | Aggregated | N/A | During generation | 3 scenarios: completion, chat, streaming chat | 1 worker (prefill_and_decode) |
- `@pytest.mark.slow`: Known to be slow (due to model loading and inference) | `test_request_cancellation_trtllm_decode_first_decode_cancel` | Disaggregated | Decode-first | Remote decode | Streaming chat (5 responses read) | Prefill + Decode workers |
| `test_request_cancellation_trtllm_decode_first_remote_prefill_cancel` | Disaggregated | Decode-first | Remote prefill | Completion (long prompt) | Prefill + Decode workers |
| `test_request_cancellation_trtllm_prefill_first_prefill_cancel` | Disaggregated | Prefill-first | Local prefill | Completion (long prompt) | Decode + Prefill workers |
| `test_request_cancellation_trtllm_prefill_first_remote_decode_cancel` | Disaggregated | Prefill-first | Remote decode | Streaming chat (5 responses read) | Decode + Prefill workers |
## Environment Variables **Run examples:**
```bash
pytest tests/fault_tolerance/cancellation/test_trtllm.py::test_request_cancellation_trtllm_aggregated -v -s
pytest tests/fault_tolerance/cancellation/test_trtllm.py::test_request_cancellation_trtllm_decode_first_decode_cancel -v -s
# ... (other tests follow same pattern)
```
- `DYN_LOG`: Set to `debug` or `trace` for verbose logging (automatically set to `debug` by worker processes) #### SGLang Cancellation Tests
- `CUDA_VISIBLE_DEVICES`: Control which GPUs are used for testing
## Expected Test Duration | Test | Mode | Cancellation Phase | Request Type | Setup | Notes |
|------|------|-------------------|--------------|-------|-------|
| `test_request_cancellation_sglang_aggregated` | Aggregated | During generation | 3 scenarios: completion, chat, streaming chat (1 response read) | 1 worker | ⚠️ Flaky: SGLang prefill cancellation issues |
| `test_request_cancellation_sglang_decode_cancel` | Disaggregated | Remote decode | Streaming chat (1 response read) | Decode + Prefill workers | Requires 2 GPUs |
The tests typically take 2-3 minutes to complete each, including: **Run examples:**
- Model download/loading time (if not cached) - can take 1-2 minutes for first run ```bash
- Worker startup and registration pytest tests/fault_tolerance/cancellation/test_sglang.py::test_request_cancellation_sglang_aggregated -v -s
- Request processing and response validation pytest tests/fault_tolerance/cancellation/test_sglang.py::test_request_cancellation_sglang_decode_cancel -v -s
- Worker failure simulation and migration (for migration test) / Request cancellation validation (for cancellation tests) ```
- Cleanup
## Troubleshooting ### Common Cancellation Test Pattern
If tests fail: 1. Start frontend and workers (configuration varies by test)
2. Send request (type varies by test scenario)
3. Poll for request ID in worker logs
4. For streaming: read N responses before cancellation
5. Cancel the request via API
6. Verify cancellation messages in worker and frontend logs
1. Check that NATS and etcd services are running **Verification patterns:**
2. Verify vLLM backend is properly installed - Aggregated mode: "Aborted Request ID" in worker logs
3. Ensure sufficient GPU memory is available - Remote prefill: "Aborted Request ID" in prefill, "Aborted Remote Request ID" in decode
4. Check internet connectivity for model download from HuggingFace - Remote decode: "Aborted Request ID" in decode, "Aborted Remote Request ID" in prefill
5. Review test logs for specific error messages
6. Verify that the DeepSeek-R1-Distill-Llama-8B model can be accessed
7. For cancellation tests: Check that timeout-based cancellation is working properly and cancellation patterns appear in logs
8. For migration tests: Verify worker process termination and stream recreation behavior
9. For disaggregated cancellation tests: Ensure both prefill and decode workers are properly started and cancellation works across the disaggregated setup
...@@ -350,7 +350,7 @@ def poll_for_pattern( ...@@ -350,7 +350,7 @@ def poll_for_pattern(
while iteration < max_iterations: while iteration < max_iterations:
# Read the process log # Read the process log
log_content = read_log_content(process._log_path) log_content = read_log_content(process.log_path)
new_content = log_content[current_offset:] new_content = log_content[current_offset:]
# Look for the pattern # Look for the pattern
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import shutil
import threading
import time
import pytest
import requests
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT
from tests.utils.managed_process import ManagedProcess
logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend"""
def __init__(self, request):
command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
super().__init__(
command=command,
display_output=True,
terminate_existing=True,
log_dir=log_dir,
)
def start_completion_request() -> tuple:
"""
Start a long-running completion request in a separate thread.
Returns:
tuple: (request_thread, response_list)
"""
response_list = [] # Thread safe is not required as only one thread writes to it
def send_request():
prompt = "Tell me a long long long story about yourself?"
max_tokens = 8192
timeout = 240 # Extended timeout for long request
payload = {
"model": FAULT_TOLERANCE_MODEL_NAME,
"prompt": prompt,
"max_tokens": max_tokens,
}
headers = {"Content-Type": "application/json"}
logger.info(
f"Sending completion request with prompt: '{prompt[:50]}...' and max_tokens: {max_tokens}"
)
try:
response = requests.post(
f"http://localhost:{FRONTEND_PORT}/v1/completions",
headers=headers,
json=payload,
timeout=timeout,
)
logger.info(f"Received response with status code: {response.status_code}")
response_list.append(response)
except Exception as e:
logger.error(f"Request failed with error: {e}")
request_thread = threading.Thread(target=send_request, daemon=True)
request_thread.start()
return request_thread, response_list
def determine_request_receiving_worker(
worker1: ManagedProcess, worker2: ManagedProcess, receiving_pattern: str
) -> tuple:
"""
Determine which worker received the request using parallel polling.
Args:
worker1: First worker process
worker2: Second worker process
receiving_pattern: Log pattern indicating request receipt
Returns:
Tuple of (worker_with_request, name_of_worker_with_request)
"""
worker1_results: list[bool] = []
worker2_results: list[bool] = []
# Poll both workers in parallel
def poll_worker(worker: ManagedProcess, result_list: list[bool]):
max_wait_ms = 500
poll_interval_ms = 5
max_iterations = max_wait_ms // poll_interval_ms
iteration = 0
while iteration < max_iterations:
# Check if the worker logs contain 'New Request ID:' message
try:
with open(worker.log_path, "r") as f:
log_content = f.read()
if receiving_pattern in log_content:
result_list.append(True)
return
except Exception as e:
logger.error(f"Could not read log file {worker.log_path}: {e}")
return
time.sleep(poll_interval_ms / 1000.0)
iteration += 1
# Look for which worker received the request
thread1 = threading.Thread(
target=poll_worker, args=(worker1, worker1_results), daemon=True
)
thread2 = threading.Thread(
target=poll_worker, args=(worker2, worker2_results), daemon=True
)
thread1.start()
thread2.start()
thread1.join(timeout=1)
thread2.join(timeout=1)
# Get results from lists
worker1_received = worker1_results[0] if worker1_results else False
worker2_received = worker2_results[0] if worker2_results else False
if worker1_received and not worker2_received:
logger.info("Request was received by Worker 1")
return worker1, "Worker 1"
elif worker2_received and not worker1_received:
logger.info("Request was received by Worker 2")
return worker2, "Worker 2"
elif worker1_received and worker2_received:
pytest.fail("Both workers received the request")
else:
pytest.fail("Neither worker received the request")
def validate_completion_response(
request_thread: threading.Thread, response_list: list
) -> None:
"""
Wait for and validate the completion response after worker failure.
Args:
request_thread: The thread running the completion request
response_list: List containing the response from the request
"""
request_thread.join(timeout=240)
if request_thread.is_alive():
pytest.fail("Request did not complete within 240 seconds")
# Get the response
if len(response_list) != 1:
pytest.fail(f"Received {len(response_list)} responses, expected 1")
response = response_list[0]
assert (
response.status_code == 200
), f"Request failed with status {response.status_code}: {response.text}"
try:
data = response.json()
except ValueError:
pytest.fail(f"Response is not valid JSON: {response.text}")
# Validate OpenAI completion response structure
assert "choices" in data, f"Response missing 'choices' field: {data}"
assert len(data["choices"]) > 0, f"Response has empty 'choices': {data}"
assert "text" in data["choices"][0], f"Response choice missing 'text' field: {data}"
assert data["choices"][0]["text"], f"Response text is empty: {data}"
logger.info(
f"Received valid completion response: {data['choices'][0]['text'][:100]}..."
)
logger.info("Request completed successfully")
def verify_migration_occurred(frontend_process: DynamoFrontendProcess) -> None:
"""
Verify that migration occurred by checking frontend logs for stream disconnection message.
Args:
frontend_process: The frontend process to check logs for
"""
log_path = frontend_process.log_path
try:
with open(log_path, "r") as f:
log_content = f.read()
except Exception as e:
pytest.fail(f"Could not read frontend log file {log_path}: {e}")
assert (
"Stream disconnected... recreating stream..." in log_content
), "'Stream disconnected... recreating stream...' message not found in logs"
assert (
"Cannot recreate stream: " not in log_content
), "'Cannot recreate stream: ...' error found in logs"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment