Unverified Commit c4abe9bc authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat: Add prompt > seq_len k8 tests. (#3930)


Signed-off-by: default avatartzulingk@nvidia.com <tzulingk@nvidia.com>
parent 94ad5081
......@@ -119,6 +119,17 @@ The following failure types are defined in `scenarios.py`:
| `sglang_prefill_scheduler` | Terminate SGLang prefill scheduler process. | `SIGKILL` to `sglang::scheduler`| sglang only |
| `sglang_prefill_detokenizer` | Terminate SGLang prefill detokenizer process. | `SIGKILL` to `sglang::detokenizer`| sglang only |
#### Token Overflow Tests
In addition to process and pod failures, the suite includes tests for **token overflow**, where the model receives an input prompt larger than its configured maximum sequence length (`max_seq_len`; set through `--max-model-len` for vLLM, `--context-length` for SGLang, and `--max-seq-len` for TRT-LLM). These tests are crucial for verifying that the system can gracefully reject invalid requests without crashing.
- **Failure Injection**: Unlike other tests, this failure is injected from the **client side**. The `aiperf` client is configured to send a batch of requests with oversized token lengths.
- **Two-Phase Execution**: These tests run in two distinct phases, creating separate log directories for each:
1. **`overflow` Phase**: Sends oversized requests. The expected outcome is a high rate of failed requests (rejections) as the server correctly identifies and blocks them.
2. **`recovery` Phase**: Immediately after the overflow phase, sends valid, normal-sized requests. The expected outcome is a high success rate, confirming that the system has recovered and remains operational.
The combined results of these two phases demonstrate both the system's ability to reject invalid inputs and its stability after handling them.
#### Example Scenario Breakdown
**Scenario**: `sglang-agg-tp-2-dp-1-decode_worker`
......@@ -392,7 +403,6 @@ graph LR
style DecodePool stroke:#000,stroke-width:2px
```
#### Summary:
......@@ -596,3 +606,5 @@ Test Group: vllm-agg-tp-1-dp-2
╘═══════════════════╧═══════════╧═══════════╧══════════╧═══════════╧══════════╧═══════════╧═══════════╧════════════╛
```
......@@ -418,7 +418,7 @@ def process_test_directory(test_dir, sla):
}
def main(logs_dir, tablefmt, log_paths=None, sla=None):
def main(logs_dir, tablefmt, log_paths=None, sla=None, print_output=True):
"""Main entry point for parsing legacy client results.
Args:
......@@ -426,6 +426,7 @@ def main(logs_dir, tablefmt, log_paths=None, sla=None):
tablefmt: Table format for output (e.g., "fancy_grid")
log_paths: Optional list of specific log paths to process
sla: Optional SLA threshold for latency violations
print_output: If True, print tables and summaries. If False, only return results.
"""
results = []
......@@ -542,9 +543,11 @@ def main(logs_dir, tablefmt, log_paths=None, sla=None):
]
rows.append(row)
print(f"\nTest Group: {test_prefix}")
print(
tabulate(
if print_output:
logging.info(f"\nTest Group: {test_prefix}")
logging.info(
"\n"
+ tabulate(
rows,
headers,
tablefmt=tablefmt,
......@@ -554,7 +557,7 @@ def main(logs_dir, tablefmt, log_paths=None, sla=None):
stralign="center",
)
)
print("\n" + "=" * 80)
logging.info("\n" + "=" * 80)
if __name__ == "__main__":
......
......@@ -103,7 +103,9 @@ def parse_test_results(
log_paths: Optional[List[str]] = None,
tablefmt: str = "grid",
sla: Optional[float] = None,
success_threshold: float = 90.0,
force_parser: Optional[str] = None,
print_output: bool = True,
) -> Any:
"""Auto-detect and parse test results using the appropriate parser.
......@@ -116,8 +118,10 @@ def parse_test_results(
log_paths: List of log directories to process (for multiple directories)
tablefmt: Table format for output (e.g., "fancy_grid", "pipe")
sla: Optional SLA threshold for latency violations
success_threshold: Success rate threshold for pass/fail (default: 90.0)
force_parser: Optional override to force using a specific parser
("aiperf" or "legacy"). If not provided, auto-detection is used.
print_output: If True, print tables and summaries. If False, only return results.
Returns:
Results from the appropriate parser
......@@ -189,6 +193,8 @@ def parse_test_results(
log_paths=log_paths,
tablefmt=tablefmt,
sla=sla,
success_threshold=success_threshold,
print_output=print_output,
)
else:
return parse_aiperf(
......@@ -196,6 +202,8 @@ def parse_test_results(
log_paths=None,
tablefmt=tablefmt,
sla=sla,
success_threshold=success_threshold,
print_output=print_output,
)
elif parser_type == "legacy":
......@@ -209,6 +217,7 @@ def parse_test_results(
log_paths=log_paths,
tablefmt=tablefmt,
sla=sla,
print_output=print_output,
)
else:
return parse_legacy(
......@@ -216,6 +225,7 @@ def parse_test_results(
log_paths=None,
tablefmt=tablefmt,
sla=sla,
print_output=print_output,
)
else:
......@@ -294,18 +304,18 @@ def print_result_info(log_dir: str) -> None:
"""
info = get_result_info(log_dir)
print(f"\nTest Results Information: {log_dir}")
print("=" * 60)
print(f"Result Type: {info['type'] or 'Unknown'}")
print(f"Client Count: {info['client_count']}")
print(f"Has Test Log: {info['has_test_log']}")
logging.info(f"\nTest Results Information: {log_dir}")
logging.info("=" * 60)
logging.info(f"Result Type: {info['type'] or 'Unknown'}")
logging.info(f"Client Count: {info['client_count']}")
logging.info(f"Has Test Log: {info['has_test_log']}")
if info["details"]:
print("\nDetails:")
logging.info("\nDetails:")
for key, value in info["details"].items():
print(f" {key}: {value}")
logging.info(f" {key}: {value}")
print("=" * 60)
logging.info("=" * 60)
if __name__ == "__main__":
......@@ -354,7 +364,7 @@ if __name__ == "__main__":
for log_path in args.log_paths:
print_result_info(log_path)
else:
print("Error: Must provide log_dir or --log-paths")
logging.error("Must provide log_dir or --log-paths")
else:
# Parse mode
try:
......
......@@ -27,6 +27,13 @@ from typing import Any, Dict, List, Optional, Tuple
import numpy as np
from tabulate import tabulate
from tests.fault_tolerance.deploy.scenarios import (
OVERFLOW_SUFFIX,
RECOVERY_SUFFIX,
WORKER_MAP,
TestPhase,
)
def parse_test_log(
file_path: str,
......@@ -169,6 +176,67 @@ def extract_timestamp_from_log(
return None
def extract_test_info_from_dir(
    process_logs_dir: str,
) -> Tuple[Optional[str], Optional[str]]:
    """
    Extract backend and deployment type from a token overflow log directory.

    The last path component is expected to carry the test id in square
    brackets, formatted as
    ``{backend}_{deploy_type}_token_overflow_{multiplier}``.

    Args:
        process_logs_dir: Path like test_fault_scenario[trtllm_agg_token_overflow_2x].
            A trailing path separator is tolerated.

    Returns:
        Tuple of (backend, deploy_type) or (None, None) if not a token overflow test
    """
    # basename() of "dir/" is "", which would make a valid token overflow
    # directory look like a regular test; normpath drops trailing separators.
    test_name = os.path.basename(os.path.normpath(process_logs_dir))

    # Only token overflow tests encode backend/deploy type in their name
    if "token_overflow" not in test_name:
        return None, None

    # The test configuration is the text between the square brackets
    match = re.search(r"\[([^\]]+)\]", test_name)
    if not match:
        return None, None

    # Format: {backend}_{deploy_type}_token_overflow_{multiplier}
    parts = match.group(1).split("_")
    if len(parts) < 4:
        return None, None

    # parts[0] is the backend (vllm, trtllm, sglang); parts[1] is agg or disagg
    backend, deploy_type = parts[0], parts[1]
    return backend, deploy_type
def get_decode_worker_dir(backend: str, deploy_type: str) -> Optional[str]:
    """
    Look up the decode worker directory name for a backend in WORKER_MAP.

    Args:
        backend: Backend type (vllm, trtllm, sglang)
        deploy_type: Deployment type (agg or disagg)

    Returns:
        Worker directory name, or None when the backend has no WORKER_MAP entry
    """
    if backend in WORKER_MAP:
        # Aggregated trtllm deployments use a dedicated worker entry
        # ("decode_agg", e.g. "TRTLLMWorker"); every other combination uses
        # "decode" ("TRTLLMDecodeWorker", "VllmDecodeWorker", or "decode").
        uses_agg_worker = deploy_type == "agg" and backend == "trtllm"
        role_key = "decode_agg" if uses_agg_worker else "decode"
        return WORKER_MAP[backend][role_key]
    return None
def calculate_recovery_time(
failure_info: Optional[List[str]],
process_logs_dir: str,
......@@ -184,12 +252,33 @@ def calculate_recovery_time(
Returns:
Recovery time in seconds or None if not found
"""
if not failure_info:
return None
# Determine component type from failure info (strip any quotes)
if failure_info:
# Regular test - use failure info
component_type = failure_info[0].strip("'\"") # e.g., "Frontend" or "decode"
component_dir = os.path.join(process_logs_dir, component_type)
else:
# Check if this is a mixed token test
backend, deploy_type = extract_test_info_from_dir(process_logs_dir)
if not backend or not deploy_type:
logging.warning(
f"Could not determine backend or deploy type for {process_logs_dir}"
)
return None
# Mixed token test - get decode worker directory
decode_worker_dir = get_decode_worker_dir(backend, deploy_type)
if not decode_worker_dir:
logging.warning(
f"Could not determine decode worker for {backend} {deploy_type}"
)
return None
component_dir = os.path.join(process_logs_dir, decode_worker_dir)
logging.info(
f"Mixed token test - using decode worker directory: {component_dir}"
)
logging.info(f"Component directory: {component_dir}")
if not os.path.exists(component_dir):
......@@ -318,8 +407,9 @@ def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]:
# Fall back to input config if no requests were recorded
if request_count == 0 and "input_config" in client_metrics:
loadgen_config = client_metrics.get("input_config", {}).get(
"loadgen", {}
input_config = client_metrics.get("input_config", {})
loadgen_config = (
input_config.get("loadgen", {}) if input_config else {}
)
request_count = loadgen_config.get("request_count", 0)
......@@ -352,18 +442,21 @@ def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]:
all_metrics["p99_latencies"].append(request_latency["p99"] / 1000.0)
# Time to first token
ttft = client_metrics.get("time_to_first_token", {}).get("avg", None)
ttft_record = client_metrics.get("time_to_first_token", {})
ttft = ttft_record.get("avg", None) if ttft_record else None
if ttft:
all_metrics["ttft"].append(ttft / 1000.0) # Convert ms to s
# Inter-token latency
itl = client_metrics.get("inter_token_latency", {}).get("avg", None)
itl_record = client_metrics.get("inter_token_latency", {})
itl = itl_record.get("avg", None) if itl_record else None
if itl:
all_metrics["itl"].append(itl / 1000.0) # Convert ms to s
# Throughput from request_throughput record
req_throughput = client_metrics.get("request_throughput", {}).get(
"avg", 0
throughput_record = client_metrics.get("request_throughput", {})
req_throughput = (
throughput_record.get("avg", 0) if throughput_record else 0
)
if req_throughput:
all_metrics["throughputs"].append(req_throughput)
......@@ -475,16 +568,75 @@ def print_summary_table(
["Avg Client Throughput", f"{np.mean(metrics['throughputs']):.2f} req/s"]
)
# Print table
print("\n" + "=" * 60)
print("FAULT TOLERANCE TEST SUMMARY - AI-PERF")
print("=" * 60)
print(tabulate(rows, headers=headers, tablefmt=tablefmt))
print("=" * 60 + "\n")
# Log table
logging.info(
"\n" + "=" * 60 + "\n"
"FAULT TOLERANCE TEST SUMMARY - AI-PERF\n"
+ "=" * 60
+ "\n"
+ tabulate(rows, headers=headers, tablefmt=tablefmt)
+ "\n"
+ "=" * 60
+ "\n"
)
def _process_test_phase_results(
    test_phase: TestPhase,
    metrics: Dict[str, Any],
    success_threshold: float,
) -> None:
    """Log the phase-specific outcome of an overflow or recovery run.

    Args:
        test_phase: Phase the metrics belong to
        metrics: Parsed metrics; "total_requests", "failed_requests" and
            "successful_requests" are read, each defaulting to 0
        success_threshold: Minimum percentage expected - of rejected requests
            in the overflow phase, of successful requests in the recovery phase

    Raises:
        ValueError: If test_phase is not a known TestPhase member
    """
    banner = "=" * 60

    if test_phase == TestPhase.OVERFLOW:
        request_total = metrics.get("total_requests", 0)
        rejected = metrics.get("failed_requests", 0)
        if not (request_total > 0):
            logging.warning("No requests to process, total_requests is 0.")
            return

        rejection_rate = (rejected / request_total) * 100
        logging.info(
            f"\n{banner}\n"
            f"Processing OVERFLOW phase - Expecting rejections\n{banner}\n"
            f"\nOverflow Results: {rejected}/{request_total} requests rejected ({rejection_rate:.1f}%)"
        )
        # In this phase a *failed* request is the desired outcome.
        if rejection_rate >= success_threshold:
            logging.info("Overflow validation working correctly")
        else:
            logging.warning(
                f"Expected rejection rate >= {success_threshold}%, got {rejection_rate:.1f}%"
            )
        return

    if test_phase == TestPhase.RECOVERY:
        request_total = metrics.get("total_requests", 0)
        succeeded = metrics.get("successful_requests", 0)
        if not (request_total > 0):
            logging.warning("No requests to process, total_requests is 0.")
            return

        recovered_rate = (succeeded / request_total) * 100
        logging.info(
            f"\n{banner}\n"
            f"Processing RECOVERY phase - Expecting success\n{banner}\n"
            f"\nRecovery Results: {succeeded}/{request_total} requests succeeded ({recovered_rate:.1f}%)"
        )
        if recovered_rate >= success_threshold:
            logging.info("System recovered successfully")
        else:
            logging.warning(
                f"Expected success rate >= {success_threshold}%, got {recovered_rate:.1f}%"
            )
        return

    if test_phase == TestPhase.STANDARD:
        # Standard test phase doesn't need special processing
        return

    raise ValueError(f"Unknown test phase: {test_phase}")
def process_single_test(
log_dir: str, tablefmt: str = "grid", sla: Optional[float] = None
log_dir: str,
tablefmt: str = "grid",
sla: Optional[float] = None,
success_threshold: float = 90.0,
print_output: bool = True,
) -> Dict[str, Any]:
"""
Process a single test log directory.
......@@ -493,10 +645,20 @@ def process_single_test(
log_dir: Directory containing test results
tablefmt: Table format for output
sla: Service level agreement for latency (optional)
success_threshold: Success rate threshold for pass/fail (default: 90.0)
print_output: If True, print tables and phase headers. If False, only return results.
Returns:
Dictionary with test results
"""
# Detect test phase (overflow or recovery) - check suffix to avoid ambiguity
test_phase = TestPhase.STANDARD
if log_dir.endswith(OVERFLOW_SUFFIX):
test_phase = TestPhase.OVERFLOW
elif log_dir.endswith(RECOVERY_SUFFIX):
test_phase = TestPhase.RECOVERY
# Parse test configuration
test_log = os.path.join(log_dir, "test.log.txt")
startup_time, failure_info = parse_test_log(test_log)
......@@ -512,7 +674,15 @@ def process_single_test(
# Extract client count from metrics
num_clients = metrics.get("num_clients", 0)
# Add phase information to metrics (store as string for JSON serialization)
metrics["test_phase"] = test_phase.name.lower()
# Process and print phase-specific results
if print_output:
_process_test_phase_results(test_phase, metrics, success_threshold)
# Print summary
if print_output:
print_summary_table(
log_dir, num_clients, startup_time, recovery_time, metrics, tablefmt, sla
)
......@@ -526,11 +696,113 @@ def process_single_test(
}
def process_overflow_recovery_test(
    overflow_path: str,
    recovery_path: str,
    tablefmt: str = "fancy_grid",
    sla: Optional[float] = None,
    success_threshold: float = 90.0,
) -> Dict[str, Any]:
    """
    Process a paired overflow/recovery test and log one combined summary.

    Both phase directories are parsed with per-phase output suppressed. Request
    counters are summed across the two phases, while latency and throughput
    series are taken from the recovery phase only.

    Args:
        overflow_path: Path to overflow test directory
        recovery_path: Path to recovery test directory
        tablefmt: Table format for output
        sla: Optional SLA threshold
        success_threshold: Success rate threshold for pass/fail (default: 90.0)

    Returns:
        Combined results dictionary, including the raw per-phase results under
        "overflow_results" and "recovery_results"
    """
    overflow_results = process_single_test(
        overflow_path, tablefmt, sla, success_threshold, print_output=False
    )
    recovery_results = process_single_test(
        recovery_path, tablefmt, sla, success_threshold, print_output=False
    )
    overflow_metrics = overflow_results["metrics"]
    recovery_metrics = recovery_results["metrics"]

    # Counters are added across both phases.
    combined_metrics = {
        counter: overflow_metrics[counter] + recovery_metrics[counter]
        for counter in ("total_requests", "successful_requests", "failed_requests")
    }
    # Performance metrics from recovery phase
    for series in (
        "latencies",
        "ttft",
        "itl",
        "throughputs",
        "p50_latencies",
        "p90_latencies",
        "p99_latencies",
    ):
        combined_metrics[series] = recovery_metrics.get(series, [])

    # The shared base directory is the overflow directory without its suffix.
    if overflow_path.endswith(OVERFLOW_SUFFIX):
        base_path = overflow_path[: -len(OVERFLOW_SUFFIX)]
    else:
        base_path = overflow_path

    startup_time, _ = parse_test_log(os.path.join(base_path, "test.log.txt"))
    # No failure info is passed: the recovery-time lookup is driven by the
    # directory name instead.
    recovery_time = calculate_recovery_time(failure_info=[], process_logs_dir=base_path)

    overflow_total = overflow_metrics["total_requests"]
    overflow_rejected = overflow_metrics["failed_requests"]
    overflow_rate = overflow_rejected / overflow_total * 100 if overflow_total != 0 else 0

    recovery_total = recovery_metrics["total_requests"]
    recovery_succeeded = recovery_metrics["successful_requests"]
    recovery_rate = (
        recovery_succeeded / recovery_total * 100 if recovery_total != 0 else 0
    )

    banner = "=" * 60
    logging.info(
        "\n".join(
            [
                "",
                banner,
                "SESSION SUMMARY - COMBINED OVERFLOW/RECOVERY TEST",
                banner,
                "",
                "Phase Breakdown:",
                f" Overflow: {overflow_rejected}/{overflow_total} rejected ({overflow_rate:.1f}%)",
                f" Recovery: {recovery_succeeded}/{recovery_total} succeeded ({recovery_rate:.1f}%)",
            ]
        )
    )

    num_clients = overflow_results["num_clients"]
    print_summary_table(
        log_dir=base_path,
        num_clients=num_clients,
        startup_time=startup_time,
        recovery_time=recovery_time,
        metrics=combined_metrics,
        tablefmt=tablefmt,
        sla=sla,
    )

    return {
        "log_dir": base_path,
        "num_clients": num_clients,
        "startup_time": startup_time,
        "recovery_time": recovery_time,
        "metrics": combined_metrics,
        "overflow_results": overflow_results,
        "recovery_results": recovery_results,
    }
def main(
logs_dir: Optional[str] = None,
log_paths: Optional[List[str]] = None,
tablefmt: str = "grid",
sla: Optional[float] = None,
success_threshold: float = 90.0,
print_output: bool = True,
):
"""
Main parser entry point with support for multiple log paths.
......@@ -540,6 +812,8 @@ def main(
log_paths: List of log directories to process
tablefmt: Table format for output
sla: Service level agreement for latency (optional)
success_threshold: Success rate threshold for pass/fail (default: 90.0)
print_output: If True, print tables and summaries. If False, only return results.
Returns:
Combined results from all processed tests
......@@ -555,43 +829,75 @@ def main(
full_path = log_path
if os.path.isdir(full_path):
print(f"\nProcessing: {full_path}")
results = process_single_test(full_path, tablefmt, sla)
logging.info(f"\nProcessing: {full_path}")
results = process_single_test(
full_path, tablefmt, sla, success_threshold, print_output
)
all_results.append(results)
else:
print(f"Warning: {full_path} is not a valid directory, skipping...")
# If multiple tests, also print combined summary
if len(all_results) > 1:
print("\n" + "=" * 60)
print("COMBINED TEST SUMMARY")
print("=" * 60)
logging.warning(f"{full_path} is not a valid directory, skipping...")
# If multiple tests, also log combined summary
if len(all_results) > 1 and print_output:
total_requests = sum(r["metrics"]["total_requests"] for r in all_results)
total_successful = sum(
r["metrics"]["successful_requests"] for r in all_results
)
total_failed = sum(r["metrics"]["failed_requests"] for r in all_results)
print(f"Total Tests: {len(all_results)}")
print(f"Total Requests: {total_requests}")
print(f"Total Successful: {total_successful}")
print(f"Total Failed: {total_failed}")
# Build summary message
summary_lines = [
"\n" + "=" * 60,
"COMBINED TEST SUMMARY",
"=" * 60,
f"Total Tests: {len(all_results)}",
f"Total Requests: {total_requests}",
f"Total Successful: {total_successful}",
f"Total Failed: {total_failed}",
]
if total_requests > 0:
print(
summary_lines.append(
f"Overall Success Rate: {(total_successful/total_requests)*100:.2f}%"
)
print("=" * 60 + "\n")
# Check if this is an overflow/recovery pair and show timing info
has_overflow = any(
r["log_dir"].endswith(OVERFLOW_SUFFIX) for r in all_results
)
has_recovery = any(
r["log_dir"].endswith(RECOVERY_SUFFIX) for r in all_results
)
if has_overflow and has_recovery:
# Find startup time from overflow phase
for r in all_results:
if r["log_dir"].endswith(OVERFLOW_SUFFIX) and r.get("startup_time"):
summary_lines.append(f"Startup Time: {r['startup_time']}")
break
# Find recovery time stored in recovery phase
for r in all_results:
if r["log_dir"].endswith(RECOVERY_SUFFIX) and r.get(
"recovery_time"
):
summary_lines.append(
f"Recovery Time (gap between phases): {r['recovery_time']}"
)
break
summary_lines.append("=" * 60 + "\n")
logging.info("\n".join(summary_lines))
return all_results
elif logs_dir:
# Process single directory
return process_single_test(logs_dir, tablefmt, sla)
return process_single_test(
logs_dir, tablefmt, sla, success_threshold, print_output
)
else:
print("Error: Must provide either logs_dir or log_paths")
logging.error("Must provide either logs_dir or log_paths")
return None
......
......@@ -15,10 +15,42 @@
import re
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, Optional, Pattern
from typing_extensions import TypedDict
from tests.utils.managed_deployment import DeploymentSpec
class TestPhase(Enum):
    """Phase of a fault tolerance run that a set of results belongs to.

    The lowercase member names double as directory-name suffixes for the
    overflow and recovery phases, so renaming a member changes those suffixes.
    """

    # Regular test with no overflow/recovery split
    STANDARD = auto()
    # Oversized requests are sent; rejections are the expected outcome
    OVERFLOW = auto()
    # Normal-sized requests sent after the overflow phase; success is expected
    RECOVERY = auto()
class DeploymentInfo(TypedDict, total=False):
    """Typed dictionary describing one deployment configuration.

    Declared with ``total=False``, so a type checker treats every key as
    optional.

    Attributes:
        spec: DeploymentSpec object defining the deployment configuration
        backend: Backend type - "vllm", "sglang", or "trtllm"
        model: Optional model identifier (e.g., "deepseek-ai/DeepSeek-V2-Lite")
        is_moe: Optional flag indicating if this is a Mixture-of-Experts model
    """

    spec: DeploymentSpec
    backend: str
    model: str
    is_moe: bool
# Test phase suffixes derived from TestPhase enum
OVERFLOW_SUFFIX = f"_{TestPhase.OVERFLOW.name.lower()}"
RECOVERY_SUFFIX = f"_{TestPhase.RECOVERY.name.lower()}"
# Worker name mapping for different backends
WORKER_MAP = {
"vllm": {
......@@ -102,6 +134,13 @@ class Load:
sla: Optional[float] = None
client_type: str = "aiperf" # "aiperf" or "legacy"
max_request_rate: float = 1.0 # Rate limiting for legacy client (requests/sec)
success_threshold: float = 90.0 # Success rate threshold for tests
# For mixed token testing (overflow + recovery)
mixed_token_test: bool = False
overflow_token_length: Optional[int] = None # Tokens for overflow requests
overflow_request_count: int = 15 # Number of overflow requests
normal_request_count: int = 15 # Number of normal requests after overflow
@dataclass
......@@ -113,6 +152,31 @@ class Failure:
replicas: int = 1
@dataclass
class TokenOverflowFailure(Failure):
    """
    Failure type for injecting token overflow (prompt > max_seq_len).

    The pod name and command passed to the base class are fixed to "Client"
    and "token_overflow". ``overflow_token_count`` is derived in ``__init__``
    and is not a declared dataclass field.
    """

    overflow_multiplier: float = 2.0  # How much to exceed max_seq_len (e.g., 2.0 = 2x)
    max_seq_len: int = 1024

    def __init__(
        self,
        time: int,
        max_seq_len: int = 1024,
        overflow_multiplier: float = 2.0,
    ):
        """Create the failure.

        Args:
            time: Forwarded unchanged to ``Failure`` as its ``time`` value
            max_seq_len: Sequence length limit that the overflow exceeds
            overflow_multiplier: Factor applied to max_seq_len
        """
        super().__init__(time=time, pod_name="Client", command="token_overflow")
        self.max_seq_len = max_seq_len
        self.overflow_multiplier = overflow_multiplier
        # Truncated to a whole number of tokens.
        oversized_length = int(max_seq_len * overflow_multiplier)
        self.overflow_token_count = oversized_length
@dataclass
class Scenario:
deployment: DeploymentSpec
......@@ -123,9 +187,17 @@ class Scenario:
# Helper functions to create deployment specs
def _create_deployment_spec(backend, deploy_type, yaml_path):
"""Create a deployment spec with backend information."""
return {"spec": DeploymentSpec(yaml_path), "backend": backend}
def _create_deployment_spec(backend: str, yaml_path: str) -> DeploymentInfo:
    """Build the DeploymentInfo entry for a single deployment YAML file.

    Args:
        backend: Backend type ("vllm", "sglang", or "trtllm")
        yaml_path: Path to the deployment YAML file

    Returns:
        DeploymentInfo dictionary holding the loaded spec and the backend name
    """
    spec = DeploymentSpec(yaml_path)
    return DeploymentInfo(spec=spec, backend=backend)
def _set_replicas(deployment_spec, backend, deploy_type, replicas):
......@@ -171,9 +243,16 @@ def _set_tensor_parallel(deployment_spec, backend, deploy_type, tp_size):
spec[decode_worker].tensor_parallel_size = tp_size
def _create_deployments_for_backend(backend):
"""Create all deployment specifications for a given backend."""
deployments = {}
def _create_deployments_for_backend(backend: str) -> Dict[str, DeploymentInfo]:
"""Create all deployment specifications for a given backend.
Args:
backend: Backend type ("vllm", "sglang", or "trtllm")
Returns:
Dictionary mapping deployment names to DeploymentInfo objects
"""
deployments: Dict[str, DeploymentInfo] = {}
# Define the yaml files for agg and disagg deployments
yaml_files = {
......@@ -210,9 +289,7 @@ def _create_deployments_for_backend(backend):
scenario_name = "-".join(name_parts)
# Create and configure the deployment
deployment = _create_deployment_spec(
backend, deploy_type, yaml_files[deploy_type]
)
deployment = _create_deployment_spec(backend, yaml_files[deploy_type])
if tp_size > 1:
_set_tensor_parallel(deployment, backend, deploy_type, tp_size)
if dp_replicas > 1:
......@@ -223,9 +300,18 @@ def _create_deployments_for_backend(backend):
return deployments
def _create_moe_deployments_for_backend(backend="vllm"):
"""Create MoE-specific deployment configurations for DeepSeek-V2-Lite."""
deployments = {}
def _create_moe_deployments_for_backend(
backend: str = "vllm",
) -> Dict[str, DeploymentInfo]:
"""Create MoE-specific deployment configurations for DeepSeek-V2-Lite.
Args:
backend: Backend type (default: "vllm")
Returns:
Dictionary mapping deployment names to DeploymentInfo objects
"""
deployments: Dict[str, DeploymentInfo] = {}
# Only test tp=1, dp=2 for now
tp_size = 1
......@@ -241,12 +327,12 @@ def _create_moe_deployments_for_backend(backend="vllm"):
for deploy_type in ["agg", "disagg"]:
scenario_name = f"{backend}-moe-{deploy_type}-tp-{tp_size}-dp-{dp_replicas}"
deployment = {
"spec": DeploymentSpec(yaml_files[deploy_type]),
"backend": backend,
"model": "deepseek-ai/DeepSeek-V2-Lite",
"is_moe": True,
}
deployment = DeploymentInfo(
spec=DeploymentSpec(yaml_files[deploy_type]),
backend=backend,
model="deepseek-ai/DeepSeek-V2-Lite",
is_moe=True,
)
deployments[scenario_name] = deployment
......@@ -254,13 +340,13 @@ def _create_moe_deployments_for_backend(backend="vllm"):
# Create all deployment specifications
deployment_specs = {}
deployment_specs.update(_create_deployments_for_backend("vllm"))
deployment_specs.update(_create_deployments_for_backend("sglang"))
deployment_specs.update(_create_deployments_for_backend("trtllm"))
DEPLOYMENT_SPECS: Dict[str, DeploymentInfo] = {}
DEPLOYMENT_SPECS.update(_create_deployments_for_backend("vllm"))
DEPLOYMENT_SPECS.update(_create_deployments_for_backend("sglang"))
DEPLOYMENT_SPECS.update(_create_deployments_for_backend("trtllm"))
# Add MoE deployments for vLLM only
deployment_specs.update(_create_moe_deployments_for_backend("vllm"))
DEPLOYMENT_SPECS.update(_create_moe_deployments_for_backend("vllm"))
# Each failure scenario contains a list of failure injections
......@@ -340,6 +426,7 @@ def create_aiperf_load(
max_retries: int = 3,
sla: Optional[float] = None,
max_request_rate: float = 1.0,
success_threshold: float = 90.0,
) -> Load:
"""Create a Load configuration for AI-Perf client.
......@@ -351,6 +438,7 @@ def create_aiperf_load(
max_retries: Maximum retry attempts - AI-Perf retries entire test (default: 3)
sla: Optional SLA threshold for latency (default: None)
max_request_rate: Rate limiting for requests/sec (default: 1.0)
success_threshold: Success rate threshold for pass/fail (default: 90.0)
Returns:
Load instance configured for AI-Perf client
......@@ -367,6 +455,7 @@ def create_aiperf_load(
sla=sla,
client_type="aiperf",
max_request_rate=max_request_rate,
success_threshold=success_threshold,
)
......@@ -378,6 +467,7 @@ def create_legacy_load(
max_retries: int = 1,
sla: Optional[float] = None,
max_request_rate: float = 1.0,
success_threshold: float = 90.0,
) -> Load:
"""Create a Load configuration for legacy custom client.
......@@ -389,6 +479,7 @@ def create_legacy_load(
max_retries: Maximum retry attempts - legacy retries per request (default: 1)
sla: Optional SLA threshold for latency (default: None)
max_request_rate: Rate limiting for requests/sec (default: 1.0)
success_threshold: Success rate threshold for pass/fail (default: 90.0)
Returns:
Load instance configured for legacy client
......@@ -405,6 +496,7 @@ def create_legacy_load(
sla=sla,
client_type="legacy",
max_request_rate=max_request_rate,
success_threshold=success_threshold,
)
......@@ -439,7 +531,7 @@ for backend in ["vllm", "sglang", "trtllm"]:
backend, "disagg"
)
for deployment_name, deployment_info in deployment_specs.items():
for deployment_name, deployment_info in DEPLOYMENT_SPECS.items():
backend = deployment_info["backend"]
# Check if this is an MoE deployment
......@@ -481,3 +573,140 @@ for deployment_name, deployment_info in deployment_specs.items():
model=scenario_model,
backend=backend,
)
# Add token overflow test scenarios
def add_token_overflow_scenarios():
    """
    Add test scenarios for token overflow (prompt > max_seq_len) failures.

    For each configured backend/deployment pair that exists in
    DEPLOYMENT_SPECS, the sequence length limit is added as a service argument
    to the relevant workers and a Scenario with a mixed overflow + recovery
    load is stored in ``scenarios`` under the scenario name.
    """
    # Common configuration for all tests
    max_seq_len = 1024
    overflow_multiplier = 2.0
    overflow_requests = 15  # Number of oversized requests to send
    normal_requests = 15  # Number of normal requests to send after overflow
    normal_tokens = 512  # Well within max_seq_len

    # (scenario name, DEPLOYMENT_SPECS key, backend)
    overflow_test_configs = [
        # vLLM tests
        ("vllm_agg_token_overflow_2x", "vllm-agg-tp-1-dp-1", "vllm"),
        (
            "vllm_disagg_token_overflow_2x",
            "vllm-disagg-prefill-tp-2-decode-tp-2-dp-1",
            "vllm",
        ),
        # TRT-LLM tests
        ("trtllm_agg_token_overflow_2x", "trtllm-agg-tp-1-dp-1", "trtllm"),
        (
            "trtllm_disagg_token_overflow_2x",
            "trtllm-disagg-prefill-tp-2-decode-tp-2-dp-1",
            "trtllm",
        ),
        # SGLang tests
        ("sglang_agg_token_overflow_2x", "sglang-agg-tp-1-dp-1", "sglang"),
        (
            "sglang_disagg_token_overflow_2x",
            "sglang-disagg-prefill-tp-2-decode-tp-2-dp-1",
            "sglang",
        ),
    ]

    # Backend-specific argument that sets the sequence length limit; any
    # backend not listed here (vllm) falls back to "--max-model-len".
    seq_len_flags = {"trtllm": "--max-seq-len", "sglang": "--context-length"}

    for overflow_scenario_name, deployment_key, backend in overflow_test_configs:
        # Skip if deployment doesn't exist
        if deployment_key not in DEPLOYMENT_SPECS:
            continue

        deployment_info = DEPLOYMENT_SPECS[deployment_key]
        scenario_model = deployment_info.get("model", model)
        # NOTE(review): this is the same object stored in DEPLOYMENT_SPECS; if
        # add_arg_to_service mutates it in place, other scenarios built from
        # the same deployment key would also get the limit - confirm.
        deployment_spec = deployment_info["spec"]

        # If not disaggregated, then it's aggregated
        is_agg = "disagg" not in deployment_key

        # Get the correct decode worker name
        workers = WORKER_MAP[backend]
        decode_key = "decode_agg" if backend == "trtllm" and is_agg else "decode"
        decode_worker = workers[decode_key]
        prefill_worker = workers["prefill"]

        arg_name = seq_len_flags.get(backend, "--max-model-len")

        # Aggregated: decode worker only. Disaggregated: prefill, then decode.
        if is_agg:
            limited_workers = [decode_worker]
        else:
            limited_workers = [prefill_worker, decode_worker]
        for worker_name in limited_workers:
            deployment_spec.add_arg_to_service(worker_name, arg_name, str(max_seq_len))

        # Create overflow failure
        overflow_failure = TokenOverflowFailure(
            time=30,  # Start after 30 seconds
            max_seq_len=max_seq_len,
            overflow_multiplier=overflow_multiplier,
        )

        # Mixed load that tests both rejection and recovery
        mixed_load = Load(
            clients=3,
            # Total requests = overflow + normal
            requests_per_client=overflow_requests + normal_requests,
            input_token_length=normal_tokens,
            output_token_length=50,
            # Mixed token test configuration
            mixed_token_test=True,
            overflow_token_length=int(max_seq_len * overflow_multiplier),
            overflow_request_count=overflow_requests,
            normal_request_count=normal_requests,
        )

        scenarios[overflow_scenario_name] = Scenario(
            deployment=deployment_spec,
            load=mixed_load,
            failures=[overflow_failure],
            model=scenario_model,
            backend=backend,
        )
# Add the token overflow scenarios
add_token_overflow_scenarios()
......@@ -3,6 +3,7 @@
import logging
import multiprocessing
import re
import time
from contextlib import contextmanager
......@@ -10,7 +11,14 @@ import pytest
from tests.fault_tolerance.deploy.client_factory import get_client_function
from tests.fault_tolerance.deploy.parse_factory import parse_test_results
from tests.fault_tolerance.deploy.scenarios import Load, scenarios
from tests.fault_tolerance.deploy.parse_results import process_overflow_recovery_test
from tests.fault_tolerance.deploy.scenarios import (
OVERFLOW_SUFFIX,
RECOVERY_SUFFIX,
Load,
TokenOverflowFailure,
scenarios,
)
from tests.utils.managed_deployment import ManagedDeployment
......@@ -80,6 +88,69 @@ def _clients(
# AI-Perf client uses retry_delay between attempts (default 5s)
retry_delay_or_rate = 5
# Check if this is a mixed token test (overflow + recovery)
# If mixed_token_test is True, run two phases; otherwise run normally
if hasattr(load_config, "mixed_token_test") and load_config.mixed_token_test:
logger.info(
f"Mixed token test: {load_config.overflow_request_count} overflow requests "
f"({load_config.overflow_token_length} tokens) + "
f"{load_config.normal_request_count} normal requests "
f"({load_config.input_token_length} tokens)"
)
# First phase: Send overflow requests
for i in range(load_config.clients):
proc_overflow = ctx.Process(
target=client_func,
args=(
deployment_spec,
namespace,
model,
request.node.name + OVERFLOW_SUFFIX,
i,
load_config.overflow_request_count, # 15 overflow requests
load_config.overflow_token_length, # 2x max_seq_len tokens
load_config.output_token_length,
load_config.max_retries,
retry_delay_or_rate,
),
)
proc_overflow.start()
procs.append(proc_overflow)
logger.debug(f"Started overflow client {i} (PID: {proc_overflow.pid})")
# Wait for overflow requests to complete
for proc in procs:
proc.join()
logger.info("Overflow requests completed. Starting recovery phase...")
# Second phase: Send normal requests to test recovery
procs_recovery = []
for i in range(load_config.clients):
proc_normal = ctx.Process(
target=client_func,
args=(
deployment_spec,
namespace,
model,
request.node.name + RECOVERY_SUFFIX,
i,
load_config.normal_request_count, # 15 normal requests
load_config.input_token_length, # Normal token count
load_config.output_token_length,
load_config.max_retries,
retry_delay_or_rate,
),
)
proc_normal.start()
procs_recovery.append(proc_normal)
logger.debug(f"Started recovery client {i} (PID: {proc_normal.pid})")
# Add recovery processes to main list
procs.extend(procs_recovery)
else:
# Normal test - single phase
for i in range(load_config.clients):
procs.append(
ctx.Process(
......@@ -113,6 +184,13 @@ def _inject_failures(failures, logger, deployment: ManagedDeployment): # noqa:
for failure in failures:
time.sleep(failure.time)
# Handle TokenOverflowFailure differently - it's a client-side injection
if isinstance(failure, TokenOverflowFailure):
# The actual overflow is handled by the client configuration
# which uses the input_token_length from the Load config
# This is just logging for visibility
continue
pods = deployment.get_pods(failure.pod_name)[failure.pod_name]
num_pods = len(pods)
......@@ -155,41 +233,109 @@ def results_table(request, scenario): # noqa: F811
"""
yield
# Determine log paths based on whether this is a mixed token test
log_paths = []
if hasattr(scenario.load, "mixed_token_test") and scenario.load.mixed_token_test:
# For mixed token tests, we have separate overflow and recovery directories
overflow_dir = f"{request.node.name}{OVERFLOW_SUFFIX}"
recovery_dir = f"{request.node.name}{RECOVERY_SUFFIX}"
log_paths = [overflow_dir, recovery_dir]
logging.info("Mixed token test detected. Looking for results in:")
logging.info(f" - Overflow phase: {overflow_dir}")
logging.info(f" - Recovery phase: {recovery_dir}")
else:
# Standard test with single directory
log_paths = [request.node.name]
# Use factory to auto-detect and parse results
try:
parse_test_results(
log_dir=None,
log_paths=[request.node.name],
log_paths=log_paths,
tablefmt="fancy_grid",
sla=scenario.load.sla,
success_threshold=scenario.load.success_threshold,
print_output=True,
# force_parser can be set based on client_type if needed
# force_parser=scenario.load.client_type,
)
except Exception as e:
logging.error(f"Failed to parse results for {request.node.name}: {e}")
except Exception:
logging.exception("Failed to parse results for %s", request.node.name)
global_result_list.append(request.node.name)
# Add all directories to global list for session summary
global_result_list.extend(log_paths)
@pytest.fixture(autouse=True, scope="session")
def results_summary():
    """
    Session summary that processes all tests but only prints paired tests.

    Runs after the session's tests have finished (everything below ``yield``).
    Every directory recorded in ``global_result_list`` is parsed silently, and
    a combined overflow/recovery summary is printed for each test that
    produced both an overflow-phase and a recovery-phase directory.

    Failures are logged with their traceback and never propagate, so one bad
    result directory cannot suppress the summaries of the other tests.
    """
    yield

    if not global_result_list:
        logging.info("No test results to summarize")
        return

    # Step 1: Group directories by base test name, keyed by phase.
    phase_suffixes = (("overflow", OVERFLOW_SUFFIX), ("recovery", RECOVERY_SUFFIX))
    test_groups: dict[str, dict[str, str]] = {}
    for log_path in global_result_list:
        for phase, suffix in phase_suffixes:
            if log_path.endswith(suffix):
                base_name = log_path[: -len(suffix)]
                test_groups.setdefault(base_name, {})[phase] = log_path
                break

    # Step 2: Silently parse all tests to get results (for any downstream
    # processing). Kept in its own try block so that a parse failure here does
    # not prevent the paired summaries below from being printed.
    try:
        parse_test_results(
            log_dir=None,
            log_paths=global_result_list,
            tablefmt="fancy_grid",
            print_output=False,  # Don't print anything
        )
    except Exception:
        logging.exception("Failed to parse combined results")

    # Step 3: Print a combined summary for every complete overflow/recovery pair.
    for base_name, paths in test_groups.items():
        if "overflow" not in paths or "recovery" not in paths:
            continue

        try:
            # Extract scenario from test name to pass configs
            scenario_obj = None
            match = re.search(r"\[(.*)\]", base_name)
            if match:
                scenario_name = match.group(1)
                if scenario_name in scenarios:
                    scenario_obj = scenarios[scenario_name]
                    logging.info(
                        f"Found scenario '{scenario_name}' for combined results."
                    )

            if scenario_obj is None:
                logging.warning(
                    f"Could not find scenario for '{base_name}'. Using default thresholds."
                )

            success_threshold = (
                scenario_obj.load.success_threshold if scenario_obj else 90.0
            )
            logging.info(
                f"Using success_threshold: {success_threshold} for combined summary of '{base_name}'"
            )

            # This function will print the combined summary
            process_overflow_recovery_test(
                overflow_path=paths["overflow"],
                recovery_path=paths["recovery"],
                tablefmt="fancy_grid",
                sla=scenario_obj.load.sla if scenario_obj else None,
                success_threshold=success_threshold,
            )
        except Exception:
            # Isolate failures per test so the remaining pairs are still reported.
            logging.exception(
                "Failed to summarize overflow/recovery results for %s", base_name
            )
......
......@@ -316,6 +316,59 @@ class DeploymentSpec:
def spec(self):
return self._deployment_spec
def add_arg_to_service(self, service_name: str, arg_name: str, arg_value: str):
    """
    Add or override a command-line argument for a specific service.

    The argument is written into ``extraPodSpec.mainContainer.args`` of the
    service; missing (or null) intermediate keys are created. If the argument
    is already present its value is replaced, whether it is spelled as two
    tokens (``--name value``) or as one (``--name=value``); otherwise the
    argument and its value are appended.

    Args:
        service_name: Name of the service (e.g., "VllmDecodeWorker", "TRTLLMWorker")
        arg_name: Argument name (e.g., "--max-model-len", "--max-seq-len")
        arg_value: Argument value (e.g., "1024")

    Raises:
        ValueError: If ``service_name`` is not a service of the deployment spec.
    """

    def _is_flag(token) -> bool:
        # A token is another option when it starts with "-" and is not a
        # negative number (e.g. "-1" is a value, "--foo" is a flag).
        if not isinstance(token, str) or not token.startswith("-"):
            return False
        try:
            float(token)
        except ValueError:
            return True
        return False

    # Get the service
    services = self._deployment_spec["spec"]["services"]
    if service_name not in services:
        raise ValueError(f"Service '{service_name}' not found in deployment spec")
    service = services[service_name]

    # Ensure the nesting exists; a key that is present but null counts as missing.
    pod_spec = service.get("extraPodSpec")
    if pod_spec is None:
        pod_spec = service["extraPodSpec"] = {}
    container = pod_spec.get("mainContainer")
    if container is None:
        container = pod_spec["mainContainer"] = {}

    args_list = container.get("args")
    if args_list is None:
        args_list = []
    elif isinstance(args_list, str):
        # Convert to list if needed (sometimes it's a single string)
        import shlex

        args_list = shlex.split(args_list)
    container["args"] = args_list

    inline_prefix = f"{arg_name}="
    for index, token in enumerate(args_list):
        if token == arg_name:
            value_index = index + 1
            if value_index < len(args_list) and not _is_flag(args_list[value_index]):
                # Has a value, replace it
                args_list[value_index] = arg_value
            else:
                # No value after the argument, insert the value
                args_list.insert(value_index, arg_value)
            return
        if isinstance(token, str) and token.startswith(inline_prefix):
            # "--name=value" spelling: rewrite in place instead of appending
            # a second, conflicting occurrence of the same flag.
            args_list[index] = f"{inline_prefix}{arg_value}"
            return

    # Add new argument
    args_list.extend([arg_name, arg_value])
def save(self, out_file: str):
"""Save updated deployment to file"""
with open(out_file, "w") as f:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment