Unverified Commit ae51b3f4 authored by Jacky's avatar Jacky Committed by GitHub
Browse files

test: Request Migration Docs and E2E vLLM Tests (#2177)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
Co-authored-by: default avatarNeelay Shah <neelays@nvidia.com>
parent faafa5ff
...@@ -13,16 +13,10 @@ python -m dynamo.llama_cpp --model-path /data/models/Qwen3-0.6B-Q8_0.gguf [args] ...@@ -13,16 +13,10 @@ python -m dynamo.llama_cpp --model-path /data/models/Qwen3-0.6B-Q8_0.gguf [args]
## Request Migration ## Request Migration
In a [Distributed System](#distributed-system), a request may fail due to connectivity issues between the Frontend and the Backend. You can enable [request migration](../../../docs/architecture/request_migration.md) to handle worker failures gracefully. Use the `--migration-limit` flag to specify how many times a request can be migrated to another worker:
The Frontend will automatically track which Backends are having connectivity issues with it and avoid routing new requests to the Backends with known connectivity issues.
For ongoing requests, there is a `--migration-limit` flag which can be set on the Backend that tells the Frontend how many times a request can be migrated to another Backend should there be a loss of connectivity to the current Backend.
For example,
```bash ```bash
python3 -m dynamo.llama_cpp ... --migration-limit=3 python3 -m dynamo.llama_cpp ... --migration-limit=3
``` ```
indicates a request to this model may be migrated up to 3 times to another Backend, before failing the request, should the Frontend detects a connectivity issue to the current Backend.
The migrated request will continue responding to the original request, allowing for a seamless transition between Backends, and a reduced overall request failure rate at the Frontend for enhanced user experience. This allows a request to be migrated up to 3 times before failing. See the [Request Migration Architecture](../../../docs/architecture/request_migration.md) documentation for details on how this works.
...@@ -143,19 +143,13 @@ When using MoE models, you can also use the our implementation of the native SGL ...@@ -143,19 +143,13 @@ When using MoE models, you can also use the our implementation of the native SGL
## Request Migration ## Request Migration
In a [Distributed System](#distributed-system), a request may fail due to connectivity issues between the Frontend and the Backend. You can enable [request migration](../../../docs/architecture/request_migration.md) to handle worker failures gracefully. Use the `--migration-limit` flag to specify how many times a request can be migrated to another worker:
The Frontend will automatically track which Backends are having connectivity issues with it and avoid routing new requests to the Backends with known connectivity issues.
For ongoing requests, there is a `--migration-limit` flag which can be set on the Backend that tells the Frontend how many times a request can be migrated to another Backend should there be a loss of connectivity to the current Backend.
For example,
```bash ```bash
python3 -m dynamo.sglang ... --migration-limit=3 python3 -m dynamo.sglang ... --migration-limit=3
``` ```
indicates a request to this model may be migrated up to 3 times to another Backend, before failing the request, should the Frontend detects a connectivity issue to the current Backend.
The migrated request will continue responding to the original request, allowing for a seamless transition between Backends, and a reduced overall request failure rate at the Frontend for enhanced user experience. This allows a request to be migrated up to 3 times before failing. See the [Request Migration Architecture](../../../docs/architecture/request_migration.md) documentation for details on how this works.
## Advanced Examples ## Advanced Examples
......
...@@ -263,19 +263,13 @@ Dynamo with TensorRT-LLM supports two methods for transferring KV cache in disag ...@@ -263,19 +263,13 @@ Dynamo with TensorRT-LLM supports two methods for transferring KV cache in disag
## Request Migration ## Request Migration
In a [Distributed System](#distributed-system), a request may fail due to connectivity issues between the Frontend and the Backend. You can enable [request migration](../../../docs/architecture/request_migration.md) to handle worker failures gracefully. Use the `--migration-limit` flag to specify how many times a request can be migrated to another worker:
The Frontend will automatically track which Backends are having connectivity issues with it and avoid routing new requests to the Backends with known connectivity issues.
For ongoing requests, there is a `--migration-limit` flag which can be set on the Backend that tells the Frontend how many times a request can be migrated to another Backend should there be a loss of connectivity to the current Backend.
For example,
```bash ```bash
python3 -m dynamo.trtllm ... --migration-limit=3 python3 -m dynamo.trtllm ... --migration-limit=3
``` ```
indicates a request to this model may be migrated up to 3 times to another Backend, before failing the request, should the Frontend detects a connectivity issue to the current Backend.
The migrated request will continue responding to the original request, allowing for a seamless transition between Backends, and a reduced overall request failure rate at the Frontend for enhanced user experience. This allows a request to be migrated up to 3 times before failing. See the [Request Migration Architecture](../../../docs/architecture/request_migration.md) documentation for details on how this works.
## Client ## Client
......
...@@ -235,16 +235,10 @@ The [documentation](https://docs.vllm.ai/en/v0.9.2/configuration/serve_args.html ...@@ -235,16 +235,10 @@ The [documentation](https://docs.vllm.ai/en/v0.9.2/configuration/serve_args.html
## Request Migration ## Request Migration
In a [Distributed System](#distributed-system), a request may fail due to connectivity issues between the Frontend and the Backend. You can enable [request migration](../../../docs/architecture/request_migration.md) to handle worker failures gracefully. Use the `--migration-limit` flag to specify how many times a request can be migrated to another worker:
The Frontend will automatically track which Backends are having connectivity issues with it and avoid routing new requests to the Backends with known connectivity issues.
For ongoing requests, there is a `--migration-limit` flag which can be set on the Backend that tells the Frontend how many times a request can be migrated to another Backend should there be a loss of connectivity to the current Backend.
For example,
```bash ```bash
python3 -m dynamo.vllm ... --migration-limit=3 python3 -m dynamo.vllm ... --migration-limit=3
``` ```
indicates a request to this model may be migrated up to 3 times to another Backend, before failing the request, should the Frontend detects a connectivity issue to the current Backend.
The migrated request will continue responding to the original request, allowing for a seamless transition between Backends, and a reduced overall request failure rate at the Frontend for enhanced user experience. This allows a request to be migrated up to 3 times before failing. See the [Request Migration Architecture](../../../docs/architecture/request_migration.md) documentation for details on how this works.
...@@ -106,6 +106,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -106,6 +106,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
async def generate(self, request): async def generate(self, request):
request_id = str(uuid.uuid4().hex) request_id = str(uuid.uuid4().hex)
logger.debug(f"New Request ID: {request_id}")
prompt = TokensPrompt(prompt_token_ids=request["token_ids"]) prompt = TokensPrompt(prompt_token_ids=request["token_ids"])
...@@ -164,6 +165,8 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -164,6 +165,8 @@ class PrefillWorkerHandler(BaseWorkerHandler):
async def generate(self, request): async def generate(self, request):
request_id = request["request_id"] request_id = request["request_id"]
logger.debug(f"New Prefill Request ID: {request_id}")
prompt = TokensPrompt(prompt_token_ids=request["token_ids"]) prompt = TokensPrompt(prompt_token_ids=request["token_ids"])
sampling_params = msgspec.convert(request["sampling_params"], SamplingParams) sampling_params = msgspec.convert(request["sampling_params"], SamplingParams)
......
# Request Migration Architecture
This document describes how Dynamo implements request migration to handle worker failures gracefully during LLM text generation. Request migration allows in-progress requests to continue on different workers when the original worker becomes unavailable, providing fault tolerance and improved user experience.
## Overview
Request migration is implemented through a Migration operator that sits in the LLM processing pipeline between the Backend operator and the service backend. When a worker fails during request processing, the migration system preserves the partial generation state and recreates the request on a new worker to continue from where the previous worker left off.
## Architecture Components
### Migrator
The migration system is integrated into the LLM processing pipeline between the frontend preprocessing and the actual service backends. This positioning allows it to intercept all communication flows and manage failure scenarios transparently.
Key responsibilities:
- Intercepts all requests and responses flowing through the pipeline
- Detects worker failure scenarios through error pattern matching
- Manages retry logic with configurable migration limits
- Tracks partial response state for seamless continuation
### Migration Limit Configuration
Each model can be configured with a migration limit parameter that specifies the maximum number of times a request can be migrated to another worker:
- Default behavior: no migration allowed
- Can be set independently for different engine types
- Applicable to LLM worker nodes that perform inference
- Allows engines to override user-specified limits for compatibility
## Token State Tracking and Request Migration
The core of the migration system is the ability to preserve and continue partial generations through token state management. This ensures that when a worker fails mid-generation, the new worker can seamlessly continue from the exact point of failure.
### Token Accumulation Process
When a request is being processed and responses are flowing back from a worker, the migration system tracks every token that has been successfully generated:
1. **Initial Request State**: The system starts with the original preprocessed request containing the initial prompt tokens.
2. **Response Tracking**: As each response arrives from the worker, the migration system extracts the newly generated tokens and appends them to the request's token sequence. This creates accumulates all tokens that have been generated.
3. **Token Count Management**: The system also updates the remaining token budget to reflect the number of tokens already generated, ensuring that the total generation stays within the originally requested limits.
### Migration Trigger Scenarios
The migration system handles two distinct failure scenarios:
#### 1. New Request Migration (Initial Connection Failure)
**Scenario**: Worker is unreachable when creating the initial connection.
**Error Pattern**: Communication system reports chosen worker instance is unavailable.
**Migration Process**:
- Detects connection failure during initial stream setup
- Decrements migration retry count
- Attempts to create a new stream with the original request
- No partial state to preserve since generation hasn't started
#### 2. Ongoing Request Migration (Mid-Stream Disconnection)
**Scenario**: Connection lost during active generation after partial responses have been received.
**Error Pattern**: Stream termination detected before generation completion.
**Migration Process**:
1. **Failure Detection**: The system detects the stream disconnection through error monitoring.
2. **State Preservation**: At this point, the request's token sequence contains both the original prompt tokens and all successfully generated tokens from the failed worker.
3. **New Stream Creation**: A fresh stream is created with the accumulated request state, ensuring the new worker has complete context.
4. **Continuation**: The new worker receives the request with the full token context and continues generation from the exact point where the previous worker left off.
### Seamless Token Flow and Request State Evolution
From the client's perspective, the token stream appears continuous and uninterrupted. The client receives tokens from the first worker until failure occurs, then seamlessly continues receiving tokens from the backup worker without any indication of the underlying migration.
The request state evolves dynamically during processing. Initially, the request contains only the original prompt tokens. As generation proceeds, each successfully generated token is appended to the request's token sequence, creating a growing record of the complete conversation context.
When a migration occurs, this accumulated state is transferred to the new worker, which uses it to reconstruct the complete context. The new worker then continues generation as if it had been processing the request from the beginning, but starting from the current position in the sequence.
The migration is transparent because:
1. No tokens are lost or duplicated during the transition
2. The new worker has complete context via the accumulated token sequence
3. Generation continues from the exact failure point
4. Response streaming maintains consistent format and timing
This token accumulation mechanism ensures that migrations are truly seamless, preserving all computational work and maintaining generation quality across worker transitions.
## Benefits
1. **Fault Tolerance**: System continues operating during individual worker failures
2. **Resource Efficiency**: Partial generations are preserved rather than restarted
3. **Seamless User Experience**: Users experience no interruption during worker failures
4. **Configurable Behavior**: Migration limits allow tuning based on deployment requirements
5. **No Token Loss**: Complete preservation of generation state across migrations
## Design Considerations
The migration system is designed with several important architectural considerations:
**Engine Compatibility**: Different LLM engines may have varying capabilities for handling migrated requests. The system allows engines to override migration settings to ensure compatibility and correctness.
**Multi-Model Support**: Since a frontend may serve multiple models simultaneously, migration limits can be configured at the engine level, providing flexibility for different model types with varying reliability characteristics.
**State Management**: The system carefully tracks not only token sequences but also metadata such as remaining token budgets, stop conditions, and sampling parameters to ensure complete state preservation.
**Error Handling**: The migration system distinguishes between different types of failures and applies appropriate recovery strategies for each scenario.
## Operational Impact
Request migration fundamentally changes how the system handles failures, moving from a "fail-fast" approach to a "graceful degradation" model. This architectural shift enables higher availability and better resource utilization while maintaining the same external API contract for clients.
...@@ -71,6 +71,7 @@ The `model_type` can be: ...@@ -71,6 +71,7 @@ The `model_type` can be:
- `model_name`: The name to call the model. Your incoming HTTP requests model name must match this. Defaults to the hugging face repo name, the folder name, or the GGUF file name. - `model_name`: The name to call the model. Your incoming HTTP requests model name must match this. Defaults to the hugging face repo name, the folder name, or the GGUF file name.
- `context_length`: Max model length in tokens. Defaults to the model's set max. Only set this if you need to reduce KV cache allocation to fit into VRAM. - `context_length`: Max model length in tokens. Defaults to the model's set max. Only set this if you need to reduce KV cache allocation to fit into VRAM.
- `kv_cache_block_size`: Size of a KV block for the engine, in tokens. Defaults to 16. - `kv_cache_block_size`: Size of a KV block for the engine, in tokens. Defaults to 16.
- `migration_limit`: Maximum number of times a request may be [migrated to another Instance](../architecture/request_migration.md). Defaults to 0.
See `components/backends` for full code examples. See `components/backends` for full code examples.
......
...@@ -211,19 +211,13 @@ The KV-aware routing arguments: ...@@ -211,19 +211,13 @@ The KV-aware routing arguments:
### Request Migration ### Request Migration
In a [Distributed System](#distributed-system), a request may fail due to connectivity issues between the HTTP Server and the Worker Engine. In a [Distributed System](#distributed-system), you can enable [request migration](../architecture/request_migration.md) to handle worker failures gracefully. Use the `--migration-limit` flag to specify how many times a request can be migrated to another worker:
The HTTP Server will automatically track which Worker Engines are having connectivity issues with it and avoid routing new requests to the Engines with known connectivity issues.
For ongoing requests, there is a `--migration-limit` flag which can be set on the Worker Engines that tells the HTTP Server how many times a request can be migrated to another Engine should there be a loss of connectivity to the current Engine.
For example,
```bash ```bash
dynamo-run in=dyn://... out=vllm ... --migration-limit=3 dynamo-run in=dyn://... out=<engine> ... --migration-limit=3
``` ```
indicates a request to this model may be migrated up to 3 times to another Engine, before failing the request, should the HTTP Server detects a connectivity issue to the current Engine.
The migrated request will continue responding to the original request, allowing for a seamless transition between Engines, and a reduced overall request failure rate at the HTTP Server for enhanced user experience. This allows a request to be migrated up to 3 times before failing. See the [Request Migration Architecture](../architecture/request_migration.md) documentation for details on how this works.
## Full usage details ## Full usage details
......
# Fault Tolerance Tests
This directory contains end-to-end tests for Dynamo's fault tolerance capabilities.
## Tests
### `test_request_migration.py`
Tests worker fault tolerance with migration support using the `test_request_migration_vllm` function. This test:
0. Downloads the DeepSeek-R1-Distill-Llama-8B model from HuggingFace if not already cached
1. Starts a Dynamo frontend using `python -m dynamo.frontend` with round-robin routing
2. Starts 2 workers sequentially using `python3 -m dynamo.vllm` with specific configuration:
- Model: `deepseek-ai/DeepSeek-R1-Distill-Llama-8B`
- `--enforce-eager`, `--gpu-memory-utilization 0.45`
- `--max-model-len 8192`, `--migration-limit 3`
3. Waits for both workers to be fully ready (looking for "Reading Events from" messages)
4. Sends a test request ("Who are you?", 100 tokens) to determine which worker handles requests
5. Determines primary/backup worker roles based on round-robin routing and log analysis
6. Sends a long completion request ("Tell me a long long long story about yourself?", 8000 tokens) in a separate thread
7. Waits 0.5 seconds, then kills the primary worker using SIGKILL process group termination
8. Verifies the request completes successfully despite the worker failure (with 240s timeout)
9. Checks that the frontend logs contain "Stream disconnected... recreating stream..." indicating migration occurred
## Prerequisites
- vLLM backend installed (`pip install ai-dynamo-vllm`)
- NATS and etcd services running (provided by `runtime_services` fixture)
- Access to DeepSeek-R1-Distill-Llama-8B model (automatically downloaded from HuggingFace)
- Sufficient GPU memory (test uses 0.45 GPU memory utilization)
## Running the Tests
To run the fault tolerance tests:
```bash
# Run all fault tolerance tests
pytest /workspace/tests/fault_tolerance
# Run specific test with verbose output
pytest /workspace/tests/fault_tolerance/test_request_migration.py::test_request_migration_vllm -v
# Run with specific markers
pytest -m "e2e and vllm" /workspace/tests/fault_tolerance
# Run with debug logging
pytest /workspace/tests/fault_tolerance/test_request_migration.py::test_request_migration_vllm -v -s
```
## Test Markers
- `@pytest.mark.e2e`: End-to-end test
- `@pytest.mark.vllm`: Requires vLLM backend
- `@pytest.mark.gpu_1`: Requires single GPU access
- `@pytest.mark.slow`: Known to be slow (due to model loading and inference)
## Environment Variables
- `DYN_LOG`: Set to `debug` or `trace` for verbose logging (automatically set to `debug` by worker processes)
- `CUDA_VISIBLE_DEVICES`: Control which GPUs are used for testing
## Expected Test Duration
The test typically takes 2-3 minutes to complete, including:
- Model download/loading time (if not cached) - can take 1-2 minutes for first run
- Worker startup and registration
- Request processing and response validation
- Worker failure simulation and migration
- Cleanup
## Troubleshooting
If tests fail:
1. Check that NATS and etcd services are running
2. Verify vLLM backend is properly installed
3. Ensure sufficient GPU memory is available (test requires ~45% GPU memory)
4. Check internet connectivity for model download from HuggingFace
5. Review test logs for specific error messages
6. Verify that the DeepSeek-R1-Distill-Llama-8B model can be accessed
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import queue
import shutil
import threading
import time
import pytest
import requests
from huggingface_hub import snapshot_download
from tests.utils.managed_process import ManagedProcess, terminate_process_tree
logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend"""
def __init__(self, request):
command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
super().__init__(
command=command,
display_output=True,
terminate_existing=True,
log_dir=log_dir,
)
class DynamoWorkerProcess(ManagedProcess):
"""Process manager for Dynamo worker with vLLM backend"""
def __init__(self, request, worker_id: str):
self.worker_id = worker_id
command = [
"python3",
"-m",
"dynamo.vllm",
"--model",
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"--enforce-eager",
"--gpu-memory-utilization",
"0.45",
"--max-model-len",
"8192",
"--migration-limit",
"3",
]
# Set debug logging environment
env = os.environ.copy()
env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_ENABLED"] = "true"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
env["DYN_SYSTEM_PORT"] = f"808{worker_id[-1]}"
# TODO: Have the managed process take a command name explicitly to distinguish
# between processes started with the same command.
log_dir = f"{request.node.name}_{worker_id}"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
super().__init__(
command=command,
env=env,
health_check_urls=[
(f"http://localhost:808{worker_id[-1]}/health", self.is_ready)
],
timeout=300,
display_output=True,
terminate_existing=False,
log_dir=log_dir,
)
def get_pid(self):
"""Get the PID of the worker process"""
return self.proc.pid if self.proc else None
def is_ready(self, response) -> bool:
"""Check the health of the worker process"""
try:
data = response.json()
if data.get("status") == "ready":
logger.info(f"{self.worker_id} status is ready")
return True
logger.warning(
f"{self.worker_id} status is not ready: {data.get('status')}"
)
except ValueError:
logger.warning(f"{self.worker_id} health response is not valid JSON")
return False
def download_model() -> None:
"""
Download the DeepSeek-R1-Distill-Llama-8B model from HuggingFace Hub if not already cached.
"""
model_id = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
logger.info(f"Caching model {model_id}...")
max_retries = 5
retry_delay = 30 # seconds
for attempt in range(max_retries):
try:
# Download the model to the default cache directory
# This will skip download if the model is already cached
snapshot_download(
repo_id="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
repo_type="model",
local_files_only=False,
)
logger.info(f"Model {model_id} is ready for use")
return # Success, exit the function
except Exception as e:
if attempt < max_retries - 1: # Not the last attempt
logger.warning(
f"Failed to download model {model_id} (attempt {attempt + 1}/{max_retries}): {e}"
)
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else: # Last attempt failed
logger.error(
f"Failed to download model {model_id} after {max_retries} attempts: {e}"
)
raise
def send_completion_request(
prompt: str, max_tokens: int, timeout: int = 120
) -> requests.Response:
"""Send a completion request to the frontend"""
payload = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"prompt": prompt,
"max_tokens": max_tokens,
}
headers = {"Content-Type": "application/json"}
logger.info(
f"Sending completion request with prompt: '{prompt[:50]}...' and max_tokens: {max_tokens}"
)
try:
response = requests.post(
"http://localhost:8080/v1/completions",
headers=headers,
json=payload,
timeout=timeout,
)
logger.info(f"Received response with status code: {response.status_code}")
return response
except requests.exceptions.Timeout:
logger.error(f"Request timed out after {timeout} seconds")
raise
except requests.exceptions.RequestException as e:
logger.error(f"Request failed with error: {e}")
raise
def validate_openai_response(response: requests.Response) -> None:
"""Validate that the response is a proper OpenAI completion response"""
assert (
response.status_code == 200
), f"Request failed with status {response.status_code}: {response.text}"
try:
data = response.json()
except ValueError:
pytest.fail(f"Response is not valid JSON: {response.text}")
# Validate OpenAI completion response structure
assert "choices" in data, f"Response missing 'choices' field: {data}"
assert len(data["choices"]) > 0, f"Response has empty 'choices': {data}"
assert "text" in data["choices"][0], f"Response choice missing 'text' field: {data}"
assert data["choices"][0]["text"], f"Response text is empty: {data}"
logger.info(
f"Received valid completion response: {data['choices'][0]['text'][:100]}..."
)
def check_worker_received_request(worker_process: DynamoWorkerProcess) -> bool:
"""Check if the worker logs contain 'New Request ID:' message indicating it received a request"""
log_path = worker_process._log_path
if log_path and os.path.exists(log_path):
try:
with open(log_path, "r") as f:
log_content = f.read()
return "New Request ID: " in log_content
except Exception as e:
logger.warning(f"Could not read worker log file {log_path}: {e}")
return False
def determine_worker_roles(worker1: DynamoWorkerProcess, worker2: DynamoWorkerProcess):
"""Determine primary and backup workers based on which worker handled the test request"""
worker1_received_test = check_worker_received_request(worker1)
worker2_received_test = check_worker_received_request(worker2)
if worker1_received_test and not worker2_received_test:
primary_worker = (worker2, "Worker 2")
backup_worker = (worker1, "Worker 1")
logger.info("Test request was handled by Worker 1")
return primary_worker, backup_worker
elif worker2_received_test and not worker1_received_test:
primary_worker = (worker1, "Worker 1")
backup_worker = (worker2, "Worker 2")
logger.info("Test request was handled by Worker 2")
return primary_worker, backup_worker
else:
pytest.fail(
f"Could not determine which worker handled the test request. Worker1: {worker1_received_test}, Worker2: {worker2_received_test}"
)
def start_completion_request(primary_worker_name: str):
"""
Start a request in a separate thread.
Args:
primary_worker_name: Name of the primary worker expected to handle the request
Returns:
tuple: (request_thread, response_queue)
"""
response_queue: queue.Queue[requests.Response] = queue.Queue()
def send_formal_request():
response = send_completion_request(
"Tell me a long long long story about yourself?",
8000,
timeout=240, # Extended timeout for long request
)
response_queue.put(response)
request_thread = threading.Thread(target=send_formal_request)
request_thread.start()
return request_thread, response_queue
def validate_completion_response(
request_thread: threading.Thread, response_queue: queue.Queue
):
"""
Wait for and validate the completion response after worker failure.
Args:
request_thread: The thread running the completion request
response_queue: Queue containing the response from the request
"""
request_thread.join(timeout=300)
if request_thread.is_alive():
pytest.fail("Request did not complete within timeout")
# Get the response
if response_queue.empty():
pytest.fail("No response received for request")
response = response_queue.get()
# Validate the response
validate_openai_response(response)
logger.info("✓ Request completed successfully after worker failure")
def verify_migration_occurred(frontend_process: DynamoFrontendProcess) -> None:
"""
Verify that migration occurred by checking frontend logs for stream disconnection message.
Args:
frontend_process: The frontend process to check logs for
Raises:
pytest.fail: If migration message is not found in logs
"""
log_path = frontend_process._log_path
if not log_path or not os.path.exists(log_path):
pytest.fail(f"Frontend log file not found at {log_path}")
try:
with open(log_path, "r") as f:
log_content = f.read()
if "Stream disconnected... recreating stream..." in log_content:
logger.info(
"✓ Migration detected: Found migration message in frontend logs"
)
return
else:
pytest.fail(
"Expected migration did not occur - migration message not found in frontend logs"
)
except Exception as e:
pytest.fail(f"Could not read frontend log file {log_path}: {e}")
@pytest.mark.vllm
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.slow
def test_request_migration_vllm(request, runtime_services):
"""
End-to-end test for worker fault tolerance with migration support.
This test verifies that when a worker is killed during request processing,
the system can handle the failure gracefully and migrate the request to
another worker.
"""
# Step 0: Download the model from HuggingFace if not already cached
download_model()
# Step 1: Start the frontend
with DynamoFrontendProcess(request) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start 2 workers sequentially
# Start worker1 first and wait for it to be ready
logger.info("Starting worker 1...")
worker1 = DynamoWorkerProcess(request, "worker1")
with worker1:
# Start worker2 after worker1 is ready
logger.info("Starting worker 2...")
worker2 = DynamoWorkerProcess(request, "worker2")
with worker2:
logger.info(f"Worker 1 PID: {worker1.get_pid()}")
logger.info(f"Worker 2 PID: {worker2.get_pid()}")
# Step 3: Send a test request to see which worker handles it
logger.info("Sending test request to determine worker assignment...")
test_response = send_completion_request("Who are you?", 100, timeout=60)
validate_openai_response(test_response)
logger.info("Test request completed successfully")
# Step 4: Determine worker roles based on test request handling
# Frontend must use round-robin for the detection to work correctly
primary_worker, backup_worker = determine_worker_roles(worker1, worker2)
# Step 5: Send the formal request (expected to be received by the primary worker)
logger.info(
f"Sending formal request - expected to be handled by {primary_worker[1]}"
)
request_thread, response_queue = start_completion_request(
primary_worker[1]
)
# Step 6: Wait 0.5 seconds after sending the formal request, then kill the primary worker
logger.info(
f"Killing {primary_worker[1]} with PID {primary_worker[0].get_pid()}"
)
time.sleep(0.5)
terminate_process_tree(
primary_worker[0].get_pid(), immediate_kill=True, timeout=0
)
# Step 7: Validate the completion response
logger.info("Waiting for formal request to complete")
validate_completion_response(request_thread, response_queue)
# Step 8: Verify migration occurred
logger.info("Checking for migration message in frontend logs")
verify_migration_occurred(frontend)
logger.info(
"Test completed successfully - migration is detected and the request was successful"
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment