"deploy/vscode:/vscode.git/clone" did not exist on "d81a00efdf29d3e39b8a52d6b8151ac117400e81"
Unverified Commit 52c75363 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

test: fault injection tests for k8s (#3194)


Signed-off-by: default avatarnnshah1 <neelays@nvidia.com>
Signed-off-by: default avatartzulingk@nvidia.com <tzulingk@nvidia.com>
Co-authored-by: default avatarnnshah1 <neelays@nvidia.com>
parent 116b9b43
<!--
SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Fault Tolerance Test Suite
As a large scale distributed inference serving framework in addition
to providing high throughput and low latency, Dynamo needs to
provide fault detection, resilency, and quick recovery in the face of
unforseen failures. In order to test Dynamo we are developing a test
suite to inject and measure the impact of different types of failure
conditions.
## Test Architecture
The fault tolerance test suite is designed as a set of pytest
configurations that launch typical dynamo deployments in a Kubernetes
environemnt and then inject failures by terminating processes or
pods. To test the recovery time and impact of failures a set number of
clients are launched in parallel. Each client sends a set number of
synchronous requests. Log files are stored for each pod as well as for
each client and inspected using a post-processing script.
> [!NOTE]
> Test pass / failure is not an indication of SLA for recovery or resilience
> It only indicates is the test was executed and data was collected
### Test Sequence Diagram
```mermaid
sequenceDiagram
participant Tester as Test Runner
participant DynamoKubernetes as Dynamo Kubernetes Platform
participant DynamoDeployment as Dynamo Deployment
participant Clients as Client Processes
participant Logs as Log Files
participant Parser as Results Parser
Tester->>DynamoKubernetes: Deploy Dynamo graph (Frontend + Workers)
DynamoKubernetes->>DynamoDeployment: Create pods/services (Frontend, Workers)
DynamoDeployment->>Tester: Signal ready (all pods running)
Tester->>Clients: Launch clients (concurrent requests)
Clients->>DynamoDeployment: Send requests via Port Forwarding to Frontend
Tester->>DynamoDeployment: Inject failures (delete pods/terminate processes)
Clients->>Logs: Log request results to files
DynamoDeployment->>Logs: Save pod logs
Tester->>DynamoKubernetes: Teardown deployment (delete pods/services)
DynamoKubernetes->>DynamoDeployment: Delete resources
Tester->>Parser: Parse logs
Parser->>Tester: Generate results table
```
### Test Scenarios
The test suite is organized around three core components: **Deployments**, **Client Load**, and **Failures**. Each scenario combines these elements to simulate fault conditions and measure system resilience.
#### Deployments
Deployments represent specific graphs that are deployed using the Dynamo Kubernetes Platform.
The following deployment configurations are defined in `scenarios.py`:
| Deployment Name | Description |
|-------------------------|-----------------------------------------------------------------------------|
| `agg-tp-1-dp-1` | Aggregated worker with 1 replica for each service (frontend, decode). |
| `agg-tp-1-dp-2` | Aggregated worker with 2 replicas for each service (frontend, decode). |
| `disagg-tp-1-dp-1` | Disaggregated deployment with 1 replica for each service (frontend, decode, prefill). |
| `disagg-tp-1-dp-2` | Disaggregated deployment with 2 replicas for each service (frontend, decode, prefill). |
#### Client Load
- **Concurrent Clients**: 10 clients by default, adjustable per scenario.
- **Requests per Client**: 100 requests, simulating sustained load.
- **Input/Output Token Length**: 100 tokens for both input prompts and generated outputs.
- **Request Rate Limit**: Ensures clients do not overwhelm the service, with a maximum of 1 request per second per client.
#### Failures
Failures are injected into deployed pods either by using pod delete or
sending signals to specified processes.
The following failure types are defined in `scenarios.py`:
| Failure Name | Description | Injection Method |
|--------------------------|-----------------------------------------------------------------------------|--------------------------------------------|
| `none` | No failure injection. | N/A |
| `frontend` | Terminate frontend process/pod. | `SIGINT` signal to `dynamo.frontend`. |
| `frontend_pod` | Delete frontend pod. | Kubernetes API pod deletion. |
| `decode_worker` | Terminate decode worker process/pod. | `SIGINT` signal to `dynamo.vllm` |
| `decode_worker_pod` | Delete decode worker pod. | Kubernetes API pod deletion. |
| `prefill_worker` | Terminate prefill worker process/pod. | `SIGINT` signal to`dynamo.vllm` |
| `prefill_worker_pod` | Delete prefill worker pod. | Kubernetes API pod deletion. |
| `vllm_decode_engine_core`| Terminate VLLM decode engine core process. | `SIGKILL` signal to `VLLM::EngineCore` |
| `vllm_prefill_engine_core`| Terminate VLLM prefill engine core process. | `SIGKILL` signal to `VLLM::EngineCore` |
#### Example Scenario Breakdown
**Scenario**: `agg-tp-2-dp-1-decode_worker`
- **Deployment**: Aggregation with 1 decoder worker replica (`agg-tp-2-dp-1`).
- **Client Load**: 10 clients, 100 requests each, max request rate 1/sec.
- **Failure**: Terminates 1 decoder worker process 10 seconds into the test.
#### Example Scenario Execution:
Run all deployments and failure scenarios
```bash
pytest tests/fault_tolerance/deploy/test_deployment.py -s -v --namespace ${NAMESPACE}
```
### Test Results Directory
For each test scenario a directory of log files is created and post processed to summarize the test.
```
test_fault_scenario[agg-tp-1-dp-1-none]
.
├── client_0.log.txt
├── client_1.log.txt
├── client_2.log.txt
├── client_3.log.txt
├── client_4.log.txt
├── client_5.log.txt
├── client_6.log.txt
├── client_7.log.txt
├── client_8.log.txt
├── client_9.log.txt
├── Frontend
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.log
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.metrics.log
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.previous.log
│ └── fault-tolerance-test-frontend-576bd784dc-jv68q.yaml
├── test.log.txt
└── VllmDecodeWorker
├── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.log
├── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.metrics.log
├── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.previous.log
└── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.yaml
```
| File/Directory Name | Description |
|------------------------------------|------------------------------------------------------------------------------------------------|
| **client_*.log.txt** | Request/response logs for each client instance (contains JSON-formatted request details) |
| **{Service}/*.log | Container log for pod at end of test (Frontend, VllmDecodeWroer, etc.) |
| **{Service}/*.previous.log** | Previous container log for pod in case of crash / exit. (Frontend, VllmDecodeWroer, etc.). Empty if N/A. |
| **{Service}/*.metrics.log** | Metrics as reported by `/metrics` for the service |
| **{Service}/*.yaml** | yaml for pod including status transitions |
| **test.log.txt** | Primary test execution log (contains fault injection timing, process management, and test status)|
### Summary Results
Results are presented in table format after each test providing summary statistics.
```
Test Group: agg-tp-1-dp-1
╒═════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 180.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.19 │ N/A │ N/A │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 181.00 │ 153.00 │ 0.00 │ 820.00 │ 527.00 │ 1.21 │ 1.18 │ 3.36 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 169.00 │ 140.00 │ 0.00 │ 785.00 │ 305.00 │ 1.20 │ 1.18 │ 5.39 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 161.00 │ 140.00 │ 0.00 │ 510.00 │ 850.00 │ 1.21 │ 1.18 │ 154.11 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 140.00 │ 0.00 │ 511.00 │ 849.00 │ 1.22 │ 1.18 │ 156.47 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 181.00 │ 140.00 │ 0.00 │ 524.00 │ 836.00 │ 1.21 │ 1.19 │ 152.52 │
╘═════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
| Column Name | Description |
|-----------------------|-----------------------------------------------------------------------------|
| **Failure** | Type of fault injection applied during the test (or 'none' for baseline) |
| **Startup** | Time (seconds) taken for the service to become ready after initialization |
| **Succes/nBefore** | Numoer of client requests that succeeded before fault injection |
| **Failed/nBefore** | Number of client requests that failed or were invalid before fault injection |
| **Success/nAftere** | Number of client requests that succeeded after fault injection |
| **Latency Before** | Average request latency (seconds) for successful requests before fault injection |
| **Latency After** | Average request latency (seconds) for successful requests after fault injection |
| **Recovery Time** | Time (seconds) taken for failed components to recover after fault injection |
## Example Results
The following results were obtained running on a cluster of A100
nodes.
### Aggregated Workers
#### No Redundancy
To demonstrate the failure and recovery time in the case that there is
a single instance of each process we ran a simmple "agg-tp-1-dp-1" configuration.
```mermaid
graph LR
Client["Client"]
Frontend["Frontend"]
Client --> Frontend
Frontend --> DecodePool
%% Decode Worker Pool (vertical layout)
subgraph DecodePool["Decode Worker Pool"]
direction TB
subgraph Decode1["Decode 1"]
direction TB
D1GPU0["GPU 0"]
end
end
%% Styling
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: agg-tp-1-dp-1
╒═════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 180.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.19 │ N/A │ N/A │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 181.00 │ 153.00 │ 0.00 │ 820.00 │ 527.00 │ 1.21 │ 1.18 │ 3.36 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 169.00 │ 140.00 │ 0.00 │ 785.00 │ 305.00 │ 1.20 │ 1.18 │ 5.39 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 161.00 │ 140.00 │ 0.00 │ 510.00 │ 850.00 │ 1.21 │ 1.18 │ 154.11 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 140.00 │ 0.00 │ 511.00 │ 849.00 │ 1.22 │ 1.18 │ 156.47 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 181.00 │ 140.00 │ 0.00 │ 524.00 │ 836.00 │ 1.21 │ 1.19 │ 152.52 │
╘═════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
1. Recovery time for the decode worker itself is the largest and a decode worker failure has the largest impact (as expected)
2. Recovery time doesn't include time for the ready probe to return `ready` so even if the process is recovered early (as in the case of the Frontend) requests may fail until the pod is probed.
#### Redundant Workers (Over Provisoned)
To demonstrate the failure and recovery time in the case that there
are multiple instances of each process (except for the frontend) we
ran a simple "agg-tp-1-dp-2" configuration.
```mermaid
graph LR
Client["Client"]
Frontend_1["Frontend_1"]
Frontend_2["Frontend_2"]
Client --> Frontend_1
Client --> Frontend_2
Frontend_1 --> DecodePool
Frontend_2 --> DecodePool
subgraph DecodePool["Decode Worker Pool"]
direction LR
subgraph Decode1["Decode 1"]
direction TB
D1GPU0["GPU 0"]
end
subgraph Decode2["Decode 2"]
direction TB
D2GPU0["GPU 0"]
end
end
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: agg-tp-1-dp-2
╒═════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 181.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.18 │ N/A │ N/A │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 181.00 │ 121.00 │ 0.00 │ 1373.00 │ 6.00 │ 1.21 │ 1.17 │ 4.37 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 182.00 │ 122.00 │ 0.00 │ 1378.00 │ 0.00 │ 1.21 │ 1.17 │ 5.24 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 169.00 │ 121.00 │ 0.00 │ 1374.00 │ 5.00 │ 1.20 │ 1.18 │ 153.09 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 125.00 │ 0.00 │ 1369.00 │ 6.00 │ 1.21 │ 1.18 │ 152.72 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 182.00 │ 120.00 │ 0.00 │ 1375.00 │ 5.00 │ 1.20 │ 1.18 │ 154.75 │
╘═════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
1. By immediately detecting a decode worker failure, Dynamo can limit
the failures and reroute requests to healthy workers with minimal
impact.
### Disaggregated Workers
#### No Redunancy
To demonstrate the failure and recovery time in the case of a
disaaggregated deployment with a single instance for each process in
the graph we ran a simple `disagg-tp-1-dp-1` configuration.
```mermaid
graph LR
Client["Client"]
Frontend["Frontend"]
Client --> Frontend
Frontend <--> DecodePool
%% Prefill Worker Pool (horizontal layout)
subgraph PrefillPool["Prefill Worker Pool"]
direction LR
subgraph Prefill1["Prefill 1"]
direction TB
P1GPU0["GPU 0"]
end
end
%% Decode Worker Pool (vertical layout)
subgraph DecodePool["Decode Worker Pool"]
direction TB
subgraph Decode1["Decode 1"]
direction TB
D1GPU0["GPU 0"]
end
end
DecodePool --> PrefillPool
PrefillPool -.-> DecodePool
%% Styling
style PrefillPool stroke:#0066cc,stroke-width:2px
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: disagg-tp-1-dp-1
╒══════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞══════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 175.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.99 │ N/A │ N/A │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 182.00 │ 100.00 │ 0.00 │ 817.00 │ 583.00 │ 1.91 │ 2.00 │ 4.28 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 181.00 │ 81.00 │ 0.00 │ 1024.00 │ 395.00 │ 2.31 │ 1.96 │ 5.53 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 181.00 │ 82.00 │ 0.00 │ 560.00 │ 858.00 │ 2.26 │ 1.98 │ 155.79 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 92.00 │ 0.00 │ 566.00 │ 842.00 │ 2.21 │ 1.83 │ 174.15 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker │ 182.00 │ 84.00 │ 0.00 │ 1346.00 │ 70.00 │ 2.22 │ 1.49 │ 153.53 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker_pod │ 161.00 │ 83.00 │ 0.00 │ 1362.00 │ 55.00 │ 2.21 │ 1.51 │ 154.18 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 167.00 │ 81.00 │ 0.00 │ 569.00 │ 850.00 │ 2.33 │ 2.12 │ 153.81 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_prefill_engine_core │ 182.00 │ 83.00 │ 0.00 │ 568.00 │ 849.00 │ 2.24 │ 2.00 │ 153.84 │
╘══════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
1. Prefill worker engine failure causes decode engine failure.
2. When prefill workers fail gracefully, decode workers will automatically do prefill as well.
#### Redundant Workers
To demonstrate the failure and recovery time in the case that there
are multiple instances of each process (except for the frontend and
decode worker) we ran a simple "disagg-tp-1-dp-2"
configuration.
```mermaid
graph LR
Client["Client"]
Frontend_1["Frontend 1"]
Frontend_2["Frontend 2"]
Client --> Frontend_1
Client --> Frontend_2
Frontend_1 <--> DecodePool
Frontend_2 <--> DecodePool
%% Prefill Worker Pool (horizontal layout)
subgraph PrefillPool["Prefill Worker Pool"]
direction LR
subgraph Prefill1["Prefill 1"]
direction TB
P1GPU0["GPU 0"]
end
subgraph Prefill2["Prefill 2"]
direction TB
P2GPU0["GPU 0"]
end
end
%% Decode Worker Pool (vertical layout)
subgraph DecodePool["Decode Worker Pool"]
direction TB
subgraph Decode1["Decode 1"]
direction TB
D1GPU0["GPU 0"]
end
end
DecodePool --> PrefillPool
PrefillPool -.-> DecodePool
%% Styling
style PrefillPool stroke:#0066cc,stroke-width:2px
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: disagg-tp-1-dp-2
╒══════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞══════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 181.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.47 │ N/A │ N/A │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 182.00 │ 100.00 │ 0.00 │ 1390.00 │ 10.00 │ 1.75 │ 1.43 │ 4.32 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 182.00 │ 91.00 │ 0.00 │ 1409.00 │ 0.00 │ 1.78 │ 1.43 │ 5.48 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 182.00 │ 94.00 │ 0.00 │ 1404.00 │ 2.00 │ 1.78 │ 1.58 │ 154.30 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 100.00 │ 0.00 │ 1394.00 │ 6.00 │ 1.75 │ 1.57 │ 153.00 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker │ 172.00 │ 90.00 │ 0.00 │ 1408.00 │ 2.00 │ 1.78 │ 1.44 │ 154.68 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker_pod │ 174.00 │ 100.00 │ 0.00 │ 1398.00 │ 2.00 │ 1.74 │ 1.41 │ 155.59 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 181.00 │ 91.00 │ 0.00 │ 1403.00 │ 6.00 │ 1.79 │ 1.56 │ 157.54 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_prefill_engine_core │ 181.00 │ 94.00 │ 0.00 │ 1404.00 │ 2.00 │ 1.77 │ 1.43 │ 154.10 │
╘══════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
1. Redundant prefill workers are able to absorb the load.
2. When prefill workers go down, decode workers can also do prefill locally.
## Quick Start
### Install Dynamo Platform
Follow the instructions to install `Dynamo` in your Kubernetes cluster.
[https://github.com/ai-dynamo/dynamo/blob/main/docs/guides/dynamo_deploy/installation_guide.md]
### Mount Workspace and Kube Config
Ensure you are able to run a `Dynamo` deployment directly from your host.
Then run the development container mounting the workspace and your kube config.
```
./container/run.sh --mount-workspace -it -v ~/.kube:/root/.kube
```
### Run the tests
```
pytest tests/fault_tolerance/deploy/test_deployment.py -s -v --namespace ${NAMESPACE} --image ${IMAGE}
```
### Note on Running with Additional Credentials
When running on an cluster that requires additional authentication (such as `AKS`) in addition you will need
to authenticate and install cli as appropriate in to the container. As an example, before running the tests you
in an `AKS` cluster you would need to do the following:
```
# In case you have multiple configs
export KUBECONFIG=~/.kube/dynamo-kubeconfig
curl -sL https://aka.ms/InstallAzureCLIDeb
az aks install-cli
az login
```
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import random
import time
from copy import deepcopy
from datetime import datetime
from typing import Any, Dict
import requests
from tests.utils.managed_deployment import ManagedDeployment
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
payload = {
"model": "",
"messages": [
{
"role": "user",
"content": "",
}
],
"max_tokens": 0,
"temperature": 0.1,
# "seed": 10,
"ignore_eos": True,
"min_tokens": 0,
"stream": False,
}
# Configure logging
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
datefmt=DATE_FORMAT, # ISO 8601 UTC format
)
def _get_random_prompt(length):
word_list = [f"{i}" for i in range(10)]
return " ".join(random.choices(word_list, k=length))
def _single_request(
url,
pod,
payload,
model,
logger,
retry_attempts=1,
input_token_length=100,
output_token_length=100,
timeout=30,
retry_delay=1,
):
prompt = _get_random_prompt(input_token_length)
payload_copy = deepcopy(payload)
payload_copy["messages"][0]["content"] = prompt
payload_copy["max_tokens"] = output_token_length
payload_copy["min_tokens"] = output_token_length
payload_copy["model"] = model
response = None
end_time = None
start_time = time.time()
results = []
while retry_attempts:
start_request_time = time.time()
response = None
try:
response = requests.post(
url,
json=payload_copy,
timeout=timeout,
)
end_time = time.time()
content = None
try:
content = response.json()
except ValueError:
pass
results.append(
{
"status": response.status_code,
"result": content,
"request_elapsed_time": end_time - start_request_time,
"url": url,
"pod": pod,
}
)
if response.status_code != 200:
time.sleep(retry_delay)
retry_attempts -= 1
continue
else:
break
except (requests.RequestException, requests.Timeout) as e:
results.append(
{
"status": str(e),
"result": None,
"request_elapsed_time": time.time() - start_request_time,
"url": url,
"pod": pod,
}
)
time.sleep(retry_delay)
retry_attempts -= 1
continue
return {
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"results": results,
"total_time": time.time() - start_time,
"url": url,
"pod": pod,
}
def client(
deployment_spec,
namespace,
model,
log_dir,
index,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
max_request_rate,
retry_delay=1,
):
logger = logging.getLogger(f"CLIENT: {index}")
logging.getLogger("httpx").setLevel(logging.WARNING)
managed_deployment = ManagedDeployment(log_dir, deployment_spec, namespace)
pod_ports: Dict[str, Any] = {}
min_elapsed_time = (1 / max_request_rate) if max_request_rate > 0 else 0.0
try:
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, f"client_{index}.log.txt")
with open(log_path, "w") as log:
for i in range(requests_per_client):
pods = managed_deployment.get_pods(
managed_deployment.frontend_service_name
)
port = 0
pod_name = None
pods_ready = []
for pod in pods[managed_deployment.frontend_service_name]:
if pod.ready():
pods_ready.append(pod)
else:
if pod.name in pod_ports:
pod_ports[pod.name].stop()
del pod_ports[pod.name]
if pods_ready:
pod = pods_ready[i % len(pods_ready)]
if pod.name not in pod_ports:
port_forward = managed_deployment.port_forward(
pod, deployment_spec.port
)
if port_forward:
pod_ports[pod.name] = port_forward
if pod.name in pod_ports:
port = pod_ports[pod.name].local_port
pod_name = pod.name
url = f"http://localhost:{port}/{deployment_spec.endpoint}"
result = _single_request(
url,
pod_name,
payload,
model,
logger,
max_retries,
input_token_length=input_token_length,
output_token_length=output_token_length,
retry_delay=retry_delay,
)
logger.info(
f"Request: {i} Pod {pod_name} Local Port {port} Status: {result['results'][-1]['status']} Latency: {result['results'][-1]['request_elapsed_time']}"
)
log.write(json.dumps(result) + "\n")
log.flush()
if result["total_time"] < min_elapsed_time:
time.sleep(min_elapsed_time - result["total_time"])
except Exception as e:
logger.error(str(e))
logger.info("Exiting")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
def pytest_addoption(parser):
parser.addoption("--image", type=str, default=None)
parser.addoption("--namespace", type=str, default="fault-tolerance-test")
@pytest.fixture
def image(request):
return request.config.getoption("--image")
@pytest.fixture
def namespace(request):
return request.config.getoption("--namespace")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from tabulate import tabulate
def parse_test_log(file_path):
start_time = None
ready_time = None
fault_time = None
start_cmd: Optional[List[str]] = None
if not os.path.isfile(file_path):
return None, None, None
with open(file_path, "r") as f:
for line in f:
line = line.strip()
if "Starting Deployment fault-tolerance-test with spec" in line:
start_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
start_cmd = []
elif "Deployment fault-tolerance-test is ready" in line:
ready_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
elif "Injecting failure for:" in line:
fault_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
startup_time = (
(ready_time - start_time).total_seconds() if start_time and ready_time else None
)
return startup_time, fault_time, start_cmd
def parse_client_logs(test_dir, expected_length=100):
all_logs = []
for file in os.listdir(test_dir):
if file.startswith("client_") and file.endswith(".log.txt"):
with open(os.path.join(test_dir, file), "r") as f:
request_number = 0
for line in f:
request_number += 1
data = json.loads(line.strip())
for result in data["results"]:
log_entry = {
"time": datetime.fromisoformat(
data["time"].replace("T", " ")
),
"status": result["status"],
"request_elapsed_time": result["request_elapsed_time"],
"request_number": request_number - 1,
"client": file.split("_")[1].split(".")[0],
}
if (
"result" in result
and result["result"]
and "choices" in result["result"]
and result["result"]["choices"]
):
log_entry["success"] = True
if "content" in result["result"]["choices"][0]["message"]:
content = result["result"]["choices"][0]["message"][
"content"
]
elif (
"reasoning_content"
in result["result"]["choices"][0]["message"]
):
content = result["result"]["choices"][0]["message"][
"reasoning_content"
]
if not content or len(content) < expected_length:
log_entry["success"] = False
else:
log_entry["success"] = False
all_logs.append(log_entry)
if len(all_logs):
df = pd.DataFrame(all_logs)
df.sort_values("time", inplace=True)
return df
return None
def calculate_metrics(df, fault_time, sla=None):
if fault_time:
before_fault = df[df["time"] <= fault_time]
after_fault = df[df["time"] > fault_time]
else:
before_fault = df
after_fault = None
# Existing latency metrics (only successful requests)
successful_before = before_fault[before_fault["success"]]
avg_before = successful_before["request_elapsed_time"].mean()
std_before = successful_before["request_elapsed_time"].std()
success_before_count = before_fault["success"].sum()
failure_before_count = len(before_fault) - success_before_count
avg_after, std_after, success_after_count, failure_after_count = (
None,
None,
None,
None,
)
if after_fault is not None and not after_fault.empty:
successful_after = after_fault[after_fault["success"]]
avg_after = successful_after["request_elapsed_time"].mean()
std_after = successful_after["request_elapsed_time"].std()
success_after_count = after_fault["success"].sum()
failure_after_count = len(after_fault) - success_after_count
if sla:
# SLA violations (only successful requests exceeding the SLA)
violations_before = (successful_before["request_elapsed_time"] > sla).sum()
violations_after = (
(successful_after["request_elapsed_time"] > sla).sum()
if after_fault is not None and not after_fault.empty
else None
)
else:
violations_before = None
violations_after = None
return (
success_before_count,
failure_before_count,
success_after_count,
failure_after_count,
avg_before,
std_before,
avg_after,
std_after,
violations_before,
violations_after,
)
def parse_process_log(log_dir, process_name):
process_ready_pattern = {
"Frontend": re.compile(r"added model"),
"VllmDecodeWorker": re.compile(
r"VllmWorker for (?P<model_name>.*?) has been initialized"
),
"VllmPrefillWorker": re.compile(
r"VllmWorker for (?P<model_name>.*?) has been initialized"
),
}
if not os.path.isdir(log_dir):
return {}
ready_times: Dict[str, List[Tuple[datetime, str, float]]] = {}
for entry in os.listdir(log_dir):
if entry.endswith(".log") and "metrics" not in entry:
replica_number = entry.split(".")[0]
if replica_number not in ready_times:
ready_times[replica_number] = []
process_start_time = None
with open(os.path.join(log_dir, entry), "r") as f:
for line in f:
line = line.strip()
if not line:
continue
# Try to parse as JSONL first
try:
json_data = json.loads(line)
# Extract timestamp and message from JSON format
if "time" in json_data:
timestamp = datetime.fromisoformat(
json_data["time"].replace("Z", "")
)
log_message = json_data.get("message", "")
else:
continue
except (json.JSONDecodeError, ValueError, KeyError):
# Fall back to readable format parsing
clean_line = re.sub(
r"\x1b\[.*?m", "", line
) # Remove ANSI codes
if not clean_line:
continue
parts = clean_line.split()
if len(parts) < 2:
continue
try:
# Parse timestamp (remove 'Z' for naive datetime)
timestamp = datetime.fromisoformat(
parts[0].replace("Z", "")
)
except ValueError:
continue
log_message = " ".join(parts[1:])
if not process_start_time:
process_start_time = timestamp
relative_time = (timestamp - process_start_time).total_seconds()
# Check for process start lines
if process_name in process_ready_pattern:
if process_ready_pattern[process_name].search(log_message):
if "previous" in entry:
location = 0
else:
location = -1
ready_times[replica_number].insert(
location, (timestamp, log_message, relative_time)
)
return ready_times
def calculate_recovery_time(test_dir, failure_type, fault_time):
if not fault_time:
return None
processes = [
"Frontend",
"VllmDecodeWorker",
"VllmPrefillWorker",
]
process_start = {}
start_time = None
for process in processes:
starts = parse_process_log(os.path.join(test_dir, process), process)
if starts:
process_start[process] = starts
last_recovery_time = 0
for process, replicas in process_start.items():
for replica, container_starts in replicas.items():
for starts in container_starts:
start_time = starts[0]
recovery_time = (start_time - fault_time).total_seconds()
if recovery_time > last_recovery_time:
last_recovery_time = recovery_time
if last_recovery_time == 0:
return None
return last_recovery_time
def process_test_directory(test_dir, sla):
if "test_fault_scenario" not in test_dir:
return {}
test_name = test_dir.split("test_fault_scenario[", 1)[1].rstrip("]")
failure_type = test_name.split("-")[-1]
test_prefix = "-".join(test_name.split("-")[:-1])
startup_time, fault_time, start_cmd = parse_test_log(
os.path.join(test_dir, "test.log.txt")
)
df = parse_client_logs(test_dir)
if df is None or df.empty:
return None
(
success_before,
failure_before,
success_after,
failure_after,
avg_before,
std_before,
avg_after,
std_after,
violations_before,
violations_after,
) = calculate_metrics(df, fault_time, sla)
recovery_time = calculate_recovery_time(test_dir, failure_type, fault_time)
return {
"test": test_prefix,
"cmd": start_cmd,
"failure": failure_type,
"start_time": startup_time,
"success_before_requests": success_before,
"failed_before_requests": failure_before,
"success_after_requests": success_after,
"failed_after_requests": failure_after,
"avg_latency_before": avg_before,
"std_latency_before": std_before,
"avg_latency_after": avg_after,
"std_latency_after": std_after,
"violations_before": violations_before,
"violations_after": violations_after,
"recovery_time": recovery_time,
}
def main(logs_dir, tablefmt, log_paths=None, sla=None):
results = []
if log_paths:
for log_path in log_paths:
result = process_test_directory(log_path, sla)
if result:
results.append(result)
elif logs_dir:
for entry in os.listdir(logs_dir):
if entry.startswith("test_fault_scenario[") and os.path.isdir(
os.path.join(logs_dir, entry)
):
result = process_test_directory(os.path.join(logs_dir, entry), sla)
if result:
results.append(result)
# Group results by test prefix
grouped: dict[str, list[dict[str, Any]]] = {}
commands = {}
for res in results:
test_prefix = res["test"]
if test_prefix not in grouped:
grouped[test_prefix] = []
commands[test_prefix] = res["cmd"]
grouped[test_prefix].append(res)
order = [
"none",
"frontend",
"frontend_pod",
"decode_worker",
"decode_worker_pod",
"prefill_worker",
"prefill_worker_pod",
"vllm_decode_engine_core",
"vllm_prefill_engine_core",
]
# Print grouped tables
for test_prefix, group in grouped.items():
new_group = []
for failure in order:
for res in group:
if failure == res["failure"]:
new_group.append(res)
group = new_group
if sla:
headers = [
"Failure",
"Startup",
"Success\nBefore",
"Failed\nBefore",
"Success\nAfter",
"Failed\nAfter",
"Latency\nBefore",
"Latency\nAfter",
"Violations\nBefore",
"Violations\nAfter",
"Recovery",
]
else:
headers = [
"Failure",
"Startup",
"Success\nBefore",
"Failed\nBefore",
"Success\nAfter",
"Failed\nAfter",
"Latency\nBefore",
"Latency\nAfter",
"Recovery",
]
rows = []
for res in group:
if sla:
row = [
res["failure"],
res["start_time"], # if res["start_time"] is not None else "N/A",
res["success_before_requests"],
res["failed_before_requests"],
res["success_after_requests"],
res["failed_after_requests"],
res["avg_latency_before"],
res["avg_latency_after"],
res["violations_before"],
res["violations_after"],
res["recovery_time"],
]
else:
row = [
res["failure"],
res["start_time"], # if res["start_time"] is not None else "N/A",
res["success_before_requests"],
res["failed_before_requests"],
res["success_after_requests"],
res["failed_after_requests"],
res["avg_latency_before"],
res["avg_latency_after"],
res["recovery_time"],
]
rows.append(row)
print(f"\nTest Group: {test_prefix}")
# print(f"\nTest Command: {commands[test_prefix]}")
print(
tabulate(
rows,
headers,
tablefmt=tablefmt,
floatfmt=".2f",
missingval="N/A",
numalign="right",
stralign="center",
)
)
print("\n" + "=" * 80)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parse test results")
parser.add_argument("--log-dir", default=".", help="Path to the logs directory")
parser.add_argument(
"--format", choices=["fancy", "markdown"], default="fancy", help="Table format"
)
parser.add_argument("--sla", type=float, default=None)
args = parser.parse_args()
# Map format choices to tabulate formats
tablefmt = (
"fancy_grid" if args.format == "fancy" else "pipe"
) # Using pipe for markdown compatibility
main(args.log_dir, tablefmt, args.sla)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import Optional
from tests.utils.managed_deployment import DeploymentSpec
@dataclass
class Load:
clients: int = 10
requests_per_client: int = 150
input_token_length: int = 100
output_token_length: int = 100
max_retries: int = 1
max_request_rate: float = 1
sla: Optional[float] = None
@dataclass
class Failure:
time: int
pod_name: str
command: str
signal: str = "SIGINT"
replicas: int = 1
@dataclass
class Scenario:
deployment: DeploymentSpec
load: Load
failures: list[Failure]
model: Optional[str] = None
# Each Deployment Spec contains
# the dynamo deployment configuration
deployment_specs = {
"agg-tp-1-dp-1": (
DeploymentSpec("/workspace/components/backends/vllm/deploy/agg.yaml")
),
"disagg-tp-1-dp-1": (
DeploymentSpec("/workspace/components/backends/vllm/deploy/disagg.yaml")
),
}
# TP-2 scenarios
deployment_specs["agg-tp-2-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/agg.yaml"
)
deployment_specs["agg-tp-2-dp-1"].set_tensor_parallel(2, ["VllmDecodeWorker"])
deployment_specs["disagg-prefill-tp-2-decode-tp-2-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/disagg.yaml"
)
deployment_specs["disagg-prefill-tp-2-decode-tp-2-dp-1"][
"VllmPrefillWorker"
].tensor_parallel_size = 2
deployment_specs["disagg-prefill-tp-2-decode-tp-2-dp-1"][
"VllmDecodeWorker"
].tensor_parallel_size = 2
# TP-4 scenarios
deployment_specs["agg-tp-4-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/agg.yaml"
)
deployment_specs["agg-tp-4-dp-1"].set_tensor_parallel(4, ["VllmDecodeWorker"])
deployment_specs["disagg-prefill-tp-4-decode-tp-4-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/disagg.yaml"
)
deployment_specs["disagg-prefill-tp-4-decode-tp-4-dp-1"][
"VllmPrefillWorker"
].tensor_parallel_size = 4
deployment_specs["disagg-prefill-tp-4-decode-tp-4-dp-1"][
"VllmDecodeWorker"
].tensor_parallel_size = 4
# Derivative Specs With Incremented Replicats
deployment_specs["agg-tp-1-dp-2"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/agg.yaml"
)
deployment_specs["agg-tp-1-dp-2"]["Frontend"].replicas = 2
deployment_specs["agg-tp-1-dp-2"]["VllmDecodeWorker"].replicas = 2
deployment_specs["disagg-tp-1-dp-2"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/disagg.yaml"
)
deployment_specs["disagg-tp-1-dp-2"]["Frontend"].replicas = 2
deployment_specs["disagg-tp-1-dp-2"]["VllmDecodeWorker"].replicas = 2
deployment_specs["disagg-tp-1-dp-2"]["VllmPrefillWorker"].replicas = 2
# Each failure scenaro contains a list of failure injections
# Each failure injection has a time in seconds after the pervious injection and
# a list of failures to inject including the number of failures for each type.
# Failures are currently process termination or pod deletion
#
# Example:
#
# "prefill_worker": [[30, [("dynamo_prefillworker", 1)]]],
#
# terminates 1 prefill worker after 30 seconds
failures = {
"frontend": [Failure(30, "Frontend", "dynamo.frontend")],
"frontend_pod": [Failure(30, "Frontend", "delete_pod")],
"decode_worker": [Failure(30, "VllmDecodeWorker", "dynamo.vllm", "SIGKILL")],
"decode_worker_pod": [Failure(30, "VllmDecodeWorker", "delete_pod")],
"prefill_worker": [Failure(30, "VllmPrefillWorker", "dynamo.vllm", "SIGKILL")],
"prefill_worker_pod": [Failure(30, "VllmPrefillWorker", "delete_pod")],
"vllm_decode_engine_core": [
Failure(30, "VllmDecodeWorker", "VLLM::EngineCore", "SIGKILL")
],
"vllm_prefill_engine_core": [
Failure(30, "VllmPrefillWorker", "VLLM::EngineCore", "SIGKILL")
],
"none": [],
}
load = Load()
# model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
model = None
# Populate Scenarios
scenarios = {}
for deployment_name, deployment_spec in deployment_specs.items():
for failure_name, failure in failures.items():
if "prefill" in failure_name and "disagg" not in deployment_name:
continue
scenarios[f"{deployment_name}-{failure_name}"] = Scenario(
deployment=deployment_spec, load=load, failures=failure, model=model
)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import multiprocessing
import time
from contextlib import contextmanager
import pytest
from tests.fault_tolerance.deploy.client import client
from tests.fault_tolerance.deploy.parse_results import main as parse_results
from tests.fault_tolerance.deploy.scenarios import scenarios
from tests.utils.managed_deployment import ManagedDeployment
@pytest.fixture(params=scenarios.keys())
def scenario(request):
return scenarios[request.param]
@contextmanager
def _clients(
logger,
num_clients,
request,
deployment_spec,
namespace,
model,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
max_request_rate,
):
procs = []
ctx = multiprocessing.get_context("spawn")
for i in range(num_clients):
procs.append(
ctx.Process(
target=client,
args=(
deployment_spec,
namespace,
model,
request.node.name,
i,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
max_request_rate,
),
)
)
procs[-1].start()
yield procs
for proc in procs:
logger.debug(f"{proc} waiting for join")
proc.join()
logger.debug(f"{proc} joined")
def _inject_failures(failures, logger, deployment: ManagedDeployment): # noqa: F811
for failure in failures:
time.sleep(failure.time)
pods = deployment.get_pods(failure.pod_name)[failure.pod_name]
num_pods = len(pods)
if not pods:
continue
replicas = failure.replicas
if not replicas:
replicas = num_pods
logger.info(f"Injecting failure for: {failure}")
for x in range(replicas):
pod = pods[x % num_pods]
if failure.command == "delete_pod":
deployment.get_pod_logs(failure.pod_name, pod, ".before_delete")
pod.delete(force=True)
else:
processes = deployment.get_processes(pod)
for process in processes:
if failure.command in process.command:
logger.info(
f"Terminating {failure.pod_name} Pid {process.pid} Command {process.command}"
)
process.kill(failure.signal)
global_result_list = []
@pytest.fixture(autouse=True)
def results_table(request, scenario): # noqa: F811
yield
parse_results(
logs_dir=None,
log_paths=[request.node.name],
tablefmt="fancy_grid",
sla=scenario.load.sla,
)
global_result_list.append(request.node.name)
@pytest.fixture(autouse=True, scope="session")
def results_summary():
yield
parse_results(
logs_dir=None,
log_paths=global_result_list,
tablefmt="fancy_grid",
)
@pytest.mark.e2e
@pytest.mark.slow
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
async def test_fault_scenario(
scenario, # noqa: F811
request,
image,
namespace,
):
"""
Test dynamo serve deployments with injected failures
"""
logger = logging.getLogger(request.node.name)
scenario.deployment.disable_grove()
scenario.deployment.name = "fault-tolerance-test"
if image:
scenario.deployment.set_image(image)
if scenario.model:
scenario.deployment.set_model(scenario.model)
model = scenario.model
else:
model = scenario.deployment["VllmDecodeWorker"].model
scenario.deployment.set_logging(True, "info")
async with ManagedDeployment(
namespace=namespace,
log_dir=request.node.name,
deployment_spec=scenario.deployment,
) as deployment:
with _clients(
logger,
scenario.load.clients,
request,
scenario.deployment,
namespace,
model,
scenario.load.requests_per_client,
scenario.load.input_token_length,
scenario.load.output_token_length,
scenario.load.max_retries,
scenario.load.max_request_rate,
):
_inject_failures(scenario.failures, logger, deployment)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
import os
import re
import shlex
import time
from dataclasses import dataclass
from typing import Any, Optional
import kr8s
import kubernetes
import requests
import yaml
from kr8s.objects import Pod as kr8s_Pod
from kr8s.objects import Service as kr8s_Service
from kubernetes_asyncio import client, config
class ServiceSpec:
"""Wrapper around a single service in the deployment spec."""
def __init__(self, service_name: str, service_spec: dict):
self._name = service_name
self._spec = service_spec
@property
def name(self) -> str:
"""The service name (read-only)"""
return self._name
# ----- Image -----
@property
def image(self) -> Optional[str]:
"""Container image for the service"""
try:
return self._spec["extraPodSpec"]["mainContainer"]["image"]
except KeyError:
return None
@image.setter
def image(self, value: str):
if "extraPodSpec" not in self._spec:
self._spec["extraPodSpec"] = {"mainContainer": {}}
if "mainContainer" not in self._spec["extraPodSpec"]:
self._spec["extraPodSpec"]["mainContainer"] = {}
self._spec["extraPodSpec"]["mainContainer"]["image"] = value
# ----- Replicas -----
@property
def replicas(self) -> int:
return self._spec.get("replicas", 0)
@replicas.setter
def replicas(self, value: int):
self._spec["replicas"] = value
@property
def model(self) -> Optional[str]:
"""Model being served by this service"""
try:
args_list = self._spec["extraPodSpec"]["mainContainer"]["args"]
except KeyError:
return None
args_str = " ".join(args_list)
parts = shlex.split(args_str)
for i, part in enumerate(parts):
if part == "--model":
return parts[i + 1] if i + 1 < len(parts) else None
return None
@model.setter
def model(self, value: str):
if "extraPodSpec" not in self._spec:
return
if "mainContainer" not in self._spec["extraPodSpec"]:
return
args_list = self._spec["extraPodSpec"]["mainContainer"].get("args", [])
args_str = " ".join(args_list)
parts = shlex.split(args_str)
model_index = None
for i, part in enumerate(parts):
if part == "--model":
model_index = i
break
if model_index is not None:
if model_index + 1 < len(parts):
parts[model_index + 1] = value
else:
return
else:
return
self._spec["extraPodSpec"]["mainContainer"]["args"] = [" ".join(parts)]
# ----- GPUs -----
@property
def gpus(self) -> int:
try:
return int(self._spec["resources"]["limits"]["gpu"])
except KeyError:
return 0
@gpus.setter
def gpus(self, value: int):
if "resources" not in self._spec:
self._spec["resources"] = {}
if "limits" not in self._spec["resources"]:
self._spec["resources"]["limits"] = {}
self._spec["resources"]["limits"]["gpu"] = str(value)
@property
def tensor_parallel_size(self) -> int:
"""Get tensor parallel size from vLLM arguments"""
try:
args_list = self._spec["extraPodSpec"]["mainContainer"]["args"]
except KeyError:
return 1 # Default tensor parallel size
args_str = " ".join(args_list)
parts = shlex.split(args_str)
for i, part in enumerate(parts):
if part == "--tensor-parallel-size":
return int(parts[i + 1]) if i + 1 < len(parts) else 1
return 1
@tensor_parallel_size.setter
def tensor_parallel_size(self, value: int):
if "extraPodSpec" not in self._spec:
return
if "mainContainer" not in self._spec["extraPodSpec"]:
return
args_list = self._spec["extraPodSpec"]["mainContainer"].get("args", [])
args_str = " ".join(args_list)
parts = shlex.split(args_str)
# Find existing tensor-parallel-size argument
tp_index = None
for i, part in enumerate(parts):
if part == "--tensor-parallel-size":
tp_index = i
break
if tp_index is not None:
# Update existing value
if tp_index + 1 < len(parts):
parts[tp_index + 1] = str(value)
else:
parts.append(str(value))
else:
# Add new argument
parts.extend(["--tensor-parallel-size", str(value)])
self._spec["extraPodSpec"]["mainContainer"]["args"] = [" ".join(parts)]
# Auto-adjust GPU count to match tensor parallel size
self.gpus = value
class DeploymentSpec:
def __init__(
self, base: str, endpoint="/v1/chat/completions", port=8000, system_port=9090
):
"""Load the deployment YAML file"""
with open(base, "r") as f:
self._deployment_spec = yaml.safe_load(f)
self._endpoint = endpoint
self._port = port
self._system_port = system_port
@property
def name(self) -> str:
"""Deployment name"""
return self._deployment_spec["metadata"]["name"]
@name.setter
def name(self, value: str):
self._deployment_spec["metadata"]["name"] = value
@property
def port(self) -> int:
"""Deployment port"""
return self._port
@property
def system_port(self) -> int:
"""Deployment port"""
return self._system_port
@property
def endpoint(self) -> str:
return self._endpoint
@property
def namespace(self) -> str:
"""Deployment namespace"""
return self._deployment_spec["metadata"]["namespace"]
@namespace.setter
def namespace(self, value: str):
self._deployment_spec["metadata"]["namespace"] = value
def disable_grove(self):
if "annotations" not in self._deployment_spec["metadata"]:
self._deployment_spec["metadata"]["annotations"] = {}
self._deployment_spec["metadata"]["annotations"][
"nvidia.com/enable-grove"
] = "false"
def set_model(self, model: str, service_name: Optional[str] = None):
if service_name is None:
services = self.services
else:
services = [self[service_name]]
for service in services:
service.model = model
def set_image(self, image: str, service_name: Optional[str] = None):
if service_name is None:
services = self.services
else:
services = [self[service_name]]
for service in services:
service.image = image
def set_tensor_parallel(self, tp_size: int, service_names: Optional[list] = None):
"""Scale deployment for different tensor parallel configurations
Args:
tp_size: Target tensor parallel size
service_names: List of service names to update (defaults to worker services)
"""
if service_names is None:
# Auto-detect worker services (services with GPU requirements)
service_names = [svc.name for svc in self.services if svc.gpus > 0]
for service_name in service_names:
service = self[service_name]
service.tensor_parallel_size = tp_size
service.gpus = tp_size
def set_logging(self, enable_jsonl: bool = True, log_level: str = "debug"):
"""Configure logging for the deployment
Args:
enable_jsonl: Enable JSON line logging (sets DYN_LOGGING_JSONL=true)
log_level: Set log level (sets DYN_LOG to specified level)
"""
spec = self._deployment_spec
if "envs" not in spec["spec"]:
spec["spec"]["envs"] = []
# Remove any existing logging env vars to avoid duplicates
spec["spec"]["envs"] = [
env
for env in spec["spec"]["envs"]
if env.get("name") not in ["DYN_LOGGING_JSONL", "DYN_LOG"]
]
if enable_jsonl:
spec["spec"]["envs"].append({"name": "DYN_LOGGING_JSONL", "value": "true"})
if log_level:
spec["spec"]["envs"].append({"name": "DYN_LOG", "value": log_level})
def get_logging_config(self) -> dict:
"""Get current logging configuration
Returns:
dict with 'jsonl_enabled' and 'log_level' keys
"""
envs = self._deployment_spec.get("spec", {}).get("envs", [])
jsonl_enabled = False
log_level = None
for env in envs:
if env.get("name") == "DYN_LOGGING_JSONL":
jsonl_enabled = env.get("value") in ["true", "1"]
elif env.get("name") == "DYN_LOG":
log_level = env.get("value")
return {"jsonl_enabled": jsonl_enabled, "log_level": log_level}
@property
def services(self) -> list:
"""List of ServiceSpec objects"""
return [
ServiceSpec(svc, spec)
for svc, spec in self._deployment_spec["spec"]["services"].items()
]
def __getitem__(self, service_name: str) -> ServiceSpec:
"""Allow dict-like access: d['Frontend']"""
return ServiceSpec(
service_name, self._deployment_spec["spec"]["services"][service_name]
)
def spec(self):
return self._deployment_spec
def save(self, out_file: str):
"""Save updated deployment to file"""
with open(out_file, "w") as f:
yaml.safe_dump(self._deployment_spec, f, default_flow_style=False)
class PodProcess:
def __init__(self, pod: kr8s_Pod, line: str):
self.pid = int(re.split(r"\s+", line)[1])
self.command = " ".join(
re.split(r"\s+", line)[10:]
) # Columns 10+ are the command
self._pod = pod
def kill(self, signal=None):
"""Kill this process in the given pod"""
if not signal:
if self.pid == 1:
signal = "SIGINT"
else:
signal = "SIGKILL"
return self._pod.exec(["kill", f"-{signal}", str(self.pid)])
def wait(self, timeout: int = 60):
"""Wait for this process to exit in the given pod"""
# Simple implementation; adjust as needed
for _ in range(timeout):
try:
result = self._pod.exec(
["kill", "-0", str(self.pid)]
) # Check if process exists
if result.returncode != 0:
return True # Process exited
time.sleep(1)
except Exception:
return True
return False # Timed out
@dataclass
class ManagedDeployment:
log_dir: str
deployment_spec: DeploymentSpec
namespace: str
frontend_service_name: Optional[str] = "Frontend"
_custom_api: Optional[Any] = None
_core_api: Optional[Any] = None
_in_cluster: bool = False
_logger: logging.Logger = logging.getLogger()
_port_forward: Optional[Any] = None
_deployment_name: Optional[str] = None
_apps_v1: Optional[Any] = None
def __post_init__(self):
self._deployment_name = self.deployment_spec.name
async def _init_kubernetes(self):
"""Initialize kubernetes client"""
try:
# Try in-cluster config first (for pods with service accounts)
await config.load_incluster_config()
self._in_cluster = True
except Exception:
# Fallback to kube config file (for local development)
await config.load_kube_config()
k8s_client = client.ApiClient()
self._custom_api = client.CustomObjectsApi(k8s_client)
self._core_api = client.CoreV1Api(k8s_client)
self._apps_v1 = client.AppsV1Api()
async def _wait_for_pods(self, label, expected, timeout=300):
for _ in range(timeout):
assert self._core_api is not None, "Kubernetes API not initialized"
pods = await self._core_api.list_namespaced_pod(
self.namespace, label_selector=label
)
running = sum(
1
for pod in pods.items
if any(
cond.type == "Ready" and cond.status == "True"
for cond in (pod.status.conditions or [])
)
)
if running == expected:
return True
await asyncio.sleep(1)
raise Exception(f"Didn't Reach Expected Pod Count {label}=={expected}")
async def _scale_statfulset(self, name, label, replicas):
body = {"spec": {"replicas": replicas}}
assert self._apps_v1 is not None, "Kubernetes API not initialized"
await self._apps_v1.patch_namespaced_stateful_set_scale(
name, self.namespace, body
)
await self._wait_for_pods(label, replicas)
async def _restart_stateful(self, name, label):
self._logger.info(f"Restarting {name} {label}")
await self._scale_statfulset(name, label, 0)
assert self._core_api is not None, "Kubernetes API not initialized"
nats_pvc = await self._core_api.list_namespaced_persistent_volume_claim(
self.namespace, label_selector=label
)
for pvc in nats_pvc.items:
await self._core_api.delete_namespaced_persistent_volume_claim(
pvc.metadata.name, self.namespace
)
await self._scale_statfulset(name, label, 1)
self._logger.info(f"Restarted {name} {label}")
async def _wait_for_ready(self, timeout: int = 1800, sleep=1, log_interval=60):
"""
Wait for the custom resource to be ready.
Args:
timeout: Maximum time to wait in seconds, default to 30 mins (image pulling can take a while)
"""
start_time = time.time()
self._logger.info(f"Waiting for Deployment {self._deployment_name}")
attempt = 0
while (time.time() - start_time) < timeout:
try:
attempt += 1
assert self._custom_api is not None, "Kubernetes API not initialized"
status = await self._custom_api.get_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=self.namespace,
plural="dynamographdeployments",
name=self._deployment_name,
)
# Check both conditions:
# 1. Ready condition is True
# 2. State is successful
status_obj = status.get("status", {})
conditions = status_obj.get("conditions", [])
current_state = status_obj.get("state", "unknown")
ready_condition = False
for condition in conditions:
if (
condition.get("type") == "Ready"
and condition.get("status") == "True"
):
ready_condition = True
break
state_successful = status_obj.get("state") == "successful"
if ready_condition and state_successful:
self._logger.info(f"Current deployment state: {current_state}")
self._logger.info(f"Current conditions: {conditions}")
self._logger.info(
f"Elapsed time: {time.time() - start_time:.1f}s / {timeout}s"
)
self._logger.info(f"Deployment {self._deployment_name} is ready")
return True
else:
if attempt % log_interval == 0:
self._logger.info(f"Current deployment state: {current_state}")
self._logger.info(f"Current conditions: {conditions}")
self._logger.info(
f"Elapsed time: {time.time() - start_time:.1f}s / {timeout}s"
)
self._logger.info(
f"Deployment not ready yet - Ready condition: {ready_condition}, State successful: {state_successful}"
)
except kubernetes.client.rest.ApiException as e:
self._logger.info(
f"API Exception while checking deployment status: {e}"
)
self._logger.info(f"Status code: {e.status}, Reason: {e.reason}")
except Exception as e:
self._logger.info(
f"Unexpected exception while checking deployment status: {e}"
)
await asyncio.sleep(sleep)
raise TimeoutError("Deployment failed to become ready within timeout")
async def _restart_nats(self):
NATS_STS_NAME = "dynamo-platform-nats"
NATS_LABEL = "app.kubernetes.io/component=nats"
await self._restart_stateful(NATS_STS_NAME, NATS_LABEL)
async def _restart_etcd(self):
ETCD_STS_NAME = "dynamo-platform-etcd"
ETCD_LABEL = "app.kubernetes.io/component=etcd"
await self._restart_stateful(ETCD_STS_NAME, ETCD_LABEL)
async def _create_deployment(self):
"""
Create a DynamoGraphDeployment from either a dict or yaml file path.
Args:
deployment: Either a dict containing the deployment spec or a path to a yaml file
"""
# Extract service names
self._services = self.deployment_spec.services
self._logger.info(
f"Starting Deployment {self._deployment_name} with spec {self.deployment_spec}"
)
try:
assert self._custom_api is not None, "Kubernetes API not initialized"
await self._custom_api.create_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=self.namespace,
plural="dynamographdeployments",
body=self.deployment_spec.spec(),
)
self._logger.info(self.deployment_spec.spec())
self._logger.info(f"Deployment Started {self._deployment_name}")
except kubernetes.client.rest.ApiException as e:
if e.status == 409: # Already exists
self._logger.info(f"Deployment {self._deployment_name} already exists")
else:
self._logger.info(
f"Failed to create deployment {self._deployment_name}: {e}"
)
raise
def get_processes(self, pod) -> list:
"""Get list of processes in the given pod"""
result = pod.exec(["ps", "-aux"])
lines = result.stdout.decode().splitlines()
# Skip header line
processes = [PodProcess(pod, line) for line in lines[1:]]
return processes
def get_service(self, service_name=None):
if not service_name:
service_name = ""
full_service_name = f"{self._deployment_name}-{service_name.lower()}"
return kr8s_Service.get(full_service_name, namespace=self.namespace)
def get_pods(self, service_name=None):
result = {}
service_list = []
if not service_name:
service_list = [service.name for service in self.deployment_spec.services]
else:
service_list = [service_name]
for service in service_list:
# List pods for this service using the selector label
# nvidia.com/selector: deployment-name-service
label_selector = (
f"nvidia.com/selector={self._deployment_name}-{service.lower()}"
)
pods = []
for pod in kr8s.get(
"pods", namespace=self.namespace, label_selector=label_selector
):
pods.append(pod)
result[service] = pods
return result
def get_pod_logs(self, service, pod, suffix=""):
directory = os.path.join(self.log_dir, service)
os.makedirs(directory, exist_ok=True)
try:
with open(os.path.join(directory, f"{pod.name}{suffix}.yaml"), "w") as f:
f.write(pod.to_yaml())
except Exception as e:
self._logger.error(e)
try:
with open(os.path.join(directory, f"{pod.name}{suffix}.log"), "w") as f:
f.write("\n".join(pod.logs()))
except Exception as e:
self._logger.error(e)
try:
previous_logs = pod.logs(previous=True)
with open(
os.path.join(directory, f"{pod.name}{suffix}.previous.log"), "w"
) as f:
f.write("\n".join(previous_logs))
except Exception as e:
self._logger.debug(e)
self._get_pod_metrics(pod, service, suffix)
def _get_service_logs(self, service_name=None, suffix=""):
service_pods = self.get_pods(service_name)
for service, pods in service_pods.items():
for i, pod in enumerate(pods):
self.get_pod_logs(service, pod, suffix)
def _get_pod_metrics(self, pod, service_name, suffix=""):
directory = os.path.join(self.log_dir, service_name)
os.makedirs(directory, exist_ok=True)
port = None
if service_name == self.frontend_service_name:
port = self.deployment_spec.port
else:
port = self.deployment_spec.system_port
pf = self.port_forward(pod, port)
if not pf:
self._logger.error(f"Unable to get metrics for {service_name}")
return
content = None
try:
url = f"http://localhost:{pf.local_port}/metrics"
response = requests.get(url, timeout=30)
content = None
try:
content = response.text
except ValueError:
pass
except Exception as e:
self._logger.error(str(e))
if content:
with open(
os.path.join(directory, f"{pod.name}.metrics{suffix}.log"), "w"
) as f:
f.write(content)
async def _delete_deployment(self):
"""
Delete the DynamoGraphDeployment CR.
"""
try:
if self._deployment_name and self._custom_api is not None:
await self._custom_api.delete_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=self.namespace,
plural="dynamographdeployments",
name=self._deployment_name,
)
except client.exceptions.ApiException as e:
if e.status != 404: # Ignore if already deleted
raise
def port_forward(self, pod, remote_port, max_connection_attempts=3):
"""Attempt to connect to a pod and return the port-forward object on success."""
port_forward = pod.portforward(
remote_port=remote_port,
local_port=0,
address="0.0.0.0",
)
port_forward.start()
for _ in range(max_connection_attempts):
if port_forward.local_port == 0:
time.sleep(1)
continue
test_url = f"http://localhost:{port_forward.local_port}/"
try:
# Send HEAD request to test connection
response = requests.head(test_url, timeout=5)
if response.status_code in (200, 404): # 404 is acceptable
return port_forward
except (requests.ConnectionError, requests.Timeout) as e:
self._logger.warning(f"Connection test failed for pod {pod.name}: {e}")
# Retry port-forward
port_forward.stop()
port_forward.start()
time.sleep(1)
# All attempts failed
port_forward.stop()
return None
async def _cleanup(self):
try:
self._get_service_logs()
finally:
await self._delete_deployment()
async def __aenter__(self):
try:
self._logger = logging.getLogger(self.__class__.__name__)
self.deployment_spec.namespace = self.namespace
self._deployment_name = self.deployment_spec.name
logging.getLogger("httpx").setLevel(logging.WARNING)
await self._init_kubernetes()
await self._delete_deployment()
await self._restart_etcd()
await self._restart_nats()
await self._create_deployment()
await self._wait_for_ready()
except:
await self._cleanup()
raise
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._cleanup()
async def main():
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
# Configure logging
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
datefmt=DATE_FORMAT, # ISO 8601 UTC format
)
deployment_spec = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/agg.yaml"
)
deployment_spec.disable_grove()
print(deployment_spec._deployment_spec)
deployment_spec.name = "foo"
deployment_spec.set_image("nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.4.1")
# Configure logging
deployment_spec.set_logging(enable_jsonl=True, log_level="debug")
print(f"Logging config: {deployment_spec.get_logging_config()}")
async with ManagedDeployment(
namespace="test", log_dir=".", deployment_spec=deployment_spec
):
time.sleep(60)
if __name__ == "__main__":
asyncio.run(main())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment