Unverified Commit f414bc5a authored by Indrajit Bhosale's avatar Indrajit Bhosale Committed by GitHub
Browse files

feat: Add legacy client along with aiperf for FT tests (#3415)


Signed-off-by: default avatarIndrajit Bhosale <iamindrajitb@gmail.com>
parent 6bd3ae2a
......@@ -46,7 +46,10 @@ def _single_request(
start_time = time.time()
results = []
while retry_attempts:
# Convert retries to total attempts (1 initial attempt + N retries)
attempts_remaining = 1 + max(0, retry_attempts)
while attempts_remaining:
start_request_time = time.time()
try:
......@@ -72,12 +75,15 @@ def _single_request(
}
)
if response.status_code != 200:
time.sleep(retry_delay)
retry_attempts -= 1
continue
else:
# Success - exit immediately
if response.status_code == 200:
break
# Failure - retry if we have attempts left
attempts_remaining -= 1
if attempts_remaining == 0:
break
time.sleep(retry_delay)
except (requests.RequestException, requests.Timeout) as e:
results.append(
......@@ -87,10 +93,13 @@ def _single_request(
"request_elapsed_time": time.time() - start_request_time,
}
)
# Exception - retry if we have attempts left
attempts_remaining -= 1
if attempts_remaining == 0:
break
logger.warning("Retrying due to Request failed: %s", e)
time.sleep(retry_delay)
retry_attempts -= 1
continue
return {
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
......
......@@ -138,7 +138,9 @@ pytest tests/fault_tolerance/deploy/test_deployment.py -s -v --namespace ${NAMES
### Test Results Directory
For each test scenario a directory of log files is created and post-processed to summarize the test.
For each test scenario a directory of log files is created and post-processed to summarize the test. The directory structure differs based on which client type is used.
#### AI-Perf Client Output Structure (Default)
```
test_fault_scenario[sglang-agg-tp-1-dp-1-frontend]
......@@ -163,7 +165,6 @@ test_fault_scenario[sglang-agg-tp-1-dp-1-frontend]
├── decode/ # Or VllmDecodeWorker for vLLM backend
│ └── [same structure as Frontend]
└── test.log.txt
```
| File/Directory Name | Description |
......@@ -178,6 +179,46 @@ test_fault_scenario[sglang-agg-tp-1-dp-1-frontend]
| **{Service}/*.yaml** | Pod specification and status transitions |
| **test.log.txt** | Primary test execution log (deployment timing, fault injection, recovery events) |
#### Legacy Client Output Structure (with `--client-type legacy`)
```
test_fault_scenario[sglang-agg-tp-1-dp-1-frontend]
.
├── client_0.log.txt # JSONL format: one request per line
├── client_1.log.txt # Direct HTTP request/response logs
├── client_2.log.txt
├── client_3.log.txt
├── client_4.log.txt
├── client_5.log.txt
├── client_6.log.txt
├── client_7.log.txt
├── client_8.log.txt
├── client_9.log.txt
├── Frontend/
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.log
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.metrics.log
│ ├── fault-tolerance-test-frontend-576bd784dc-jv68q.previous.log # Pre-restart logs
│ └── fault-tolerance-test-frontend-576bd784dc-jv68q.yaml
├── decode/ # Or VllmDecodeWorker for vLLM backend
│ └── [same structure as Frontend]
└── test.log.txt
```
| File Name | Description |
|--------------------------------|------------------------------------------------------------------------------------------------|
| **client_N.log.txt** | JSONL format logs with one request/response per line (per-request retry support) |
| **{Service}/*.log** | Current container log for pod (Frontend, decode, etc.) |
| **{Service}/*.previous.log** | Previous container log before restart (contains pre-fault logs) |
| **{Service}/*.metrics.log** | Prometheus metrics from `/metrics` endpoint |
| **{Service}/*.yaml** | Pod specification and status transitions |
| **test.log.txt** | Primary test execution log (deployment timing, fault injection, recovery events) |
**Example JSONL content in `client_N.log.txt`:**
```json
{"time": "2025-10-03T10:30:45", "results": [{"status": 200, "request_elapsed_time": 1.23, "url": "http://localhost:8000/v1/chat/completions", "pod": "frontend-pod"}], "total_time": 1.25}
{"time": "2025-10-03T10:30:47", "results": [{"status": 200, "request_elapsed_time": 1.18, "url": "http://localhost:8000/v1/chat/completions", "pod": "frontend-pod"}], "total_time": 1.20}
```
### Summary Results
Results are parsed from AI-Perf metrics and presented in table format after each test. The parsing script (`parse_results.py`) extracts comprehensive metrics for each scenario:
......@@ -458,3 +499,100 @@ curl -sL https://aka.ms/InstallAzureCLIDeb
az aks install-cli
az login
```
## Dual Client Implementation for Fault Tolerance Tests
### Overview
This document describes the implementation of dual client support for fault tolerance tests, allowing tests to use either the **AI-Perf** client or the **legacy custom client**.
### Motivation
Both clients need to be supported side by side for:
- Comparing performance and results between implementations
- Gradual migration path from legacy to AI-Perf
- Supporting different use cases (AI-Perf for comprehensive metrics, legacy for simple testing)
### Architecture
The implementation uses a **factory pattern** to cleanly separate client implementations and parsers while providing a unified interface.
```
┌─────────────────────────────────────────────────────────────┐
│ test_deployment.py │
│ (Test Runner) │
└──────────────────────┬──────────────────────┬────────────────┘
│ │
├──────────────────────┤
│ │
┌────────────▼─────────┐ ┌─────────▼──────────┐
│ client_factory.py │ │ parse_factory.py │
│ (Client Selection) │ │ (Parser Selection) │
└──────┬───────┬───────┘ └──────┬──────┬──────┘
│ │ │ │
┌───────▼───┐ ┌─▼──────────┐ ┌──▼──────▼───────────┐
│ client.py │ │legacy_ │ │parse_ │legacy_ │
│ (AI-Perf) │ │client.py │ │results │parse_ │
└───────────┘ └────────────┘ │.py │results.py │
└─────────┴───────────┘
```
### Usage
#### Running Tests with Command-Line Option
The client type can be dynamically selected using the `--client-type` pytest argument:
##### **Using AI-Perf Client (Default)**
```bash
# Default - no flag needed
pytest tests/fault_tolerance/deploy/test_deployment.py -s -v \
--namespace ${NAMESPACE} \
--image ${IMAGE}
# Or explicitly specify
pytest tests/fault_tolerance/deploy/test_deployment.py -s -v \
--namespace ${NAMESPACE} \
--image ${IMAGE} \
--client-type aiperf
```
##### **Using Legacy Client**
```bash
pytest tests/fault_tolerance/deploy/test_deployment.py -s -v \
--namespace ${NAMESPACE} \
--image ${IMAGE} \
--client-type legacy
```
##### **Single Test with Legacy Client**
```bash
pytest tests/fault_tolerance/deploy/test_deployment.py::test_fault_scenario[vllm-agg-tp-1-dp-1-none] -s -v \
--namespace test-ft \
--image your-image:tag \
--client-type legacy
```
##### **Legacy Output Format**
```bash
PASSED[TEST] 2025-10-03T21:02:21 INFO root: Using legacy parser for results
Test Group: vllm-agg-tp-1-dp-2
╒═══════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═══════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ decode_worker_pod │ 178.00 │ 149.00 │ 0.00 │ 1349.00 │ 2.00 │ 1.19 │ 1.19 │ 163.90 │
╘═══════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
================================================================================
[TEST] 2025-10-03T21:02:22 INFO root: Using legacy parser for results
Test Group: vllm-agg-tp-1-dp-2
╒═══════════════════╤═══════════╤═══════════╤══════════╤═══════════╤══════════╤═══════════╤═══════════╤════════════╕
│ Failure │ Startup │ Success │ Failed │ Success │ Failed │ Latency │ Latency │ Recovery │
│ │ │ Before │ Before │ After │ After │ Before │ After │ │
╞═══════════════════╪═══════════╪═══════════╪══════════╪═══════════╪══════════╪═══════════╪═══════════╪════════════╡
│ decode_worker_pod │ 178.00 │ 149.00 │ 0.00 │ 1349.00 │ 2.00 │ 1.19 │ 1.19 │ 163.90 │
╘═══════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Factory module for selecting client implementation (legacy or AI-Perf)."""
from typing import Callable
def get_client_function(client_type: str) -> Callable:
    """Return the load-generation client callable for ``client_type``.

    The implementing module is imported only after the requested type is
    known, so choosing one client does not import the other one.

    Args:
        client_type: Which client to use. Valid options:
            - "aiperf": AI-Perf based load generation
            - "legacy": legacy custom HTTP client

    Returns:
        The ``client`` function of the selected module. It is called as:
            client(
                deployment_spec,
                namespace,
                model,
                log_dir,
                index,
                requests_per_client,
                input_token_length,
                output_token_length,
                max_retries,
                retry_delay_or_rate,  # Meaning differs between implementations
            )

    Raises:
        ValueError: If ``client_type`` is not one of the valid options.

    Example:
        >>> client_func = get_client_function("aiperf")
        >>> client_func(deployment_spec, namespace, model, ...)
    """
    # Reject unknown values up front so the import section below only has to
    # distinguish between the two supported clients.
    if client_type != "aiperf" and client_type != "legacy":
        raise ValueError(
            f"Unknown client type: '{client_type}'. "
            f"Valid options are: 'aiperf', 'legacy'"
        )

    if client_type == "aiperf":
        from tests.fault_tolerance.deploy.client import client as selected_client
    else:
        from tests.fault_tolerance.deploy.legacy_client import (
            client as selected_client,
        )

    return selected_client
def get_supported_client_types() -> list[str]:
    """Return the names of every client type this factory can build.

    Returns:
        A new list on each call, containing "aiperf" and "legacy" in that
        order.
    """
    supported = []
    supported.append("aiperf")
    supported.append("legacy")
    return supported
def validate_client_type(client_type: str) -> bool:
    """Report whether ``client_type`` names a supported client.

    Args:
        client_type: Candidate client type string.

    Returns:
        True when the value appears in ``get_supported_client_types()``,
        False otherwise.
    """
    supported_types = get_supported_client_types()
    is_supported = client_type in supported_types
    return is_supported
def get_client_description(client_type: str) -> str:
    """Return a human-readable summary of what a client type does.

    Args:
        client_type: Client type to describe ("aiperf" or "legacy").

    Returns:
        A multi-sentence description of the selected client.

    Raises:
        ValueError: If ``client_type`` has no description.
    """
    aiperf_text = " ".join(
        [
            "AI-Perf client: Uses the AI-Perf CLI tool for load generation.",
            "Provides comprehensive metrics including P50/P90/P99 latencies,",
            "TTFT (Time to First Token), ITL (Inter-Token Latency), and throughput.",
            "Outputs results in JSON/CSV format with retry support at the test level.",
        ]
    )
    legacy_text = " ".join(
        [
            "Legacy custom client: Direct HTTP request loop with per-request retry logic.",
            "Logs results in JSONL format with basic latency and status tracking.",
            "Includes rate limiting and round-robin pod selection.",
        ]
    )
    descriptions = {"aiperf": aiperf_text, "legacy": legacy_text}

    # Every stored description is a non-empty string, so None reliably means
    # "no entry for this client type".
    text = descriptions.get(client_type)
    if text is None:
        valid_options = ", ".join(get_supported_client_types())
        raise ValueError(
            f"Unknown client type: '{client_type}'. "
            f"Valid options are: {valid_options}"
        )
    return text
......@@ -19,6 +19,13 @@ import pytest
def pytest_addoption(parser):
    """Register the command-line options used by the fault tolerance tests.

    Args:
        parser: Option parser object handed to this hook; each option is
            registered through its ``addoption`` method.
    """
    # (flag, keyword arguments) pairs, registered in this order.
    option_table = (
        ("--image", {"type": str, "default": None}),
        ("--namespace", {"type": str, "default": "fault-tolerance-test"}),
        (
            "--client-type",
            {
                "type": str,
                # None means the flag was not given on the command line.
                "default": None,
                "choices": ["aiperf", "legacy"],
                "help": "Client type for load generation: 'aiperf' (default) or 'legacy'",
            },
        ),
    )
    for flag, settings in option_table:
        parser.addoption(flag, **settings)
@pytest.fixture
......@@ -29,3 +36,9 @@ def image(request):
@pytest.fixture
def namespace(request):
    """Provide the value of the ``--namespace`` command-line option."""
    selected_namespace = request.config.getoption("--namespace")
    return selected_namespace
@pytest.fixture
def client_type(request):
    """Provide the value of the ``--client-type`` command-line option.

    The option value is returned unchanged; no fallback is applied here, so
    any default client choice is presumably made by the consumer of this
    fixture.
    """
    requested_client = request.config.getoption("--client-type")
    return requested_client
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Legacy custom client implementation for fault tolerance testing.
This is the original client implementation that was used before migrating to AI-Perf.
It sends direct HTTP requests and logs results in JSONL format.
"""
import json
import logging
import os
import random
import time
from copy import deepcopy
from datetime import datetime
from typing import Any, Dict
import requests
from tests.utils.managed_deployment import ManagedDeployment
# Layout of every log line: "[TEST] <timestamp> <level> <logger name>: <message>".
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
# Timestamp layout used for %(asctime)s (second resolution, "T" separator).
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
# Base payload template for chat completions.
# The placeholder values ("model", the message "content", "max_tokens" and
# "min_tokens") are filled in per request by _single_request, which works on a
# deep copy so this module-level template is left untouched.
payload = {
    "model": "",
    "messages": [
        {
            "role": "user",
            "content": "",
        }
    ],
    "max_tokens": 0,
    "temperature": 0.1,
    "ignore_eos": True,
    "min_tokens": 0,
    "stream": False,
}
# Configure logging.
# NOTE: this runs as a side effect of importing the module.
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    datefmt=DATE_FORMAT,
)
def _get_random_prompt(length):
"""Generate a random prompt of specified token length.
Args:
length: Approximate number of tokens to generate
Returns:
String containing random words
"""
word_list = [f"{i}" for i in range(10)]
return " ".join(random.choices(word_list, k=length))
def _single_request(
    url,
    pod,
    payload,
    model,
    logger,
    retry_attempts=1,
    input_token_length=100,
    output_token_length=100,
    timeout=30,
    retry_delay=1,
):
    """Send one chat-completion request, retrying when it fails.

    A fresh random prompt is generated once and the same request body is
    re-sent on every attempt. An attempt counts as successful only when the
    response status code is 200; any other status or a ``requests`` exception
    triggers a retry while attempts remain.

    Args:
        url: Full URL to POST the request to
        pod: Pod name, copied into the result records
        payload: Base payload template (deep-copied, never modified)
        model: Model name placed in the request body
        logger: Logger instance (accepted but not used by this function)
        retry_attempts: Number of retries after the initial attempt;
            negative values are treated as 0
        input_token_length: Number of words in the generated prompt
        output_token_length: Value used for both max_tokens and min_tokens
        timeout: Request timeout in seconds
        retry_delay: Seconds to sleep between attempts

    Returns:
        Dictionary with the completion timestamp ("time"), one record per
        attempt ("results"), the wall-clock duration of all attempts
        ("total_time"), and the "url" and "pod" that were targeted.
    """
    prompt_text = _get_random_prompt(input_token_length)
    request_body = deepcopy(payload)
    request_body["messages"][0]["content"] = prompt_text
    request_body["max_tokens"] = output_token_length
    request_body["min_tokens"] = output_token_length
    request_body["model"] = model

    overall_start = time.time()
    attempt_records = []

    def _record(status, body, elapsed):
        # One entry per attempt, successful or not.
        attempt_records.append(
            {
                "status": status,
                "result": body,
                "request_elapsed_time": elapsed,
                "url": url,
                "pod": pod,
            }
        )

    # Convert retries to total attempts (1 initial attempt + N retries)
    attempts_left = 1 + max(0, retry_attempts)
    while attempts_left:
        attempt_start = time.time()
        try:
            response = requests.post(
                url,
                json=request_body,
                timeout=timeout,
            )
            attempt_end = time.time()
            succeeded = response.status_code == 200

            # The body is only decoded for successful responses; a body that
            # is not valid JSON is recorded as None.
            body = None
            if succeeded:
                try:
                    body = response.json()
                except ValueError:
                    body = None

            _record(response.status_code, body, attempt_end - attempt_start)
            if succeeded:
                break
        except (requests.RequestException, requests.Timeout) as error:
            _record(str(error), None, time.time() - attempt_start)

        # Reached only after a failed attempt: consume it and pause before
        # the next one, unless that was the last attempt.
        attempts_left -= 1
        if attempts_left:
            time.sleep(retry_delay)

    return {
        "time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
        "results": attempt_records,
        "total_time": time.time() - overall_start,
        "url": url,
        "pod": pod,
    }
def client(
    deployment_spec,
    namespace,
    model,
    log_dir,
    index,
    requests_per_client,
    input_token_length,
    output_token_length,
    max_retries,
    max_request_rate,
    retry_delay=1,
):
    """Legacy custom client for fault tolerance testing.

    Sends ``requests_per_client`` requests one after another, each through a
    port forward to a ready frontend pod, and appends one JSON line per
    request to ``<log_dir>/client_<index>.log.txt``.

    Any exception raised while sending requests is logged (with traceback)
    and ends the run early instead of propagating; port forwards opened by
    this client are always stopped on the way out.

    Args:
        deployment_spec: Deployment specification object; its ``port`` and
            ``endpoint`` attributes are used to reach the frontend
        namespace: Kubernetes namespace
        model: Model name to test
        log_dir: Directory for output logs (created if missing)
        index: Client index, used in the logger name and log file name
        requests_per_client: Number of requests to send
        input_token_length: Number of input tokens per request
        output_token_length: Number of output tokens per request
        max_retries: Maximum retry attempts per request
        max_request_rate: Maximum requests per second; values <= 0 disable
            rate limiting
        retry_delay: Delay in seconds between retries
    """
    logger = logging.getLogger(f"CLIENT: {index}")
    logging.getLogger("httpx").setLevel(logging.WARNING)

    managed_deployment = ManagedDeployment(log_dir, deployment_spec, namespace)
    # Active port forwards, keyed by pod name.
    pod_ports: Dict[str, Any] = {}

    # Minimum wall-clock time one request should take to respect the rate.
    min_elapsed_time = (1 / max_request_rate) if max_request_rate > 0 else 0.0

    try:
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, f"client_{index}.log.txt")

        with open(log_path, "w") as log:
            for i in range(requests_per_client):
                # Get available pods
                pods = managed_deployment.get_pods(
                    managed_deployment.frontend_service_name
                )
                # Port 0 / pod None are kept when no pod is reachable; the
                # request is still issued so the failure shows up in the log.
                port = 0
                pod_name = None

                pods_ready = []

                # Filter ready pods and cleanup stale port forwards
                for pod in pods[managed_deployment.frontend_service_name]:
                    if pod.ready():
                        pods_ready.append(pod)
                    elif pod.name in pod_ports:
                        pod_ports[pod.name].stop()
                        del pod_ports[pod.name]

                # Setup port forwarding for selected pod (round-robin over
                # the currently ready pods)
                if pods_ready:
                    pod = pods_ready[i % len(pods_ready)]
                    if pod.name not in pod_ports:
                        port_forward = managed_deployment.port_forward(
                            pod, deployment_spec.port
                        )
                        if port_forward:
                            pod_ports[pod.name] = port_forward
                    if pod.name in pod_ports:
                        port = pod_ports[pod.name].local_port
                        pod_name = pod.name

                # Construct URL
                url = f"http://localhost:{port}/{deployment_spec.endpoint}"

                # Execute request
                result = _single_request(
                    url,
                    pod_name,
                    payload,
                    model,
                    logger,
                    max_retries,
                    input_token_length=input_token_length,
                    output_token_length=output_token_length,
                    retry_delay=retry_delay,
                )

                # Log result (status and latency of the final attempt)
                logger.info(
                    f"Request: {i} Pod {pod_name} Local Port {port} "
                    f"Status: {result['results'][-1]['status']} "
                    f"Latency: {result['results'][-1]['request_elapsed_time']}"
                )

                # Write to JSONL log file; flush so the line survives an
                # abrupt end of the process
                log.write(json.dumps(result) + "\n")
                log.flush()

                # Rate limiting
                if result["total_time"] < min_elapsed_time:
                    time.sleep(min_elapsed_time - result["total_time"])

    except Exception:
        # Best effort: a failing client must not raise into its caller, but
        # keep the traceback so the cause can be diagnosed from the log.
        logger.exception("Client %s aborted", index)
    finally:
        # Cleanup port forwards; a failure to stop one must not prevent the
        # others from being stopped, but is reported rather than hidden.
        for pf_name, port_forward in pod_ports.items():
            try:
                port_forward.stop()
            except Exception as stop_error:
                logger.warning(
                    "Failed to stop port forward for pod %s: %s",
                    pf_name,
                    stop_error,
                )

    logger.info("Exiting")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parser for legacy client results (JSONL format) in fault tolerance tests."""
import argparse
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
import pandas as pd
from tabulate import tabulate
def parse_test_log(file_path):
    """Extract deployment startup duration and fault time from a test log.

    Three kinds of lines are recognised by a marker substring; the timestamp
    is taken from the second space-separated field of the line. When a marker
    occurs several times, the last occurrence wins.

    Args:
        file_path: Path to test.log.txt

    Returns:
        Tuple of (startup_time_seconds, fault_time_datetime, start_cmd).
        startup_time_seconds is None unless both the start and the ready line
        were found; start_cmd is an empty list once the start line was seen
        and None otherwise. (None, None, None) when the file does not exist.
    """
    if not os.path.isfile(file_path):
        return None, None, None

    def _line_timestamp(text):
        # Second field of the line is the ISO timestamp.
        return datetime.fromisoformat(text.split(" ")[1].replace("T", " "))

    deployment_started = None
    deployment_ready = None
    fault_injected = None
    start_cmd: Optional[List[str]] = None

    with open(file_path, "r") as log_file:
        for raw_line in log_file:
            entry = raw_line.strip()
            if "Starting Deployment fault-tolerance-test with spec" in entry:
                deployment_started = _line_timestamp(entry)
                start_cmd = []
            elif "Deployment fault-tolerance-test is ready" in entry:
                deployment_ready = _line_timestamp(entry)
            elif "Injecting failure for:" in entry:
                fault_injected = _line_timestamp(entry)

    if deployment_started and deployment_ready:
        startup_seconds = (deployment_ready - deployment_started).total_seconds()
    else:
        startup_seconds = None

    return startup_seconds, fault_injected, start_cmd
def parse_client_logs(test_dir, expected_length=100):
    """Parse JSONL client logs from legacy client output.

    Every ``client_<N>.log.txt`` file in ``test_dir`` is read line by line.
    Lines that are not valid JSON are skipped. Each attempt listed under a
    line's "results" becomes one row.

    An attempt is successful when its response contains at least one choice
    whose text is non-empty and at least ``expected_length`` characters long.
    The text is the message "content"; when that is missing or empty (for
    example ``null``), "reasoning_content" is used instead.

    Args:
        test_dir: Directory containing client_N.log.txt files
        expected_length: Minimum response text length required for success

    Returns:
        pandas DataFrame sorted by "time" with columns "time", "status",
        "request_elapsed_time", "request_number" (zero-based line index in
        the client file), "client" (the N of the file name) and "success";
        None when no rows were found.
    """
    all_logs = []

    for file_name in os.listdir(test_dir):
        if not (file_name.startswith("client_") and file_name.endswith(".log.txt")):
            continue

        client_id = file_name.split("_")[1].split(".")[0]
        with open(os.path.join(test_dir, file_name), "r") as f:
            # Unparseable lines still consume a request number, so the
            # number always matches the line's position in the file.
            for request_number, line in enumerate(f):
                try:
                    data = json.loads(line.strip())
                except json.JSONDecodeError:
                    continue

                for result in data["results"]:
                    response = result.get("result")
                    choices = (
                        response.get("choices")
                        if isinstance(response, dict)
                        else None
                    )

                    content = ""
                    if choices:
                        message = choices[0].get("message") or {}
                        # Fall back to reasoning_content whenever content is
                        # absent OR empty/null, not only when the key is
                        # missing.
                        content = (
                            message.get("content")
                            or message.get("reasoning_content")
                            or ""
                        )

                    all_logs.append(
                        {
                            "time": datetime.fromisoformat(
                                data["time"].replace("T", " ")
                            ),
                            "status": result["status"],
                            "request_elapsed_time": result["request_elapsed_time"],
                            "request_number": request_number,
                            "client": client_id,
                            "success": bool(content)
                            and len(content) >= expected_length,
                        }
                    )

    if not all_logs:
        return None

    df = pd.DataFrame(all_logs)
    df.sort_values("time", inplace=True)
    return df
def calculate_metrics(df, fault_time, sla=None):
    """Summarise request outcomes before and after the fault injection.

    Rows with ``time <= fault_time`` form the "before" window and the rest
    the "after" window. Without a fault time every row counts as "before"
    and all "after" values are None. Latency statistics and SLA violations
    only consider successful requests.

    Args:
        df: DataFrame with "time", "success" and "request_elapsed_time" columns
        fault_time: Datetime when the fault was injected, or None
        sla: Optional latency threshold; a falsy value disables the
            violation counts

    Returns:
        Tuple of (success_before, failure_before, success_after,
        failure_after, avg_before, std_before, avg_after, std_after,
        violations_before, violations_after). The "after" entries are None
        when there is no non-empty "after" window; the violation entries are
        None when no SLA is given.
    """

    def _summarise(window):
        # Returns (successful rows, success count, failure count, mean, std).
        successful_rows = window[window["success"]]
        latencies = successful_rows["request_elapsed_time"]
        success_count = window["success"].sum()
        return (
            successful_rows,
            success_count,
            len(window) - success_count,
            latencies.mean(),
            latencies.std(),
        )

    if fault_time:
        before_window = df[df["time"] <= fault_time]
        after_window = df[df["time"] > fault_time]
    else:
        before_window, after_window = df, None
    has_after = after_window is not None and not after_window.empty

    (
        ok_before,
        success_before_count,
        failure_before_count,
        avg_before,
        std_before,
    ) = _summarise(before_window)

    ok_after = None
    success_after_count = failure_after_count = None
    avg_after = std_after = None
    if has_after:
        (
            ok_after,
            success_after_count,
            failure_after_count,
            avg_after,
            std_after,
        ) = _summarise(after_window)

    # SLA violations: successful requests slower than the threshold.
    violations_before = violations_after = None
    if sla:
        violations_before = (ok_before["request_elapsed_time"] > sla).sum()
        if has_after:
            violations_after = (ok_after["request_elapsed_time"] > sla).sum()

    return (
        success_before_count,
        failure_before_count,
        success_after_count,
        failure_after_count,
        avg_before,
        std_before,
        avg_after,
        std_after,
        violations_before,
        violations_after,
    )
# Log message patterns that mark a process as ready, keyed by the name of the
# process (which is also the name of its log directory).
_PROCESS_READY_PATTERNS = {
    "Frontend": re.compile(r"added model"),
    "VllmDecodeWorker": re.compile(
        r"VllmWorker for (?P<model_name>.*?) has been initialized"
    ),
    "VllmPrefillWorker": re.compile(
        r"VllmWorker for (?P<model_name>.*?) has been initialized"
    ),
    "decode": re.compile(
        r"Model registration succeeded|Decode worker handler initialized|Worker handler initialized"
    ),
    "prefill": re.compile(
        r"Model registration succeeded|Prefill worker handler initialized|Worker handler initialized"
    ),
    "TRTLLMWorker": re.compile(
        r"TrtllmWorker for (?P<model_name>.*?) has been initialized|Model registration succeeded"
    ),
    "TRTLLMDecodeWorker": re.compile(
        r"TrtllmWorker for (?P<model_name>.*?) has been initialized|Model registration succeeded"
    ),
    "TRTLLMPrefillWorker": re.compile(
        r"TrtllmWorker for (?P<model_name>.*?) has been initialized|Model registration succeeded"
    ),
}

# ANSI colour/style escape sequences found in human-readable log lines.
_ANSI_ESCAPE = re.compile(r"\x1b\[.*?m")


def _parse_process_log_line(line):
    """Return ``(timestamp, message)`` for one log line, or None to skip it.

    Handles both JSONL lines (object with a "time" field) and plain lines of
    the form "<ISO timestamp> <message>".
    """
    try:
        record = json.loads(line)
    except ValueError:
        record = None

    if isinstance(record, dict):
        if "time" not in record:
            return None
        try:
            timestamp = datetime.fromisoformat(str(record["time"]).replace("Z", ""))
        except ValueError:
            # Unparseable JSON timestamp: fall through to the plain-text
            # parser below, which will normally reject the line as well.
            pass
        else:
            message = record.get("message", "")
            return timestamp, message if isinstance(message, str) else str(message)

    # Plain-text format (also reached for JSON values that are not objects,
    # e.g. a line consisting of a bare number).
    clean_line = _ANSI_ESCAPE.sub("", line)
    parts = clean_line.split()
    if len(parts) < 2:
        return None
    try:
        timestamp = datetime.fromisoformat(parts[0].replace("Z", ""))
    except ValueError:
        return None
    return timestamp, " ".join(parts[1:])


def parse_process_log(log_dir, process_name):
    """Parse process logs to find ready times for recovery calculation.

    Every ``*.log`` file in ``log_dir`` whose name does not contain
    "metrics" is scanned for lines matching the ready pattern of
    ``process_name``. Files are grouped by the part of the file name before
    the first ".", so a replica's current log and its ".previous.log" share
    one entry.

    Args:
        log_dir: Directory containing process log files
        process_name: Name of process to look for (Frontend, VllmDecodeWorker, etc.)

    Returns:
        Dictionary mapping replica names to a list of
        (timestamp, message, relative_time) tuples, where relative_time is
        the number of seconds since the first timestamped line of the same
        file. Events from a "previous" log come first, followed by events
        from the current log, each in file order. Replicas without a ready
        event map to an empty list, as does every replica when
        ``process_name`` has no known ready pattern. An empty dict is
        returned when ``log_dir`` is not a directory.
    """
    if not os.path.isdir(log_dir):
        return {}

    ready_pattern = _PROCESS_READY_PATTERNS.get(process_name)
    ready_times: Dict[str, List] = {}

    for entry in os.listdir(log_dir):
        if not entry.endswith(".log") or "metrics" in entry:
            continue

        replica_number = entry.split(".")[0]
        events = ready_times.setdefault(replica_number, [])
        is_previous = "previous" in entry
        # Number of events from this "previous" file already placed at the
        # front of the list; keeps them in file order.
        previous_inserted = 0
        process_start_time = None

        with open(os.path.join(log_dir, entry), "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                parsed = _parse_process_log_line(line)
                if parsed is None:
                    continue
                timestamp, log_message = parsed

                if not process_start_time:
                    process_start_time = timestamp
                relative_time = (timestamp - process_start_time).total_seconds()

                # Check for process ready patterns
                if ready_pattern is None or not ready_pattern.search(log_message):
                    continue

                ready_event = (timestamp, log_message, relative_time)
                if is_previous:
                    # Pre-restart events go ahead of the current container's.
                    events.insert(previous_inserted, ready_event)
                    previous_inserted += 1
                else:
                    # Append; insert(-1, ...) would land before the last item.
                    events.append(ready_event)

    return ready_times
def calculate_recovery_time(test_dir, failure_type, fault_time):
    """Compute how long after the fault the last process became ready.

    The per-process log directories under ``test_dir`` are scanned for ready
    events; the result is the largest positive gap between ``fault_time``
    and any ready event.

    Args:
        test_dir: Directory containing one sub-directory per process
        failure_type: Type of failure injected (not used in the calculation)
        fault_time: Datetime when fault was injected

    Returns:
        Recovery time in seconds, or None when there is no fault time or no
        ready event happened after the fault.
    """
    if not fault_time:
        return None

    process_names = (
        "Frontend",
        "VllmDecodeWorker",
        "VllmPrefillWorker",
        "decode",
        "prefill",
        "TRTLLMWorker",
        "TRTLLMDecodeWorker",
        "TRTLLMPrefillWorker",
    )

    ready_by_process = {}
    for name in process_names:
        replica_events = parse_process_log(os.path.join(test_dir, name), name)
        if replica_events:
            ready_by_process[name] = replica_events

    # Each event is (timestamp, message, relative_time); only the timestamp
    # matters here. The default of 0 stands for "nothing found".
    latest_gap = max(
        (
            (event[0] - fault_time).total_seconds()
            for replicas in ready_by_process.values()
            for events in replicas.values()
            for event in events
        ),
        default=0,
    )

    # Gaps that are zero or negative belong to events at or before the fault.
    return latest_gap if latest_gap > 0 else None
def process_test_directory(test_dir, sla):
    """Build the summary record for one legacy-client test directory.

    The directory name is expected to look like
    ``test_fault_scenario[<test prefix>-<failure type>]``; the text after the
    last "-" is the failure type and everything before it the test prefix.

    Args:
        test_dir: Path to test directory
        sla: Optional SLA threshold for latency

    Returns:
        Dictionary with the test results; an empty dictionary when the path
        does not contain "test_fault_scenario"; None when no client results
        were found in the directory.
    """
    if "test_fault_scenario" not in test_dir:
        return {}

    scenario = test_dir.split("test_fault_scenario[", 1)[1].rstrip("]")
    test_prefix, _, failure_type = scenario.rpartition("-")

    test_log_path = os.path.join(test_dir, "test.log.txt")
    startup_time, fault_time, start_cmd = parse_test_log(test_log_path)

    df = parse_client_logs(test_dir)
    if df is None or df.empty:
        return None

    metrics = calculate_metrics(df, fault_time, sla)
    (
        success_before,
        failure_before,
        success_after,
        failure_after,
        avg_before,
        std_before,
        avg_after,
        std_after,
        violations_before,
        violations_after,
    ) = metrics

    recovery_time = calculate_recovery_time(test_dir, failure_type, fault_time)

    return dict(
        test=test_prefix,
        cmd=start_cmd,
        failure=failure_type,
        start_time=startup_time,
        success_before_requests=success_before,
        failed_before_requests=failure_before,
        success_after_requests=success_after,
        failed_after_requests=failure_after,
        avg_latency_before=avg_before,
        std_latency_before=std_before,
        avg_latency_after=avg_after,
        std_latency_after=std_after,
        violations_before=violations_before,
        violations_after=violations_after,
        recovery_time=recovery_time,
    )
def main(logs_dir, tablefmt, log_paths=None, sla=None):
    """Main entry point for parsing legacy client results.

    Collects the summary of every test directory, groups the summaries by
    test prefix and prints one table per group. Rows are ordered by a fixed
    list of known failure types; failure types that are not in that list are
    kept and shown after the known ones, in the order they were found.

    Args:
        logs_dir: Base directory containing test results; only used when
            ``log_paths`` is empty
        tablefmt: Table format for output (e.g., "fancy_grid")
        log_paths: Optional list of specific log paths to process
        sla: Optional SLA threshold for latency violations; when set, the
            table gains the two violation columns
    """
    if log_paths:
        # Process the explicitly given paths
        candidate_dirs = list(log_paths)
    elif logs_dir:
        # Process all test directories in logs_dir
        candidate_dirs = [
            os.path.join(logs_dir, entry)
            for entry in os.listdir(logs_dir)
            if entry.startswith("test_fault_scenario[")
            and os.path.isdir(os.path.join(logs_dir, entry))
        ]
    else:
        candidate_dirs = []

    results = []
    for candidate in candidate_dirs:
        result = process_test_directory(candidate, sla)
        if result:
            results.append(result)

    # Group results by test prefix (dict keeps first-seen order)
    grouped: dict[str, list[dict[str, Any]]] = {}
    for res in results:
        grouped.setdefault(res["test"], []).append(res)

    # Define order for failure types
    order = [
        "none",
        "frontend",
        "frontend_pod",
        "decode_worker",
        "decode_worker_pod",
        "prefill_worker",
        "prefill_worker_pod",
        "vllm_decode_engine_core",
        "vllm_prefill_engine_core",
        "sglang_decode_scheduler",
        "sglang_decode_detokenizer",
        "sglang_prefill_scheduler",
        "sglang_prefill_detokenizer",
        "trtllm_decode_engine_core",
        "trtllm_prefill_engine_core",
    ]
    rank = {failure: position for position, failure in enumerate(order)}

    # (header, result key) for every column; the violation columns are only
    # present when an SLA is given.
    columns = [
        ("Failure", "failure"),
        ("Startup", "start_time"),
        ("Success\nBefore", "success_before_requests"),
        ("Failed\nBefore", "failed_before_requests"),
        ("Success\nAfter", "success_after_requests"),
        ("Failed\nAfter", "failed_after_requests"),
        ("Latency\nBefore", "avg_latency_before"),
        ("Latency\nAfter", "avg_latency_after"),
    ]
    if sla:
        columns.append(("Violations\nBefore", "violations_before"))
        columns.append(("Violations\nAfter", "violations_after"))
    columns.append(("Recovery", "recovery_time"))
    headers = [header for header, _ in columns]

    # Print grouped tables
    for test_prefix, group in grouped.items():
        # Stable sort: known failure types in list order, unknown ones last
        # (instead of being dropped from the table).
        ordered_group = sorted(
            group, key=lambda res: rank.get(res["failure"], len(order))
        )
        rows = [[res[key] for _, key in columns] for res in ordered_group]

        print(f"\nTest Group: {test_prefix}")
        print(
            tabulate(
                rows,
                headers,
                tablefmt=tablefmt,
                floatfmt=".2f",
                missingval="N/A",
                numalign="right",
                stralign="center",
            )
        )
        print("\n" + "=" * 80)
if __name__ == "__main__":
    # Command-line entry point: parse the options, then print the tables.
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    parser = argparse.ArgumentParser(description="Parse legacy client test results")
    # (flag, keyword arguments) for every supported option.
    cli_options = (
        ("--log-dir", {"default": ".", "help": "Path to the logs directory"}),
        (
            "--format",
            {
                "choices": ["fancy", "markdown"],
                "default": "fancy",
                "help": "Table format",
            },
        ),
        (
            "--sla",
            {"type": float, "default": None, "help": "SLA threshold for latency"},
        ),
    )
    for flag, settings in cli_options:
        parser.add_argument(flag, **settings)
    args = parser.parse_args()

    # Map format choices to tabulate formats
    if args.format == "fancy":
        tablefmt = "fancy_grid"
    else:
        tablefmt = "pipe"

    main(args.log_dir, tablefmt, sla=args.sla)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Factory module for auto-detecting and parsing test results."""
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
def detect_result_type(log_dir: str) -> Optional[str]:
    """Guess which client produced the results stored under ``log_dir``.

    Two on-disk layouts are recognised:

    - AI-Perf client: ``client_N/attempt_M/`` directories containing one of
      ``profile_export_aiperf.json``, ``profile_export_aiperf.csv`` or
      ``genai_perf.log``.
    - Legacy client: ``client_N.log.txt`` files directly inside ``log_dir``.

    Args:
        log_dir: Directory containing test results.

    Returns:
        ``"aiperf"`` when only AI-Perf results are present, or when both
        layouts are present (AI-Perf wins and a warning is logged);
        ``"legacy"`` when only legacy results are present;
        ``None`` when ``log_dir`` is missing, is not a directory, or contains
        neither layout.
    """
    if not os.path.exists(log_dir):
        logging.warning(f"Directory does not exist: {log_dir}")
        return None
    if not os.path.isdir(log_dir):
        logging.warning(f"Not a directory: {log_dir}")
        return None

    # Any one of these files inside an attempt directory marks AI-Perf output.
    marker_files = (
        "profile_export_aiperf.json",
        "profile_export_aiperf.csv",
        "genai_perf.log",
    )

    aiperf_indicators = 0
    legacy_indicators = 0
    for entry in Path(log_dir).iterdir():
        if not entry.name.startswith("client_"):
            continue

        if entry.is_dir():
            # A client directory counts once, however many attempts match.
            for attempt in entry.iterdir():
                if not (attempt.is_dir() and attempt.name.startswith("attempt_")):
                    continue
                if any((attempt / marker).exists() for marker in marker_files):
                    aiperf_indicators += 1
                    break

        if entry.is_file() and entry.name.endswith(".log.txt"):
            legacy_indicators += 1

    found_aiperf = aiperf_indicators > 0
    found_legacy = legacy_indicators > 0

    if found_aiperf and found_legacy:
        # Mixed results - prioritize AI-Perf as it's newer
        logging.warning(
            f"Mixed result types detected in {log_dir}. "
            f"Found {aiperf_indicators} AI-Perf indicators and {legacy_indicators} legacy indicators. "
            f"Using AI-Perf parser."
        )
        return "aiperf"
    if found_aiperf:
        return "aiperf"
    if found_legacy:
        return "legacy"

    logging.warning(
        f"Unable to detect result type in {log_dir}. "
        f"No client result files found."
    )
    return None
def parse_test_results(
    log_dir: Optional[str] = None,
    log_paths: Optional[List[str]] = None,
    tablefmt: str = "grid",
    sla: Optional[float] = None,
    force_parser: Optional[str] = None,
) -> Any:
    """Pick the right results parser and run it.

    Unless ``force_parser`` is given, the result type is detected from the
    directory contents: from ``log_paths[0]`` when ``log_paths`` is non-empty,
    otherwise from ``log_dir``. The remaining entries of ``log_paths`` are
    only checked for consistency; a mismatch is logged as a warning and the
    type detected for the first path is still used for all of them.

    Args:
        log_dir: Base directory for logs (single directory processing).
        log_paths: List of log directories to process. Takes precedence over
            ``log_dir`` when non-empty.
        tablefmt: Table format for output (e.g., "fancy_grid", "pipe").
        sla: Optional SLA threshold for latency violations.
        force_parser: Optional override, ``"aiperf"`` or ``"legacy"``, that
            skips auto-detection.

    Returns:
        Whatever the selected parser's ``main`` returns.

    Raises:
        ValueError: If ``force_parser`` is not a valid option, if neither
            ``log_dir`` nor ``log_paths`` is provided during auto-detection,
            or if the result type cannot be detected.

    Example:
        >>> # Auto-detect and parse single directory
        >>> parse_test_results(log_dir="test_fault_scenario[...]")
        >>> # Auto-detect and parse multiple directories
        >>> parse_test_results(log_paths=["test1", "test2"])
        >>> # Force use of legacy parser
        >>> parse_test_results(log_dir="test_dir", force_parser="legacy")
    """
    if force_parser is not None and force_parser not in ("aiperf", "legacy"):
        raise ValueError(
            f"Invalid force_parser value: '{force_parser}'. "
            f"Valid options are: 'aiperf', 'legacy'"
        )

    # Decide which parser handles the results.
    if force_parser:
        parser_type = force_parser
        logging.info(f"Using forced parser: {parser_type}")
    elif log_paths:
        first_path, *other_paths = log_paths
        parser_type = detect_result_type(first_path)
        # The first path decides; later paths are only compared against it.
        for other_path in other_paths:
            if detect_result_type(other_path) != parser_type:
                logging.warning(
                    f"Inconsistent result types detected. "
                    f"Using {parser_type} for all paths."
                )
    elif log_dir:
        parser_type = detect_result_type(log_dir)
    else:
        raise ValueError("Must provide either log_dir or log_paths")

    if parser_type is None:
        raise ValueError(
            "Unable to auto-detect result type. "
            "Use force_parser='aiperf' or force_parser='legacy' to specify explicitly."
        )

    logging.info(f"Using {parser_type} parser for results")

    # Import lazily so only the selected parser module is loaded.
    if parser_type == "aiperf":
        from tests.fault_tolerance.deploy.parse_results import main as run_parser
    elif parser_type == "legacy":
        from tests.fault_tolerance.deploy.legacy_parse_results import (
            main as run_parser,
        )
    else:
        raise ValueError(f"Unknown parser type: {parser_type}")

    # Both parsers take the same keywords; exactly one location is passed.
    if log_paths:
        return run_parser(
            logs_dir=None,
            log_paths=log_paths,
            tablefmt=tablefmt,
            sla=sla,
        )
    return run_parser(
        logs_dir=log_dir,
        log_paths=None,
        tablefmt=tablefmt,
        sla=sla,
    )
def get_result_info(log_dir: str) -> Dict[str, Any]:
    """Summarise what kind of test results a directory holds.

    Args:
        log_dir: Directory containing test results.

    Returns:
        Dictionary with result information:
            {
                "type": "aiperf" or "legacy" or None,
                "client_count": number of clients detected,
                "has_test_log": whether test.log.txt exists,
                "details": additional format-specific details
            }
        For AI-Perf results ``details`` holds ``attempt_counts`` (one entry
        per client directory) and ``total_attempts``. When ``log_dir`` does
        not exist or is not a directory the defaults are returned unchanged.
    """
    info: Dict[str, Any] = {
        "type": None,
        "client_count": 0,
        "has_test_log": False,
        "details": {},
    }

    if not (os.path.exists(log_dir) and os.path.isdir(log_dir)):
        return info

    root = Path(log_dir)
    info["has_test_log"] = (root / "test.log.txt").exists()
    info["type"] = detect_result_type(log_dir)

    if info["type"] == "aiperf":
        # One entry per client_* directory: how many attempt_* dirs it holds.
        attempt_counts = [
            sum(
                1
                for child in client_dir.iterdir()
                if child.is_dir() and child.name.startswith("attempt_")
            )
            for client_dir in root.iterdir()
            if client_dir.is_dir() and client_dir.name.startswith("client_")
        ]
        info["client_count"] = len(attempt_counts)
        info["details"]["attempt_counts"] = attempt_counts
        info["details"]["total_attempts"] = sum(attempt_counts)
    elif info["type"] == "legacy":
        # Each client_*.log.txt file is one legacy client.
        info["client_count"] = sum(
            1
            for entry in root.iterdir()
            if entry.is_file()
            and entry.name.startswith("client_")
            and entry.name.endswith(".log.txt")
        )

    return info
def print_result_info(log_dir: str) -> None:
    """Print a short, human-readable report about the results in ``log_dir``.

    The report shows the detected result type ("Unknown" when detection
    failed), the client count, whether a test log exists and, when present,
    the format-specific details.

    Args:
        log_dir: Directory containing test results.
    """
    info = get_result_info(log_dir)
    rule = "=" * 60

    report = [
        f"\nTest Results Information: {log_dir}",
        rule,
        f"Result Type: {info['type'] or 'Unknown'}",
        f"Client Count: {info['client_count']}",
        f"Has Test Log: {info['has_test_log']}",
    ]
    if info["details"]:
        report.append("\nDetails:")
        report.extend(f"  {key}: {value}" for key, value in info["details"].items())
    report.append(rule)

    for line in report:
        print(line)
if __name__ == "__main__":
    # Command-line entry point: either describe a results directory (--info)
    # or auto-detect its type and parse it.
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="Auto-detect and parse fault tolerance test results"
    )
    parser.add_argument(
        "log_dir", nargs="?", default=None, help="Directory containing test results"
    )
    parser.add_argument(
        "--log-paths", nargs="+", help="Multiple log directories to process"
    )
    parser.add_argument(
        "--format", choices=["fancy", "markdown"], default="fancy", help="Table format"
    )
    parser.add_argument(
        "--sla", type=float, default=None, help="SLA threshold for latency"
    )
    parser.add_argument(
        "--force-parser",
        choices=["aiperf", "legacy"],
        default=None,
        help="Force use of specific parser (skip auto-detection)",
    )
    parser.add_argument(
        "--info",
        action="store_true",
        help="Print information about results without parsing",
    )
    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    # Map format choices to tabulate formats
    tablefmt = "fancy_grid" if args.format == "fancy" else "pipe"

    if args.info:
        # Info mode: the positional directory wins over --log-paths.
        if args.log_dir:
            print_result_info(args.log_dir)
        elif args.log_paths:
            for log_path in args.log_paths:
                print_result_info(log_path)
        else:
            print("Error: Must provide log_dir or --log-paths")
            # Report the usage error through the exit status as well, so
            # calling scripts do not treat it as success.
            sys.exit(1)
    else:
        # Parse mode
        try:
            parse_test_results(
                log_dir=args.log_dir,
                log_paths=args.log_paths,
                tablefmt=tablefmt,
                sla=args.sla,
                force_parser=args.force_parser,
            )
        except Exception as e:
            logging.error(f"Failed to parse results: {e}")
            # sys.exit instead of the site-provided exit() helper, which is
            # intended for the interactive shell and is absent under -S.
            sys.exit(1)
......@@ -100,6 +100,8 @@ class Load:
output_token_length: int = 100
max_retries: int = 3 # Increased for fault tolerance
sla: Optional[float] = None
client_type: str = "aiperf" # "aiperf" or "legacy"
max_request_rate: float = 1.0 # Rate limiting for legacy client (requests/sec)
@dataclass
......@@ -297,6 +299,83 @@ def _create_backend_failures(backend, deploy_type="disagg"):
return failures
def create_aiperf_load(
    clients: int = 10,
    requests_per_client: int = 150,
    input_token_length: int = 100,
    output_token_length: int = 100,
    max_retries: int = 3,
    sla: Optional[float] = None,
    max_request_rate: float = 1.0,
) -> Load:
    """Build a ``Load`` whose ``client_type`` is fixed to ``"aiperf"``.

    All other arguments are forwarded to ``Load`` unchanged.

    Args:
        clients: Number of concurrent clients (default: 10)
        requests_per_client: Number of requests per client (default: 150)
        input_token_length: Input token count (default: 100)
        output_token_length: Output token count (default: 100)
        max_retries: Maximum retry attempts - AI-Perf retries entire test (default: 3)
        sla: Optional SLA threshold for latency (default: None)
        max_request_rate: Rate limiting for requests/sec (default: 1.0)

    Returns:
        Load instance configured for AI-Perf client

    Example:
        >>> load = create_aiperf_load(clients=20, requests_per_client=200)
    """
    settings = {
        "clients": clients,
        "requests_per_client": requests_per_client,
        "input_token_length": input_token_length,
        "output_token_length": output_token_length,
        "max_retries": max_retries,
        "sla": sla,
        "client_type": "aiperf",
        "max_request_rate": max_request_rate,
    }
    return Load(**settings)
def create_legacy_load(
    clients: int = 10,
    requests_per_client: int = 100,
    input_token_length: int = 100,
    output_token_length: int = 100,
    max_retries: int = 1,
    sla: Optional[float] = None,
    max_request_rate: float = 1.0,
) -> Load:
    """Build a ``Load`` whose ``client_type`` is fixed to ``"legacy"``.

    All other arguments are forwarded to ``Load`` unchanged.

    Args:
        clients: Number of concurrent clients (default: 10)
        requests_per_client: Number of requests per client (default: 100, fewer than AI-Perf)
        input_token_length: Input token count (default: 100)
        output_token_length: Output token count (default: 100)
        max_retries: Maximum retry attempts - legacy retries per request (default: 1)
        sla: Optional SLA threshold for latency (default: None)
        max_request_rate: Rate limiting for requests/sec (default: 1.0)

    Returns:
        Load instance configured for legacy client

    Example:
        >>> load = create_legacy_load(clients=10, max_request_rate=2.0)
    """
    settings = {
        "clients": clients,
        "requests_per_client": requests_per_client,
        "input_token_length": input_token_length,
        "output_token_length": output_token_length,
        "max_retries": max_retries,
        "sla": sla,
        "client_type": "legacy",
        "max_request_rate": max_request_rate,
    }
    return Load(**settings)
# Default load configuration: a Load built entirely from its field defaults
# (the client_type field defaults to "aiperf", so this uses the AI-Perf client).
load = Load()
# model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
......
......@@ -8,52 +8,99 @@ from contextlib import contextmanager
import pytest
from tests.fault_tolerance.deploy.client import client
from tests.fault_tolerance.deploy.parse_results import main as parse_results
from tests.fault_tolerance.deploy.scenarios import scenarios
from tests.fault_tolerance.deploy.client_factory import get_client_function
from tests.fault_tolerance.deploy.parse_factory import parse_test_results
from tests.fault_tolerance.deploy.scenarios import Load, scenarios
from tests.utils.managed_deployment import ManagedDeployment
@pytest.fixture(params=scenarios.keys())
def scenario(request):
return scenarios[request.param]
def scenario(request, client_type):
    """Return the parametrized scenario, honouring a command-line client type.

    When ``client_type`` is ``None`` the registered scenario object is
    returned as-is. Otherwise a deep copy is returned so the shared entry in
    ``scenarios`` is left untouched, with ``load.client_type`` replaced and
    ``load.max_retries`` adjusted to suit that client.
    """
    selected = scenarios[request.param]

    # No override requested: hand back the registered scenario unchanged.
    if client_type is None:
        return selected

    import copy

    overridden = copy.deepcopy(selected)
    load_config = overridden.load
    load_config.client_type = client_type

    if client_type == "legacy":
        # Legacy uses per-request retries, so cap the count at one.
        if load_config.max_retries > 1:
            load_config.max_retries = 1
    elif client_type == "aiperf":
        # AI-Perf uses full test retries, so allow at least three.
        if load_config.max_retries < 3:
            load_config.max_retries = 3

    return overridden
@contextmanager
def _clients(
logger,
num_clients,
request,
deployment_spec,
namespace,
model,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
retry_delay=5, # Default 5 seconds between retries
load_config: Load,
):
"""Start client processes using factory pattern for client selection.
Args:
logger: Logger instance
request: Pytest request fixture
deployment_spec: Deployment specification
namespace: Kubernetes namespace
model: Model name to test
load_config: Load configuration object containing client settings
"""
# Get appropriate client function based on configuration
client_func = get_client_function(load_config.client_type)
logger.info(
f"Starting {load_config.clients} clients using '{load_config.client_type}' client"
)
procs = []
ctx = multiprocessing.get_context("spawn")
for i in range(num_clients):
# Determine retry_delay_or_rate based on client type
if load_config.client_type == "legacy":
# Legacy client uses max_request_rate for rate limiting
retry_delay_or_rate = load_config.max_request_rate
else:
# AI-Perf client uses retry_delay between attempts (default 5s)
retry_delay_or_rate = 5
for i in range(load_config.clients):
procs.append(
ctx.Process(
target=client,
target=client_func,
args=(
deployment_spec,
namespace,
model,
request.node.name,
i,
requests_per_client,
input_token_length,
output_token_length,
max_retries,
retry_delay,
load_config.requests_per_client,
load_config.input_token_length,
load_config.output_token_length,
load_config.max_retries,
retry_delay_or_rate,
),
)
)
procs[-1].start()
logger.debug(f"Started client {i} (PID: {procs[-1].pid})")
yield procs
for proc in procs:
......@@ -101,24 +148,50 @@ global_result_list = []
@pytest.fixture(autouse=True)
def results_table(request, scenario):  # noqa: F811
    """Parse and display results for an individual test after it has run.

    Everything after the ``yield`` executes as fixture teardown. The factory
    call auto-detects the result type (AI-Perf or legacy) from the test's log
    directory and uses the matching parser. A parsing failure is logged and
    not re-raised. The test name is then appended to ``global_result_list``,
    which the session-level summary fixture reads.
    """
    yield
    # NOTE(review): this direct parse_results call and the factory call below
    # both appear in this view; it looks like the removed and added sides of
    # a diff shown together - confirm which one the file actually contains.
    parse_results(
        logs_dir=None,
        log_paths=[request.node.name],
        tablefmt="fancy_grid",
        sla=scenario.load.sla,
    )
    # Use factory to auto-detect and parse results
    try:
        parse_test_results(
            log_dir=None,
            log_paths=[request.node.name],
            tablefmt="fancy_grid",
            sla=scenario.load.sla,
            # force_parser can be set based on client_type if needed
            # force_parser=scenario.load.client_type,
        )
    except Exception as e:
        # Best effort: log the parsing failure instead of raising from teardown.
        logging.error(f"Failed to parse results for {request.node.name}: {e}")
    # Record the test name so the combined summary can include it.
    global_result_list.append(request.node.name)
@pytest.fixture(autouse=True, scope="session")
def results_summary():
    """Parse and display combined results for all tests in the session.

    Everything after the ``yield`` executes as session teardown. The test
    names collected in ``global_result_list`` are passed to the factory,
    which auto-detects the result type and uses the matching parser. Nothing
    is parsed when the list is empty, and a parsing failure is logged and
    not re-raised.
    """
    yield
    # NOTE(review): this direct parse_results call and the factory call below
    # both appear in this view; it looks like the removed and added sides of
    # a diff shown together - confirm which one the file actually contains.
    parse_results(
        logs_dir=None,
        log_paths=global_result_list,
        tablefmt="fancy_grid",
    )
    if not global_result_list:
        # No test recorded any results, so there is nothing to combine.
        logging.info("No test results to summarize")
        return
    # Use factory to auto-detect and parse combined results
    try:
        parse_test_results(
            log_dir=None,
            log_paths=global_result_list,
            tablefmt="fancy_grid",
        )
    except Exception as e:
        # Best effort: log the parsing failure instead of raising from teardown.
        logging.error(f"Failed to parse combined results: {e}")
@pytest.mark.e2e
......@@ -178,14 +251,10 @@ async def test_fault_scenario(
) as deployment:
with _clients(
logger,
scenario.load.clients,
request,
scenario.deployment,
namespace,
model,
scenario.load.requests_per_client,
scenario.load.input_token_length,
scenario.load.output_token_length,
scenario.load.max_retries,
scenario.load, # Pass entire Load config object
):
_inject_failures(scenario.failures, logger, deployment)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment