Unverified Commit 02c822d6 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat: Using ai-perf for k8 FT tests. (#3289)


Signed-off-by: default avatartzulingk@nvidia.com <tzulingk@nvidia.com>
parent b5782fcd
......@@ -28,11 +28,12 @@ conditions.
The fault tolerance test suite is designed as a set of pytest
configurations that launch typical dynamo deployments in a Kubernetes
environemnt and then inject failures by terminating processes or
pods. To test the recovery time and impact of failures a set number of
clients are launched in parallel. Each client sends a set number of
synchronous requests. Log files are stored for each pod as well as for
each client and inspected using a post-processing script.
environment and then inject failures by terminating processes or
pods. To test the recovery time and impact of failures, a set number of
clients are launched in parallel using **AI-Perf (aiperf)** for load generation.
Each client sends synthetic requests with configurable token patterns.
Log files are stored for each pod as well as for each client and inspected
using a post-processing script that parses AI-Perf metrics.
> [!NOTE]
> Test pass / failure is not an indication of SLA for recovery or resilience
......@@ -82,12 +83,18 @@ Below are some representative examples of the generated scenarios:
The full test matrix is generated from these parameters, creating comprehensive test coverage across all configurations.
#### Client Load
#### Client Load (AI-Perf Configuration)
- **Concurrent Clients**: 10 clients by default, adjustable per scenario.
- **Requests per Client**: 100 requests, simulating sustained load.
- **Input/Output Token Length**: 100 tokens for both input prompts and generated outputs.
- **Request Rate Limit**: Ensures clients do not overwhelm the service, with a maximum of 1 request per second per client.
- **Load Generator**: AI-Perf (`aiperf`) with synthetic token generation
- **Concurrent Clients**: 10 clients by default, adjustable per scenario
- **Requests per Client**: 150 requests per client (configurable)
- **Input/Output Token Configuration**:
- Input tokens: mean=100, stddev=0 (consistent length)
- Output tokens: mean=100, stddev=0 (consistent length)
- **Concurrency**: Sequential requests (concurrency=1) per client
- **Retry Logic**: 3 retry attempts for fault tolerance
- **Streaming Support**: Optional `--streaming` flag for TTFT/ITL metrics
- **No warmup**: warmup-request-count=0 to avoid initial failures
#### Failures
......@@ -131,83 +138,107 @@ pytest tests/fault_tolerance/deploy/test_deployment.py -s -v --namespace ${NAMES
### Test Results Directory
For each test scenario a directory of log files is created and post processed to summarize the test.
For each test scenario a directory of log files is created and post-processed to summarize the test.
```
test_fault_scenario[agg-tp-1-dp-1-none]
test_fault_scenario[sglang-agg-tp-1-dp-1-frontend]
.
├── client_0.log.txt
├── client_1.log.txt
├── client_2.log.txt
├── client_3.log.txt
├── client_4.log.txt
├── client_5.log.txt
├── client_6.log.txt
├── client_7.log.txt
├── client_8.log.txt
├── client_9.log.txt
├── Frontend
├── client_0/
│ └── attempt_0/
│ ├── profile_export_aiperf.json # AI-Perf metrics in JSON format
│ ├── profile_export_aiperf.csv # AI-Perf metrics in CSV format
│ ├── genai_perf.log # AI-Perf execution log
│ └── logs/
│ └── aiperf.log # Detailed AI-Perf logs
├── client_1/
│ ├── attempt_0/ # First attempt (may fail during fault)
│ └── attempt_1/ # Retry attempt after failure
│ └── [same structure as above]
├── [client_2 through client_9...]
├── Frontend/
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.log
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.metrics.log
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.previous.log
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.previous.log # Pre-restart logs
│ └── fault-tolerance-test-frontend-576bd784dc-jv68q.yaml
├── test.log.txt
└── VllmDecodeWorker
├── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.log
├── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.metrics.log
├── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.previous.log
└── fault-tolerance-test-vllmdecodeworker-56b7bdf447-6tzqq.yaml
├── decode/ # Or VllmDecodeWorker for vLLM backend
│ └── [same structure as Frontend]
└── test.log.txt
```
| File/Directory Name | Description |
|------------------------------------|------------------------------------------------------------------------------------------------|
| **client_*.log.txt** | Request/response logs for each client instance (contains JSON-formatted request details) |
| **{Service}/*.log | Container log for pod at end of test (Frontend, VllmDecodeWroer, etc.) |
| **{Service}/*.previous.log** | Previous container log for pod in case of crash / exit. (Frontend, VllmDecodeWroer, etc.). Empty if N/A. |
| **{Service}/*.metrics.log** | Metrics as reported by `/metrics` for the service |
| **{Service}/*.yaml** | yaml for pod including status transitions |
| **test.log.txt** | Primary test execution log (contains fault injection timing, process management, and test status)|
| **client_N/attempt_M/** | AI-Perf results for client N, attempt M (supports multiple retry attempts) |
| **profile_export_aiperf.json** | Complete AI-Perf metrics including latencies (P50/P90/P99), throughput, token counts |
| **profile_export_aiperf.csv** | Tabular format of key metrics for easy analysis |
| **genai_perf.log** | AI-Perf execution output (stdout/stderr) |
| **{Service}/*.log** | Current container log for pod (Frontend, decode, etc.) |
| **{Service}/*.previous.log** | Previous container log before restart (contains pre-fault logs) |
| **{Service}/*.metrics.log** | Prometheus metrics from `/metrics` endpoint |
| **{Service}/*.yaml** | Pod specification and status transitions |
| **test.log.txt** | Primary test execution log (deployment timing, fault injection, recovery events) |
### Summary Results
Results are presented in table format after each test providing summary statistics.
Results are parsed from AI-Perf metrics and presented in table format after each test. The parsing script (`parse_results.py`) extracts comprehensive metrics for each scenario:
#### Per-Test Output Format
```
Test Group: agg-tp-1-dp-1
╒═════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 180.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.19 │ N/A │ N/A │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 181.00 │ 153.00 │ 0.00 │ 820.00 │ 527.00 │ 1.21 │ 1.18 │ 3.36 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 169.00 │ 140.00 │ 0.00 │ 785.00 │ 305.00 │ 1.20 │ 1.18 │ 5.39 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 161.00 │ 140.00 │ 0.00 │ 510.00 │ 850.00 │ 1.21 │ 1.18 │ 154.11 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 140.00 │ 0.00 │ 511.00 │ 849.00 │ 1.22 │ 1.18 │ 156.47 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 181.00 │ 140.00 │ 0.00 │ 524.00 │ 836.00 │ 1.21 │ 1.19 │ 152.52 │
╘═════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
============================================================
FAULT TOLERANCE TEST SUMMARY - AI-PERF
============================================================
╒═══════════════════════════════════╤════════════════════════════════════════════════════╕
│ Metric │ Value │
╞═══════════════════════════════════╪════════════════════════════════════════════════════╡
│ Test Directory │ test_fault_scenario[sglang-agg-tp-1-dp-1-frontend] │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Number of Clients │ 10 │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ === Deployment Metrics === │ │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Startup Time │ 69.00 sec │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Recovery Time │ 2.00 sec │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ === Request Metrics === │ │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Total Requests │ 1500 │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Successful Requests │ 1470 │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Failed Requests │ 30 │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Success Rate │ 98.00% │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ === Latency Metrics (seconds) === │ │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Mean Latency │ 0.502 │
├───────────────────────────────────┼────────────────────────────────────────────────┤
│ P50 Latency │ 0.396 │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ P90 Latency │ 0.422 │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ P99 Latency │ 0.761 │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ === Throughput Metrics === │ │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Total Throughput │ 19.72 req/s │
├───────────────────────────────────┼────────────────────────────────────────────────────┤
│ Avg Client Throughput │ 1.97 req/s │
╘═══════════════════════════════════╧════════════════════════════════════════════════════╛
```
| Column Name | Description |
| Metric Category | Metrics Included |
|-----------------------|-----------------------------------------------------------------------------|
| **Failure** | Type of fault injection applied during the test (or 'none' for baseline) |
| **Startup** | Time (seconds) taken for the service to become ready after initialization |
| **Succes/nBefore** | Numoer of client requests that succeeded before fault injection |
| **Failed/nBefore** | Number of client requests that failed or were invalid before fault injection |
| **Success/nAftere** | Number of client requests that succeeded after fault injection |
| **Latency Before** | Average request latency (seconds) for successful requests before fault injection |
| **Latency After** | Average request latency (seconds) for successful requests after fault injection |
| **Recovery Time** | Time (seconds) taken for failed components to recover after fault injection |
| **Deployment Metrics**| Startup Time, Recovery Time |
| **Request Metrics** | Total/Successful/Failed Requests, Success Rate |
| **Latency Metrics** | Mean, P50, P90, P99 latencies (in seconds) |
| **Token Metrics** | TTFT (Time to First Token), ITL (Inter-Token Latency) when streaming enabled |
| **Throughput Metrics**| Total and per-client request throughput |
## Example Results
## Example Deployment Architectures
The following results were obtained running on a cluster of A100
nodes.
The following architectures are tested with various failure scenarios:
### Aggregated Workers
......@@ -237,32 +268,9 @@ graph LR
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: agg-tp-1-dp-1
╒═════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 180.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.19 │ N/A │ N/A │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 181.00 │ 153.00 │ 0.00 │ 820.00 │ 527.00 │ 1.21 │ 1.18 │ 3.36 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 169.00 │ 140.00 │ 0.00 │ 785.00 │ 305.00 │ 1.20 │ 1.18 │ 5.39 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 161.00 │ 140.00 │ 0.00 │ 510.00 │ 850.00 │ 1.21 │ 1.18 │ 154.11 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 140.00 │ 0.00 │ 511.00 │ 849.00 │ 1.22 │ 1.18 │ 156.47 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 181.00 │ 140.00 │ 0.00 │ 524.00 │ 836.00 │ 1.21 │ 1.19 │ 152.52 │
╘═════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
1. Recovery time for the decode worker itself is the largest and a decode worker failure has the largest impact (as expected)
2. Recovery time doesn't include time for the ready probe to return `ready` so even if the process is recovered early (as in the case of the Frontend) requests may fail until the pod is probed.
#### Redundant Workers (Over Provisoned)
......@@ -296,30 +304,6 @@ graph LR
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: agg-tp-1-dp-2
╒═════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 181.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.18 │ N/A │ N/A │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 181.00 │ 121.00 │ 0.00 │ 1373.00 │ 6.00 │ 1.21 │ 1.17 │ 4.37 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 182.00 │ 122.00 │ 0.00 │ 1378.00 │ 0.00 │ 1.21 │ 1.17 │ 5.24 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 169.00 │ 121.00 │ 0.00 │ 1374.00 │ 5.00 │ 1.20 │ 1.18 │ 153.09 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 125.00 │ 0.00 │ 1369.00 │ 6.00 │ 1.21 │ 1.18 │ 152.72 │
├─────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 182.00 │ 120.00 │ 0.00 │ 1375.00 │ 5.00 │ 1.20 │ 1.18 │ 154.75 │
╘═════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
1. By immediately detecting a decode worker failure, Dynamo can limit
the failures and reroute requests to healthy workers with minimal
impact.
......@@ -367,34 +351,6 @@ graph LR
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: disagg-tp-1-dp-1
╒══════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞══════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 175.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.99 │ N/A │ N/A │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 182.00 │ 100.00 │ 0.00 │ 817.00 │ 583.00 │ 1.91 │ 2.00 │ 4.28 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 181.00 │ 81.00 │ 0.00 │ 1024.00 │ 395.00 │ 2.31 │ 1.96 │ 5.53 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 181.00 │ 82.00 │ 0.00 │ 560.00 │ 858.00 │ 2.26 │ 1.98 │ 155.79 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 92.00 │ 0.00 │ 566.00 │ 842.00 │ 2.21 │ 1.83 │ 174.15 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker │ 182.00 │ 84.00 │ 0.00 │ 1346.00 │ 70.00 │ 2.22 │ 1.49 │ 153.53 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker_pod │ 161.00 │ 83.00 │ 0.00 │ 1362.00 │ 55.00 │ 2.21 │ 1.51 │ 154.18 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 167.00 │ 81.00 │ 0.00 │ 569.00 │ 850.00 │ 2.33 │ 2.12 │ 153.81 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_prefill_engine_core │ 182.00 │ 83.00 │ 0.00 │ 568.00 │ 849.00 │ 2.24 │ 2.00 │ 153.84 │
╘══════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
......@@ -456,34 +412,7 @@ graph LR
style DecodePool stroke:#000,stroke-width:2px
```
#### Results:
```
Test Group: disagg-tp-1-dp-2
╒══════════════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞══════════════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ none │ 181.00 │ 1500.00 │ 0.00 │ N/A │ N/A │ 1.47 │ N/A │ N/A │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend │ 182.00 │ 100.00 │ 0.00 │ 1390.00 │ 10.00 │ 1.75 │ 1.43 │ 4.32 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ frontend_pod │ 182.00 │ 91.00 │ 0.00 │ 1409.00 │ 0.00 │ 1.78 │ 1.43 │ 5.48 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker │ 182.00 │ 94.00 │ 0.00 │ 1404.00 │ 2.00 │ 1.78 │ 1.58 │ 154.30 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ decode_worker_pod │ 181.00 │ 100.00 │ 0.00 │ 1394.00 │ 6.00 │ 1.75 │ 1.57 │ 153.00 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker │ 172.00 │ 90.00 │ 0.00 │ 1408.00 │ 2.00 │ 1.78 │ 1.44 │ 154.68 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ prefill_worker_pod │ 174.00 │ 100.00 │ 0.00 │ 1398.00 │ 2.00 │ 1.74 │ 1.41 │ 155.59 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_decode_engine_core │ 181.00 │ 91.00 │ 0.00 │ 1403.00 │ 6.00 │ 1.79 │ 1.56 │ 157.54 │
├──────────────────────────┼───────────┼───────────┼──────────┼───────────┼──────────┼───────────┼───────────┼────────────┤
│ vllm_prefill_engine_core │ 181.00 │ 94.00 │ 0.00 │ 1404.00 │ 2.00 │ 1.77 │ 1.43 │ 154.10 │
╘══════════════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
#### Summary:
......@@ -512,7 +441,7 @@ Then run the development container mounting the workspace and your kube config.
### Run the tests
```
```bash
pytest tests/fault_tolerance/deploy/test_deployment.py -s -v --namespace ${NAMESPACE} --image ${IMAGE}
```
......
......@@ -13,14 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""AI-Perf client implementation for fault tolerance testing."""
import json
import logging
import os
import random
import subprocess
import time
from copy import deepcopy
from datetime import datetime
from typing import Any, Dict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import requests
......@@ -29,194 +30,499 @@ from tests.utils.managed_deployment import ManagedDeployment
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
payload = {
"model": "",
"messages": [
{
"role": "user",
"content": "",
}
],
"max_tokens": 0,
"temperature": 0.1,
# "seed": 10,
"ignore_eos": True,
"min_tokens": 0,
"stream": False,
}
# Configure logging
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
datefmt=DATE_FORMAT, # ISO 8601 UTC format
datefmt=DATE_FORMAT,
)
def _get_random_prompt(length):
word_list = [f"{i}" for i in range(10)]
return " ".join(random.choices(word_list, k=length))
def get_frontend_port(
managed_deployment: ManagedDeployment,
client_index: int,
deployment_spec: Any,
pod_ports: Dict[str, Any],
logger: logging.Logger,
) -> Tuple[Optional[str], Optional[int], Optional[str]]:
"""
Select a frontend pod using round-robin and setup port forwarding.
Args:
managed_deployment: ManagedDeployment instance
client_index: Client index for round-robin selection
deployment_spec: Deployment specification with port info
pod_ports: Dictionary to track existing port forwards
- Key: pod name (str)
- Value: port forward object from managed_deployment.port_forward()
logger: Logger instance
def _single_request(
url,
pod,
payload,
model,
logger,
retry_attempts=1,
input_token_length=100,
output_token_length=100,
timeout=30,
retry_delay=1,
):
prompt = _get_random_prompt(input_token_length)
payload_copy = deepcopy(payload)
payload_copy["messages"][0]["content"] = prompt
payload_copy["max_tokens"] = output_token_length
payload_copy["min_tokens"] = output_token_length
payload_copy["model"] = model
response = None
end_time = None
start_time = time.time()
results = []
while retry_attempts:
start_request_time = time.time()
response = None
Returns:
Tuple of (pod_name, local_port, pod_instance) or (None, None, None) if failed
"""
pods = managed_deployment.get_pods(managed_deployment.frontend_service_name)
port = 0
pod_name = None
selected_pod = None
# Filter ready pods and cleanup stale port forwards
pods_ready = []
for pod in pods[managed_deployment.frontend_service_name]:
if pod.ready():
pods_ready.append(pod)
else:
# Cleanup port forwards for non-ready pods
if pod.name in pod_ports:
try:
pod_ports[pod.name].stop()
except Exception as e:
logger.debug(f"Error stopping port forward for {pod.name}: {e}")
del pod_ports[pod.name]
if not pods_ready:
logger.error("No ready frontend pods found")
return None, None, None
# Round-robin selection based on client index
selected_pod = pods_ready[client_index % len(pods_ready)]
pod_name = selected_pod.name
# Setup or reuse port forward
if pod_name not in pod_ports:
# Get port from deployment_spec (default: 8000)
port_value = getattr(deployment_spec, "_port", 8000)
port_forward = managed_deployment.port_forward(selected_pod, port_value)
if port_forward:
pod_ports[pod_name] = port_forward
port = port_forward.local_port
else:
logger.error(f"Failed to create port forward for pod {pod_name}")
return None, None, None
else:
# Reuse existing port forward
port = pod_ports[pod_name].local_port
logger.debug(f"Selected pod {pod_name} with local port {port}")
return pod_name, port, selected_pod
def wait_for_model_availability(
url: str,
endpoint: str,
model: str,
logger: logging.Logger,
max_attempts: int = 15,
attempt_timeouts: Optional[List[float]] = None,
) -> bool:
"""
Wait for model to be available before running AI-Perf.
Args:
url: Base URL for the service
endpoint: API endpoint path
model: Model name to test
logger: Logger instance
max_attempts: Maximum number of attempts to check availability
attempt_timeouts: List of timeout values for each attempt
Returns:
True if model is available, False otherwise
"""
if attempt_timeouts is None:
# Default: Start with 60s timeout, then gradually decrease
attempt_timeouts = [60, 60, 45, 30, 30, 20, 20, 15, 15, 15, 10, 10, 10, 10, 10]
test_url = f"{url}{endpoint}"
for attempt in range(max_attempts):
try:
response = requests.post(
test_payload = {
"model": model,
"messages": [{"role": "user", "content": "test"}],
"max_tokens": 1,
"stream": False,
}
timeout_val = attempt_timeouts[min(attempt, len(attempt_timeouts) - 1)]
logger.info(
f"Testing model availability at {test_url} (attempt {attempt+1}/{max_attempts}, timeout={timeout_val}s)"
)
response = requests.post(test_url, json=test_payload, timeout=timeout_val)
if response.status_code == 200:
logger.info(f"Model '{model}' is available and responding")
# Give a bit more time for stabilization
logger.info("Model ready, waiting 5s for stabilization...")
time.sleep(5)
return True
elif response.status_code == 404:
logger.warning(
f"Model '{model}' not found (404). Response: {response.text[:200]}"
)
elif response.status_code == 400:
logger.warning(f"Bad request (400). Response: {response.text[:200]}")
else:
logger.warning(
f"Unexpected status code {response.status_code}: {response.text[:200]}"
)
except requests.Timeout as e:
logger.warning(
f"Model availability test timed out (attempt {attempt+1}): {e}"
)
except Exception as e:
logger.warning(f"Model availability test failed (attempt {attempt+1}): {e}")
if attempt < max_attempts - 1:
wait_time = 10 if attempt < 5 else 5
logger.info(f"Waiting {wait_time}s before retry...")
time.sleep(wait_time)
logger.warning(
"Could not confirm model availability after all attempts, proceeding anyway..."
)
return False
def run_aiperf(
url: str,
endpoint: str,
model: str,
pod_name: str,
port: int,
requests_per_client: int,
input_token_length: int,
output_token_length: int,
output_dir: Path,
logger: logging.Logger,
max_retries: int = 1,
retry_delay: float = 1,
) -> bool:
"""
Execute AI-Perf with specified parameters.
Args:
url: Base URL (http://localhost:port)
endpoint: API endpoint path (e.g., "v1/chat/completions")
model: Model name
pod_name: Selected pod name for logging
port: Local port number
requests_per_client: Number of requests to send
input_token_length: Input token count
output_token_length: Output token count
output_dir: Directory for AI-Perf artifacts
logger: Logger instance
max_retries: Maximum number of retry attempts (default: 1)
retry_delay: Delay in seconds between retries (default: 1)
Returns:
True if successful, False otherwise
"""
# Validate required parameters
if not model or not url or not endpoint:
logger.error(
f"Missing required parameter: model={model!r}, url={url!r}, endpoint={endpoint!r}"
)
return False
# Build AI-Perf command
cmd = [
"aiperf",
"profile",
# Model configuration (required)
"--model",
model,
# Endpoint configuration
"--url",
url,
json=payload_copy,
timeout=timeout,
"--endpoint",
endpoint if endpoint.startswith("/") else f"/{endpoint}",
"--endpoint-type",
"chat", # Required: tells AI-Perf the API type
# Enable streaming for TTFT and ITL metrics
"--streaming",
# Request parameters
"--request-count",
str(requests_per_client), # Required: how many requests
"--concurrency",
"1", # Optional: we set to 1 for sequential
# Token configuration
"--synthetic-input-tokens-mean",
str(input_token_length),
"--synthetic-input-tokens-stddev",
"0", # Set to 0 for consistent token counts
"--output-tokens-mean",
str(output_token_length),
"--output-tokens-stddev",
"0", # Set to 0 for consistent token counts
# Skip warmup to avoid initial failures
"--warmup-request-count",
"0",
# Output configuration
"--artifact-dir",
str(output_dir),
"--random-seed",
"100", # For reproducible results
]
# Calculate timeout (same as legacy would for all requests)
timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes
# Log execution
logger.info(f"Starting AI-Perf for Pod {pod_name} Local Port {port}")
logger.info(f"Using model name: {model}")
# Wait for model to be available
wait_for_model_availability(url, endpoint, model, logger)
logger.info(f"Command: {' '.join(cmd)}")
# Retry logic for fault tolerance - retry FULL request count until success
max_attempts = max_retries if max_retries > 0 else 1
success = False
all_results = []
for attempt in range(max_attempts):
logger.info(
f"AI-Perf attempt {attempt + 1}/{max_attempts} with {requests_per_client} requests"
)
end_time = time.time()
content = None
# Update output directory for this attempt
attempt_dir = output_dir / f"attempt_{attempt}"
attempt_dir.mkdir(parents=True, exist_ok=True)
# Use the original command but update artifact directory
cmd_attempt = cmd.copy()
artifact_dir_idx = cmd_attempt.index("--artifact-dir") + 1
cmd_attempt[artifact_dir_idx] = str(attempt_dir)
try:
content = response.json()
except ValueError:
pass
result = subprocess.run(
cmd_attempt,
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL, # Prevent stdin reading which can cause process suspension
)
# Save logs for this attempt
with open(attempt_dir / "genai_perf.log", "w") as f:
f.write("=== STDOUT ===\n")
f.write(result.stdout)
f.write("\n\n=== STDERR ===\n")
f.write(result.stderr)
results.append(
all_results.append(
{
"status": response.status_code,
"result": content,
"request_elapsed_time": end_time - start_request_time,
"url": url,
"pod": pod,
"attempt": attempt + 1,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
}
)
if response.status_code != 200:
if result.returncode == 0:
logger.info(
f"Attempt {attempt + 1} succeeded with all {requests_per_client} requests"
)
log_summary_metrics(attempt_dir, logger, pod_name, port)
success = True
break # Success - we're done!
else:
logger.warning(
f"Attempt {attempt + 1} failed with return code {result.returncode}"
)
logger.debug(
f"Stderr: {result.stderr[:500] if result.stderr else 'No stderr'}"
)
except Exception as e:
logger.error(f"Error in attempt {attempt + 1}: {str(e)}")
all_results.append({"attempt": attempt + 1, "error": str(e)})
# Sleep before next attempt (if not the last attempt)
if not success and attempt < max_attempts - 1:
time.sleep(retry_delay)
retry_attempts -= 1
continue
if success:
logger.info(
f"AI-Perf successfully completed all {requests_per_client} requests for {pod_name}"
)
else:
logger.error(f"AI-Perf failed all {max_attempts} attempts for {pod_name}")
return success
def log_summary_metrics(
output_dir: Path, logger: logging.Logger, pod_name: str, port: int
) -> None:
"""
Log summary metrics from AI-Perf results.
Args:
output_dir: Directory containing AI-Perf artifacts
logger: Logger instance
pod_name: Pod name for logging
port: Port number for logging
"""
# Look for AI-Perf output file
profile_json = output_dir / "profile_export_aiperf.json"
if not profile_json.exists():
# Try alternative names
for name in ["profile_export.json", "profile_results.json"]:
alt_path = output_dir / name
if alt_path.exists():
profile_json = alt_path
break
except (requests.RequestException, requests.Timeout) as e:
results.append(
{
"status": str(e),
"result": None,
"request_elapsed_time": time.time() - start_request_time,
"url": url,
"pod": pod,
}
if profile_json.exists():
try:
with open(profile_json) as f:
metrics = json.load(f)
# Extract key metrics from AI-Perf format
records = metrics.get("records", {})
# Request count from request_count record
request_count_record = records.get("request_count", {})
request_count = (
int(request_count_record.get("avg", 0)) if request_count_record else 0
)
time.sleep(retry_delay)
retry_attempts -= 1
continue
return {
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"results": results,
"total_time": time.time() - start_time,
"url": url,
"pod": pod,
}
# Check for errors
error_summary = metrics.get("error_summary", [])
error_count = len(error_summary)
# Latency metrics (in milliseconds)
request_latency = records.get("request_latency", {})
avg_latency = request_latency.get("avg", 0) / 1000.0 # Convert to seconds
p99_latency = request_latency.get("p99", 0) / 1000.0 # Convert to seconds
# Throughput metrics
request_throughput = records.get("request_throughput", {})
throughput = request_throughput.get("avg", 0)
# Log summary
logger.info(
f"Summary: Pod {pod_name} Port {port} "
f"Requests: {request_count} "
f"Errors: {error_count} "
f"Throughput: {throughput:.1f} req/s "
f"Avg Latency: {avg_latency:.3f}s "
f"P99 Latency: {p99_latency:.3f}s"
)
# Log success rate
if request_count > 0:
success_rate = (request_count - error_count) / request_count * 100
logger.info(f"Success rate: {success_rate:.1f}%")
# Also write summary to CSV file for aggregation
csv_path = output_dir / "profile_export_aiperf.csv"
if csv_path.exists():
logger.info(f"AI-Perf results saved to {csv_path}")
except Exception as e:
logger.warning(f"Failed to parse AI-Perf metrics: {e}")
def client(
deployment_spec,
namespace,
model,
log_dir,
index,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
max_request_rate,
retry_delay=1,
namespace: str,
model: str,
log_dir: str,
index: int,
requests_per_client: int,
input_token_length: int,
output_token_length: int,
max_retries: int,
retry_delay: float = 1,
):
"""
Generate load using AI-Perf for fault tolerance testing.
This function sets up port forwarding to a frontend pod and uses AI-Perf
to generate synthetic requests for performance testing and fault tolerance
evaluation.
Args:
deployment_spec: Deployment specification object
namespace: Kubernetes namespace
model: Model name
log_dir: Directory for output logs and AI-Perf artifacts
index: Client index used for round-robin pod selection
requests_per_client: Number of requests to generate
input_token_length: Number of input tokens per request
output_token_length: Number of output tokens per request
max_retries: Maximum retry attempts for AI-Perf execution
retry_delay: Delay in seconds between retry attempts
"""
logger = logging.getLogger(f"CLIENT: {index}")
logging.getLogger("httpx").setLevel(logging.WARNING)
managed_deployment = ManagedDeployment(log_dir, deployment_spec, namespace)
pod_ports: Dict[str, Any] = {}
min_elapsed_time = (1 / max_request_rate) if max_request_rate > 0 else 0.0
try:
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, f"client_{index}.log.txt")
with open(log_path, "w") as log:
for i in range(requests_per_client):
pods = managed_deployment.get_pods(
managed_deployment.frontend_service_name
client_output_dir = Path(log_dir) / f"client_{index}"
client_output_dir.mkdir(parents=True, exist_ok=True)
# Add a startup delay for early clients to give model time to load
if index < 5:
wait_time = 30 - (index * 5) # 30, 25, 20, 15, 10 seconds
logger.info(
f"Client {index} waiting {wait_time}s for model registration..."
)
port = 0
pod_name = None
time.sleep(wait_time)
pods_ready = []
# Select frontend pod and setup port forwarding
pod_name, port, selected_pod = get_frontend_port(
managed_deployment=managed_deployment,
client_index=index,
deployment_spec=deployment_spec,
pod_ports=pod_ports,
logger=logger,
)
for pod in pods[managed_deployment.frontend_service_name]:
if pod.ready():
pods_ready.append(pod)
else:
if pod.name in pod_ports:
pod_ports[pod.name].stop()
del pod_ports[pod.name]
if not pod_name or not port:
logger.error("Failed to select pod or setup port forwarding")
return
if pods_ready:
pod = pods_ready[i % len(pods_ready)]
if pod.name not in pod_ports:
port_forward = managed_deployment.port_forward(
pod, deployment_spec.port
)
if port_forward:
pod_ports[pod.name] = port_forward
if pod.name in pod_ports:
port = pod_ports[pod.name].local_port
pod_name = pod.name
url = f"http://localhost:{port}"
url = f"http://localhost:{port}/{deployment_spec.endpoint}"
# Get endpoint from deployment_spec (default: /v1/chat/completions)
endpoint = getattr(deployment_spec, "_endpoint", "/v1/chat/completions")
result = _single_request(
url,
pod_name,
payload,
model,
logger,
max_retries,
success = run_aiperf(
url=url,
endpoint=endpoint,
model=model,
pod_name=pod_name,
port=port,
requests_per_client=requests_per_client,
input_token_length=input_token_length,
output_token_length=output_token_length,
output_dir=client_output_dir,
logger=logger,
max_retries=max_retries,
retry_delay=retry_delay,
)
logger.info(
f"Request: {i} Pod {pod_name} Local Port {port} Status: {result['results'][-1]['status']} Latency: {result['results'][-1]['request_elapsed_time']}"
)
log.write(json.dumps(result) + "\n")
log.flush()
if result["total_time"] < min_elapsed_time:
time.sleep(min_elapsed_time - result["total_time"])
if not success:
logger.error("AI-Perf execution failed")
except Exception as e:
logger.error(str(e))
logger.error(f"Client error: {str(e)}")
finally:
for pf_name, port_forward in pod_ports.items():
try:
port_forward.stop()
logger.debug(f"Stopped port forward for {pf_name}")
except Exception as e:
logger.debug(f"Error stopping port forward for {pf_name}: {e}")
logger.info("Exiting")
......@@ -13,441 +13,515 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parser for AI-Perf results in fault tolerance tests."""
import argparse
import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
from tabulate import tabulate
def parse_test_log(file_path):
def parse_test_log(
file_path: str,
) -> Tuple[Optional[float], Optional[List[str]]]:
"""
Parse test log for startup time and failure info.
Args:
file_path: Path to test.log.txt
Returns:
Tuple of (startup_time_seconds, failure_info)
"""
start_time = None
ready_time = None
fault_time = None
start_cmd: Optional[List[str]] = None
if not os.path.isfile(file_path):
return None, None, None
with open(file_path, "r") as f:
for line in f:
line = line.strip()
if "Starting Deployment fault-tolerance-test with spec" in line:
start_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
start_cmd = []
elif "Deployment fault-tolerance-test is ready" in line:
ready_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
elif "Injecting failure for:" in line:
fault_time = datetime.fromisoformat(
line.split(" ")[1].replace("T", " ")
)
startup_time = (
(ready_time - start_time).total_seconds() if start_time and ready_time else None
)
return startup_time, fault_time, start_cmd
failure_info: Optional[List[str]] = None
if not os.path.isfile(file_path):
return None, None
def parse_client_logs(test_dir, expected_length=100):
all_logs = []
for file in os.listdir(test_dir):
if file.startswith("client_") and file.endswith(".log.txt"):
with open(os.path.join(test_dir, file), "r") as f:
request_number = 0
with open(file_path, "r") as f:
for line in f:
request_number += 1
data = json.loads(line.strip())
for result in data["results"]:
log_entry = {
"time": datetime.fromisoformat(
data["time"].replace("T", " ")
),
"status": result["status"],
"request_elapsed_time": result["request_elapsed_time"],
"request_number": request_number - 1,
"client": file.split("_")[1].split(".")[0],
}
if (
"result" in result
and result["result"]
and "choices" in result["result"]
and result["result"]["choices"]
):
log_entry["success"] = True
if "content" in result["result"]["choices"][0]["message"]:
content = result["result"]["choices"][0]["message"][
"content"
]
elif (
"reasoning_content"
in result["result"]["choices"][0]["message"]
):
content = result["result"]["choices"][0]["message"][
"reasoning_content"
# Extract timestamp using regex to handle different log formats
timestamp_match = re.search(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", line)
# Look for deployment start
if "Starting Deployment" in line and timestamp_match:
timestamp = timestamp_match.group(1)
start_time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
# Look for deployment ready
if "Deployment fault-tolerance-test is ready" in line and timestamp_match:
timestamp = timestamp_match.group(1)
ready_time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
# Look for fault injection
if "Injecting failure for:" in line:
# Extract failure details
match = re.search(r"Failure\((.*?)\)", line)
if match:
failure_str = match.group(1)
parts = failure_str.split(", ")
failure_dict = {}
for part in parts:
key_val = part.split("=")
if len(key_val) == 2:
failure_dict[key_val[0]] = key_val[1]
# Build command list from failure info
if failure_dict:
failure_info = [
failure_dict.get("pod_name", "unknown").strip("'\""),
failure_dict.get("command", "unknown").strip("'\""),
]
if not content or len(content) < expected_length:
log_entry["success"] = False
else:
log_entry["success"] = False
all_logs.append(log_entry)
if len(all_logs):
df = pd.DataFrame(all_logs)
df.sort_values("time", inplace=True)
return df
# Calculate startup time in seconds
startup_time = None
if start_time and ready_time:
startup_time = (ready_time - start_time).total_seconds()
return None
return startup_time, failure_info
def calculate_metrics(df, fault_time, sla=None):
if fault_time:
before_fault = df[df["time"] <= fault_time]
after_fault = df[df["time"] > fault_time]
else:
before_fault = df
after_fault = None
# Existing latency metrics (only successful requests)
successful_before = before_fault[before_fault["success"]]
avg_before = successful_before["request_elapsed_time"].mean()
std_before = successful_before["request_elapsed_time"].std()
success_before_count = before_fault["success"].sum()
failure_before_count = len(before_fault) - success_before_count
avg_after, std_after, success_after_count, failure_after_count = (
None,
None,
None,
None,
)
if after_fault is not None and not after_fault.empty:
successful_after = after_fault[after_fault["success"]]
avg_after = successful_after["request_elapsed_time"].mean()
std_after = successful_after["request_elapsed_time"].std()
success_after_count = after_fault["success"].sum()
failure_after_count = len(after_fault) - success_after_count
if sla:
# SLA violations (only successful requests exceeding the SLA)
violations_before = (successful_before["request_elapsed_time"] > sla).sum()
violations_after = (
(successful_after["request_elapsed_time"] > sla).sum()
if after_fault is not None and not after_fault.empty
else None
)
else:
violations_before = None
violations_after = None
return (
success_before_count,
failure_before_count,
success_after_count,
failure_after_count,
avg_before,
std_before,
avg_after,
std_after,
violations_before,
violations_after,
)
def calculate_recovery_time(
failure_info: Optional[List[str]],
process_logs_dir: str,
) -> Optional[float]:
"""
Calculate recovery time by comparing last timestamp in .previous.log with first in current log.
This avoids timezone issues between test.log.txt and container logs.
Args:
failure_info: List with [pod_name, command] from fault injection
process_logs_dir: Directory containing process log files
def parse_process_log(log_dir, process_name):
process_ready_pattern = {
"Frontend": re.compile(r"added model"),
"VllmDecodeWorker": re.compile(
r"VllmWorker for (?P<model_name>.*?) has been initialized"
),
"VllmPrefillWorker": re.compile(
r"VllmWorker for (?P<model_name>.*?) has been initialized"
),
}
if not os.path.isdir(log_dir):
return {}
ready_times: Dict[str, List[Tuple[datetime, str, float]]] = {}
for entry in os.listdir(log_dir):
if entry.endswith(".log") and "metrics" not in entry:
replica_number = entry.split(".")[0]
Returns:
Recovery time in seconds or None if not found
"""
if not failure_info:
return None
if replica_number not in ready_times:
ready_times[replica_number] = []
# Determine component type from failure info (strip any quotes)
component_type = failure_info[0].strip("'\"") # e.g., "Frontend" or "decode"
component_dir = os.path.join(process_logs_dir, component_type)
process_start_time = None
if not os.path.exists(component_dir):
return None
with open(os.path.join(log_dir, entry), "r") as f:
for line in f:
line = line.strip()
if not line:
continue
last_timestamp_before = None
first_timestamp_after = None
# Try to parse as JSONL first
# Find the last timestamp from .previous.log (container before restart)
for log_file in os.listdir(component_dir):
if log_file.endswith(".previous.log"):
log_path = os.path.join(component_dir, log_file)
try:
json_data = json.loads(line)
# Extract timestamp and message from JSON format
if "time" in json_data:
timestamp = datetime.fromisoformat(
json_data["time"].replace("Z", "")
with open(log_path, "r") as f:
# Read last few lines to find last valid timestamp
lines = f.readlines()
for line in reversed(lines[-10:]): # Check last 10 lines
if '"time":"' in line:
try:
log_entry = json.loads(line)
timestamp_str = log_entry.get("time", "")[:19]
last_timestamp_before = datetime.strptime(
timestamp_str, "%Y-%m-%dT%H:%M:%S"
)
log_message = json_data.get("message", "")
else:
continue
except (json.JSONDecodeError, ValueError, KeyError):
# Fall back to readable format parsing
clean_line = re.sub(
r"\x1b\[.*?m", "", line
) # Remove ANSI codes
if not clean_line:
continue
parts = clean_line.split()
if len(parts) < 2:
break
except (json.JSONDecodeError, ValueError):
continue
except IOError as e:
logging.debug(f"Could not read {log_file}: {e}")
# Find the first timestamp from current container log
for log_file in os.listdir(component_dir):
if log_file.endswith(".log") and not log_file.endswith(
(".previous.log", ".metrics.log")
):
log_path = os.path.join(component_dir, log_file)
try:
# Parse timestamp (remove 'Z' for naive datetime)
timestamp = datetime.fromisoformat(
parts[0].replace("Z", "")
with open(log_path, "r") as f:
first_line = f.readline()
if first_line and '"time":"' in first_line:
log_entry = json.loads(first_line)
timestamp_str = log_entry.get("time", "")[:19]
first_timestamp_after = datetime.strptime(
timestamp_str, "%Y-%m-%dT%H:%M:%S"
)
except (json.JSONDecodeError, ValueError, IOError) as e:
logging.debug(f"Could not parse timestamp from {log_file}: {e}")
# Calculate recovery time from container timestamps (both in UTC)
if last_timestamp_before and first_timestamp_after:
recovery_time = (first_timestamp_after - last_timestamp_before).total_seconds()
# Sanity check - recovery should be seconds/minutes, not hours
if recovery_time > 3600: # More than 1 hour is likely wrong
logging.warning(
f"Recovery time {recovery_time}s seems too large, possible timezone issue"
)
except ValueError:
return recovery_time
return None
def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]:
"""
Parse AI-Perf results from all client directories.
Args:
log_dir: Directory containing client result directories
Returns:
Dictionary with aggregated metrics and client count
"""
all_metrics: Dict[str, Any] = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"latencies": [],
"ttft": [], # Time to First Token
"itl": [], # Inter-Token Latency
"throughputs": [],
"p50_latencies": [],
"p90_latencies": [],
"p99_latencies": [],
"num_clients": 0,
}
# Iterate over actual client directories
for item in sorted(os.listdir(log_dir)):
if not item.startswith("client_") or not os.path.isdir(
os.path.join(log_dir, item)
):
continue
log_message = " ".join(parts[1:])
if not process_start_time:
process_start_time = timestamp
client_dir = Path(log_dir) / item
all_metrics["num_clients"] += 1
# Look for AI-Perf results in attempt directories
profile_json = None
relative_time = (timestamp - process_start_time).total_seconds()
# Check for attempt directories (attempt_0, attempt_1, etc.)
for attempt_dir in sorted(client_dir.glob("attempt_*")):
json_path = attempt_dir / "profile_export_aiperf.json"
if json_path.exists():
profile_json = json_path
break # Use the first successful attempt
# Check for process start lines
if process_name in process_ready_pattern:
if process_ready_pattern[process_name].search(log_message):
if "previous" in entry:
location = 0
if not profile_json:
logging.warning(f"No AI-Perf results found for {item} in {client_dir}")
else:
location = -1
ready_times[replica_number].insert(
location, (timestamp, log_message, relative_time)
try:
with open(profile_json) as f:
client_metrics = json.load(f)
# AI-Perf format has "records" dictionary at the top level
records = client_metrics.get("records", {})
# Extract request count (this is the total requests made)
request_count_record = records.get("request_count", {})
request_count = (
int(request_count_record.get("avg", 0))
if request_count_record
else 0
)
return ready_times
# Check for errors in error_summary
error_summary = client_metrics.get("error_summary", [])
error_count = len(error_summary)
# Check if test was cancelled
was_cancelled = client_metrics.get("was_cancelled", False)
if was_cancelled:
error_count = request_count # Mark all as failed if cancelled
all_metrics["total_requests"] += request_count
all_metrics["successful_requests"] += request_count - error_count
all_metrics["failed_requests"] += error_count
# Extract latency from request_latency record
request_latency = records.get("request_latency", {})
if request_latency:
# Convert milliseconds to seconds for consistency
if "avg" in request_latency:
all_metrics["latencies"].append(request_latency["avg"] / 1000.0)
if "p50" in request_latency:
all_metrics["p50_latencies"].append(
request_latency["p50"] / 1000.0
)
if "p90" in request_latency:
all_metrics["p90_latencies"].append(
request_latency["p90"] / 1000.0
)
if "p99" in request_latency:
all_metrics["p99_latencies"].append(
request_latency["p99"] / 1000.0
)
def calculate_recovery_time(test_dir, failure_type, fault_time):
if not fault_time:
return None
# Time to first token (if available in records)
ttft = records.get("time_to_first_token", {}) or records.get("ttft", {})
if ttft and "avg" in ttft:
all_metrics["ttft"].append(ttft["avg"] / 1000.0) # Convert ms to s
# Inter-token latency (if available in records)
itl = records.get("inter_token_latency", {}) or records.get("itl", {})
if itl and "avg" in itl:
all_metrics["itl"].append(itl["avg"] / 1000.0) # Convert ms to s
# Throughput from request_throughput record
request_throughput = records.get("request_throughput", {})
req_throughput = request_throughput.get("avg", 0)
if req_throughput:
all_metrics["throughputs"].append(req_throughput)
except Exception as e:
logging.error(f"Error parsing {item} results: {e}")
return all_metrics
def print_summary_table(
log_dir: str,
num_clients: int,
startup_time: Optional[float],
recovery_time: Optional[float],
metrics: Dict[str, Any],
tablefmt: str = "grid",
sla: Optional[float] = None,
) -> None:
"""
Print formatted summary table with AI-Perf metrics.
Args:
log_dir: Test directory path
num_clients: Number of client processes
startup_time: Time to start deployment (seconds)
recovery_time: Time to recover from fault (seconds)
metrics: Aggregated metrics from AI-Perf
tablefmt: Table format for output
sla: Service level agreement for latency (optional)
"""
headers = ["Metric", "Value"]
rows = []
processes = [
"Frontend",
"VllmDecodeWorker",
"VllmPrefillWorker",
]
# Test info
rows.append(["Test Directory", log_dir])
rows.append(["Number of Clients", str(num_clients)])
rows.append(["", ""])
process_start = {}
start_time = None
# Deployment metrics
rows.append(["=== Deployment Metrics ===", ""])
if startup_time:
rows.append(["Startup Time", f"{startup_time:.2f} sec"])
else:
rows.append(["Startup Time", "N/A"])
for process in processes:
starts = parse_process_log(os.path.join(test_dir, process), process)
if starts:
process_start[process] = starts
last_recovery_time = 0
for process, replicas in process_start.items():
for replica, container_starts in replicas.items():
for starts in container_starts:
start_time = starts[0]
recovery_time = (start_time - fault_time).total_seconds()
if recovery_time > last_recovery_time:
last_recovery_time = recovery_time
if last_recovery_time == 0:
return None
return last_recovery_time
if recovery_time:
rows.append(["Recovery Time", f"{recovery_time:.2f} sec"])
else:
rows.append(["Recovery Time", "N/A"])
rows.append(["", ""])
# Request metrics
rows.append(["=== Request Metrics ===", ""])
rows.append(["Total Requests", metrics["total_requests"]])
rows.append(["Successful Requests", metrics["successful_requests"]])
rows.append(["Failed Requests", metrics["failed_requests"]])
if metrics["total_requests"] > 0:
success_rate = (
metrics["successful_requests"] / metrics["total_requests"]
) * 100
rows.append(["Success Rate", f"{success_rate:.2f}%"])
rows.append(["", ""])
# Latency metrics
rows.append(["=== Latency Metrics (seconds) ===", ""])
if metrics["latencies"]:
mean_latency = np.mean(metrics["latencies"])
rows.append(["Mean Latency", f"{mean_latency:.3f}"])
# Check SLA if provided
if sla is not None:
sla_status = "✓ PASS" if mean_latency <= sla else "✗ FAIL"
rows.append(["SLA Status", f"{sla_status} (target: {sla:.3f}s)"])
if metrics["p50_latencies"]:
rows.append(["P50 Latency", f"{np.mean(metrics['p50_latencies']):.3f}"])
if metrics["p90_latencies"]:
rows.append(["P90 Latency", f"{np.mean(metrics['p90_latencies']):.3f}"])
if metrics["p99_latencies"]:
rows.append(["P99 Latency", f"{np.mean(metrics['p99_latencies']):.3f}"])
rows.append(["", ""])
# Token generation metrics
rows.append(["=== Token Generation Metrics ===", ""])
if metrics["ttft"]:
rows.append(
["Time to First Token (mean)", f"{np.mean(metrics['ttft']):.3f} sec"]
)
if metrics["itl"]:
rows.append(
["Inter-Token Latency (mean)", f"{np.mean(metrics['itl']):.4f} sec"]
)
rows.append(["", ""])
def process_test_directory(test_dir, sla):
if "test_fault_scenario" not in test_dir:
return {}
test_name = test_dir.split("test_fault_scenario[", 1)[1].rstrip("]")
failure_type = test_name.split("-")[-1]
test_prefix = "-".join(test_name.split("-")[:-1])
# Throughput metrics
rows.append(["=== Throughput Metrics ===", ""])
startup_time, fault_time, start_cmd = parse_test_log(
os.path.join(test_dir, "test.log.txt")
if metrics["throughputs"]:
total_throughput = sum(metrics["throughputs"])
rows.append(["Total Throughput", f"{total_throughput:.2f} req/s"])
rows.append(
["Avg Client Throughput", f"{np.mean(metrics['throughputs']):.2f} req/s"]
)
df = parse_client_logs(test_dir)
if df is None or df.empty:
return None
(
success_before,
failure_before,
success_after,
failure_after,
avg_before,
std_before,
avg_after,
std_after,
violations_before,
violations_after,
) = calculate_metrics(df, fault_time, sla)
recovery_time = calculate_recovery_time(test_dir, failure_type, fault_time)
# Print table
print("\n" + "=" * 60)
print("FAULT TOLERANCE TEST SUMMARY - AI-PERF")
print("=" * 60)
print(tabulate(rows, headers=headers, tablefmt=tablefmt))
print("=" * 60 + "\n")
def process_single_test(
log_dir: str, tablefmt: str = "grid", sla: Optional[float] = None
) -> Dict[str, Any]:
"""
Process a single test log directory.
Args:
log_dir: Directory containing test results
tablefmt: Table format for output
sla: Service level agreement for latency (optional)
Returns:
Dictionary with test results
"""
# Parse test configuration
test_log = os.path.join(log_dir, "test.log.txt")
startup_time, failure_info = parse_test_log(test_log)
# Calculate recovery time only if fault was injected
recovery_time = None
if failure_info:
recovery_time = calculate_recovery_time(failure_info, log_dir)
# Parse AI-Perf results (also counts clients)
metrics = parse_aiperf_client_results(log_dir)
# Extract client count from metrics
num_clients = metrics.get("num_clients", 0)
# Print summary
print_summary_table(
log_dir, num_clients, startup_time, recovery_time, metrics, tablefmt, sla
)
return {
"test": test_prefix,
"cmd": start_cmd,
"failure": failure_type,
"start_time": startup_time,
"success_before_requests": success_before,
"failed_before_requests": failure_before,
"success_after_requests": success_after,
"failed_after_requests": failure_after,
"avg_latency_before": avg_before,
"std_latency_before": std_before,
"avg_latency_after": avg_after,
"std_latency_after": std_after,
"violations_before": violations_before,
"violations_after": violations_after,
"log_dir": log_dir,
"num_clients": num_clients,
"startup_time": startup_time,
"recovery_time": recovery_time,
"metrics": metrics,
}
def main(logs_dir, tablefmt, log_paths=None, sla=None):
results = []
def main(
logs_dir: Optional[str] = None,
log_paths: Optional[List[str]] = None,
tablefmt: str = "grid",
sla: Optional[float] = None,
):
"""
Main parser entry point with support for multiple log paths.
Args:
logs_dir: Base directory for logs (optional)
log_paths: List of log directories to process
tablefmt: Table format for output
sla: Service level agreement for latency (optional)
Returns:
Combined results from all processed tests
"""
# Handle different input formats
if log_paths:
# Process multiple log paths
all_results = []
for log_path in log_paths:
result = process_test_directory(log_path, sla)
if result:
results.append(result)
elif logs_dir:
for entry in os.listdir(logs_dir):
if entry.startswith("test_fault_scenario[") and os.path.isdir(
os.path.join(logs_dir, entry)
):
result = process_test_directory(os.path.join(logs_dir, entry), sla)
if result:
results.append(result)
# Group results by test prefix
grouped: dict[str, list[dict[str, Any]]] = {}
commands = {}
for res in results:
test_prefix = res["test"]
if test_prefix not in grouped:
grouped[test_prefix] = []
commands[test_prefix] = res["cmd"]
grouped[test_prefix].append(res)
order = [
"none",
"frontend",
"frontend_pod",
"decode_worker",
"decode_worker_pod",
"prefill_worker",
"prefill_worker_pod",
"vllm_decode_engine_core",
"vllm_prefill_engine_core",
]
# Print grouped tables
for test_prefix, group in grouped.items():
new_group = []
for failure in order:
for res in group:
if failure == res["failure"]:
new_group.append(res)
group = new_group
if sla:
headers = [
"Failure",
"Startup",
"Success\nBefore",
"Failed\nBefore",
"Success\nAfter",
"Failed\nAfter",
"Latency\nBefore",
"Latency\nAfter",
"Violations\nBefore",
"Violations\nAfter",
"Recovery",
]
if logs_dir:
full_path = os.path.join(logs_dir, log_path)
else:
headers = [
"Failure",
"Startup",
"Success\nBefore",
"Failed\nBefore",
"Success\nAfter",
"Failed\nAfter",
"Latency\nBefore",
"Latency\nAfter",
"Recovery",
]
rows = []
for res in group:
if sla:
row = [
res["failure"],
res["start_time"], # if res["start_time"] is not None else "N/A",
res["success_before_requests"],
res["failed_before_requests"],
res["success_after_requests"],
res["failed_after_requests"],
res["avg_latency_before"],
res["avg_latency_after"],
res["violations_before"],
res["violations_after"],
res["recovery_time"],
]
full_path = log_path
if os.path.isdir(full_path):
print(f"\nProcessing: {full_path}")
results = process_single_test(full_path, tablefmt, sla)
all_results.append(results)
else:
row = [
res["failure"],
res["start_time"], # if res["start_time"] is not None else "N/A",
res["success_before_requests"],
res["failed_before_requests"],
res["success_after_requests"],
res["failed_after_requests"],
res["avg_latency_before"],
res["avg_latency_after"],
res["recovery_time"],
]
rows.append(row)
print(f"Warning: {full_path} is not a valid directory, skipping...")
print(f"\nTest Group: {test_prefix}")
# print(f"\nTest Command: {commands[test_prefix]}")
print(
tabulate(
rows,
headers,
tablefmt=tablefmt,
floatfmt=".2f",
missingval="N/A",
numalign="right",
stralign="center",
# If multiple tests, also print combined summary
if len(all_results) > 1:
print("\n" + "=" * 60)
print("COMBINED TEST SUMMARY")
print("=" * 60)
total_requests = sum(r["metrics"]["total_requests"] for r in all_results)
total_successful = sum(
r["metrics"]["successful_requests"] for r in all_results
)
total_failed = sum(r["metrics"]["failed_requests"] for r in all_results)
print(f"Total Tests: {len(all_results)}")
print(f"Total Requests: {total_requests}")
print(f"Total Successful: {total_successful}")
print(f"Total Failed: {total_failed}")
if total_requests > 0:
print(
f"Overall Success Rate: {(total_successful/total_requests)*100:.2f}%"
)
print("\n" + "=" * 80)
print("=" * 60 + "\n")
return all_results
elif logs_dir:
# Process single directory
return process_single_test(logs_dir, tablefmt, sla)
else:
print("Error: Must provide either logs_dir or log_paths")
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parse test results")
parser.add_argument("--log-dir", default=".", help="Path to the logs directory")
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
parser = argparse.ArgumentParser(description="Parse fault tolerance test results")
parser.add_argument(
"--format", choices=["fancy", "markdown"], default="fancy", help="Table format"
"log_dir", type=str, help="Directory containing test logs and results"
)
parser.add_argument("--sla", type=float, default=None)
args = parser.parse_args()
# Map format choices to tabulate formats
tablefmt = (
"fancy_grid" if args.format == "fancy" else "pipe"
) # Using pipe for markdown compatibility
if not os.path.isdir(args.log_dir):
logging.error(f"{args.log_dir} is not a valid directory")
exit(1)
main(args.log_dir, tablefmt, args.sla)
main(args.log_dir)
......@@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from dataclasses import dataclass
from typing import Optional
from typing import Dict, Optional, Pattern
from tests.utils.managed_deployment import DeploymentSpec
# Worker name mapping for different backends
WORKER_MAP = {
"vllm": {
"decode": "VllmDecodeWorker",
......@@ -29,6 +31,51 @@ WORKER_MAP = {
},
}
# Process ready patterns for recovery detection
WORKER_READY_PATTERNS: Dict[str, Pattern] = {
# Frontend
"Frontend": re.compile(r"added model"),
# vLLM workers
"VllmDecodeWorker": re.compile(
r"VllmWorker for (?P<model_name>.*?) has been initialized"
),
"VllmPrefillWorker": re.compile(
r"VllmWorker for (?P<model_name>.*?) has been initialized"
),
# SGLang workers - look for their specific initialization messages
"decode": re.compile(
r"Model registration succeeded|Decode worker handler initialized|Worker handler initialized"
),
"prefill": re.compile(
r"Model registration succeeded|Prefill worker handler initialized|Worker handler initialized"
),
}
def get_all_worker_types() -> list[str]:
"""Get all worker type names for both vLLM and SGLang."""
worker_types = ["Frontend"]
for backend in WORKER_MAP.values():
worker_types.extend(backend.values())
# Remove duplicates while preserving order
seen = set()
result = []
for x in worker_types:
if x not in seen:
seen.add(x)
result.append(x)
return result
def get_worker_ready_pattern(worker_name: str) -> Optional[Pattern]:
"""Get the ready pattern for a specific worker type."""
return WORKER_READY_PATTERNS.get(worker_name)
def get_backend_workers(backend: str) -> Dict[str, str]:
"""Get worker mapping for a specific backend."""
return WORKER_MAP.get(backend, {})
@dataclass
class Load:
......@@ -36,8 +83,7 @@ class Load:
requests_per_client: int = 150
input_token_length: int = 100
output_token_length: int = 100
max_retries: int = 1
max_request_rate: float = 1
max_retries: int = 3 # Increased for fault tolerance
sla: Optional[float] = None
......
......@@ -31,7 +31,7 @@ def _clients(
input_token_length,
output_token_length,
max_retries,
max_request_rate,
retry_delay=5, # Default 5 seconds between retries
):
procs = []
ctx = multiprocessing.get_context("spawn")
......@@ -49,7 +49,7 @@ def _clients(
input_token_length,
output_token_length,
max_retries,
max_request_rate,
retry_delay,
),
)
)
......@@ -178,6 +178,5 @@ async def test_fault_scenario(
scenario.load.input_token_length,
scenario.load.output_token_length,
scenario.load.max_retries,
scenario.load.max_request_rate,
):
_inject_failures(scenario.failures, logger, deployment)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment