Commit ec50db06 (unverified), authored by Tzu-Ling Kan, committed by GitHub

feat: Add sglang k8 FT tests (#3227)


Signed-off-by: tzulingk@nvidia.com <tzulingk@nvidia.com>
parent d4f0d2bc
......@@ -71,15 +71,16 @@ The test suite is organized around three core components: **Deployments**, **Cli
Deployments represent specific graphs that are deployed using the Dynamo Kubernetes Platform.
The following deployment configurations are defined in `scenarios.py`:
Below are some representative examples of the generated scenarios:
| Deployment Name | Description |
|-------------------------|-----------------------------------------------------------------------------|
| `agg-tp-1-dp-1` | Aggregated worker with 1 replica for each service (frontend, decode). |
| `agg-tp-1-dp-2` | Aggregated worker with 2 replicas for each service (frontend, decode). |
| `disagg-tp-1-dp-1` | Disaggregated deployment with 1 replica for each service (frontend, decode, prefill). |
| `disagg-tp-1-dp-2` | Disaggregated deployment with 2 replicas for each service (frontend, decode, prefill). |
| Example Scenario Name | Backend | Type | TP | DP | Description |
|-----------------------------------------------|---------|--------|----|----|---------------------------------------------------------|
| `vllm-agg-tp-1-dp-1` | vllm | agg | 1 | 1 | Basic aggregated worker. |
| `vllm-agg-tp-1-dp-2` | vllm | agg | 1 | 2 | Aggregated worker with Data Parallelism. |
| `sglang-agg-tp-4-dp-1` | sglang | agg | 4 | 1 | Aggregated SGLang worker with Tensor Parallelism. |
| `sglang-disagg-prefill-tp-2-decode-tp-2-dp-1` | sglang | disagg | 2 | 1 | Disaggregated SGLang workers with Tensor Parallelism. |
The full test matrix is generated from these parameters: every backend is combined with every deployment type and TP/DP configuration, and each resulting deployment is paired with the failures that apply to it.
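The naming scheme behind the table can be sketched as follows. This is a minimal, standalone illustration that mirrors the configuration list and name format used in `scenarios.py`; the helper names `deployment_name` and `deployment_names` are hypothetical and exist only for this example.

```python
from itertools import product

# TP/DP combinations, mirroring the list in scenarios.py
CONFIGURATIONS = [
    {"tp": 1, "dp": 1},
    {"tp": 1, "dp": 2},
    {"tp": 2, "dp": 1},
    {"tp": 4, "dp": 1},
]


def deployment_name(backend, deploy_type, tp, dp):
    """Build a deployment name such as 'sglang-agg-tp-4-dp-1' (hypothetical helper)."""
    parts = [backend, deploy_type]
    if deploy_type == "agg":
        parts.append(f"tp-{tp}")
    else:
        # Disaggregated deployments name the prefill and decode TP separately
        parts.append(f"prefill-tp-{tp}-decode-tp-{tp}")
    parts.append(f"dp-{dp}")
    return "-".join(parts)


def deployment_names(backends=("vllm", "sglang")):
    """List every deployment name produced by the matrix (hypothetical helper)."""
    names = []
    for backend, deploy_type in product(backends, ("agg", "disagg")):
        for cfg in CONFIGURATIONS:
            # Disagg with both TP > 1 and DP > 1 is skipped as an uncommon case
            if deploy_type == "disagg" and cfg["tp"] > 1 and cfg["dp"] > 1:
                continue
            names.append(deployment_name(backend, deploy_type, cfg["tp"], cfg["dp"]))
    return names


if __name__ == "__main__":
    for name in deployment_names():
        print(name)
```

With the four configurations listed, none triggers the skip rule, so each backend contributes eight deployment names.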
#### Client Load
......@@ -95,26 +96,30 @@ sending signals to specified processes.
The following failure types are defined in `scenarios.py`:
| Failure Name | Description | Injection Method |
|--------------------------|-----------------------------------------------------------------------------|--------------------------------------------|
| `none` | No failure injection. | N/A |
| `frontend` | Terminate frontend process/pod. | `SIGINT` signal to `dynamo.frontend`. |
| `frontend_pod` | Delete frontend pod. | Kubernetes API pod deletion. |
| `decode_worker` | Terminate decode worker process/pod. | `SIGINT` signal to `dynamo.vllm` |
| `decode_worker_pod` | Delete decode worker pod. | Kubernetes API pod deletion. |
| `prefill_worker` | Terminate prefill worker process/pod. | `SIGINT` signal to `dynamo.vllm` |
| `prefill_worker_pod` | Delete prefill worker pod. | Kubernetes API pod deletion. |
| `vllm_decode_engine_core`| Terminate VLLM decode engine core process. | `SIGKILL` signal to `VLLM::EngineCore` |
| `vllm_prefill_engine_core`| Terminate VLLM prefill engine core process. | `SIGKILL` signal to `VLLM::EngineCore` |
| Failure Name | Description | Injection Method | Applicable Backends |
|-------------------------------|----------------------------------------------------|-------------------------------|---------------------|
| `none` | No failure injection (baseline). | N/A | All |
| `frontend` | Terminate frontend process. | `SIGINT` to `dynamo.frontend` | All |
| `frontend_pod` | Delete frontend pod. | Kubernetes API pod deletion | All |
| `decode_worker` | Terminate decode worker process. | `SIGKILL` to `dynamo.<backend>` | All |
| `decode_worker_pod` | Delete decode worker pod. | Kubernetes API pod deletion | All |
| `prefill_worker` | Terminate prefill worker process. | `SIGKILL` to `dynamo.<backend>` | All |
| `prefill_worker_pod` | Delete prefill worker pod. | Kubernetes API pod deletion | All |
| `vllm_decode_engine_core` | Terminate VLLM decode engine core process. | `SIGKILL` to `VLLM::EngineCore` | vllm only |
| `vllm_prefill_engine_core` | Terminate VLLM prefill engine core process. | `SIGKILL` to `VLLM::EngineCore` | vllm only |
| `sglang_decode_scheduler` | Terminate SGLang decode scheduler process. | `SIGKILL` to `sglang::scheduler`| sglang only |
| `sglang_decode_detokenizer` | Terminate SGLang decode detokenizer process. | `SIGKILL` to `sglang::detokenizer`| sglang only |
| `sglang_prefill_scheduler` | Terminate SGLang prefill scheduler process. | `SIGKILL` to `sglang::scheduler`| sglang only |
| `sglang_prefill_detokenizer` | Terminate SGLang prefill detokenizer process. | `SIGKILL` to `sglang::detokenizer`| sglang only |
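The "Applicable Backends" column, together with the rule that prefill failures apply only to disaggregated deployments, can be sketched as a small filter. This is an illustration only: `COMMON_FAILURES`, `BACKEND_FAILURES`, and `applicable_failures` are hypothetical names, and the actual selection logic lives in `scenarios.py`.

```python
# Failure names shared by all backends (taken from the table)
COMMON_FAILURES = [
    "none",
    "frontend",
    "frontend_pod",
    "decode_worker",
    "decode_worker_pod",
    "prefill_worker",
    "prefill_worker_pod",
]

# Backend-specific failure names (taken from the table)
BACKEND_FAILURES = {
    "vllm": ["vllm_decode_engine_core", "vllm_prefill_engine_core"],
    "sglang": [
        "sglang_decode_scheduler",
        "sglang_decode_detokenizer",
        "sglang_prefill_scheduler",
        "sglang_prefill_detokenizer",
    ],
}


def applicable_failures(backend, deployment_name):
    """Return the failure names that apply to one deployment (hypothetical helper)."""
    if backend not in BACKEND_FAILURES:
        raise ValueError(f"Unsupported backend: {backend}")
    candidates = COMMON_FAILURES + BACKEND_FAILURES[backend]
    # Aggregated deployments have no prefill worker, so prefill failures are skipped
    return [
        name
        for name in candidates
        if not ("prefill" in name and "disagg" not in deployment_name)
    ]


if __name__ == "__main__":
    print(applicable_failures("sglang", "sglang-agg-tp-2-dp-1"))
```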
#### Example Scenario Breakdown
**Scenario**: `agg-tp-2-dp-1-decode_worker`
**Scenario**: `sglang-agg-tp-2-dp-1-decode_worker`
- **Deployment**: Aggregation with 1 decoder worker replica (`agg-tp-2-dp-1`).
- **Backend**: `sglang`
- **Deployment**: Aggregated deployment with 1 decode worker replica, using 2 GPUs for tensor parallelism (`sglang-agg-tp-2-dp-1`).
- **Client Load**: 10 clients, 100 requests each, max request rate 1/sec.
- **Failure**: Terminates 1 decoder worker process 10 seconds into the test.
- **Failure**: Terminates 1 decode worker process 30 seconds into the test.
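
A scenario name can be split back into its parts, which may help when filtering tests by name. The sketch below assumes that failure names never contain hyphens (true for every failure listed in the table above) and that the backend is the first hyphen-separated token; `split_scenario_name` is a hypothetical helper, not part of the test suite.

```python
def split_scenario_name(scenario_name):
    """Split '<deployment>-<failure>' into its components (hypothetical helper).

    Assumes failure names use underscores only, so the last hyphen
    separates the deployment name from the failure name.
    """
    deployment, failure = scenario_name.rsplit("-", 1)
    tokens = deployment.split("-")
    return {
        "backend": tokens[0],
        "deploy_type": tokens[1],
        "deployment": deployment,
        "failure": failure,
    }


if __name__ == "__main__":
    print(split_scenario_name("sglang-agg-tp-2-dp-1-decode_worker"))
```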
#### Example Scenario Execution:
......
......@@ -18,6 +18,17 @@ from typing import Optional
from tests.utils.managed_deployment import DeploymentSpec
WORKER_MAP = {
"vllm": {
"decode": "VllmDecodeWorker",
"prefill": "VllmPrefillWorker",
},
"sglang": {
"decode": "decode",
"prefill": "prefill",
},
}
@dataclass
class Load:
......@@ -45,66 +56,104 @@ class Scenario:
load: Load
failures: list[Failure]
model: Optional[str] = None
backend: str = "vllm" # Backend type for tracking
# Helper functions to create deployment specs
def _create_deployment_spec(backend, deploy_type, yaml_path):
"""Create a deployment spec with backend information."""
return {"spec": DeploymentSpec(yaml_path), "backend": backend}
def _set_replicas(deployment_spec, backend, deploy_type, replicas):
"""Set replicas for all components in a deployment based on backend type."""
spec = deployment_spec["spec"]
# Frontend is common for all backends
spec["Frontend"].replicas = replicas
if backend in WORKER_MAP:
# always scale decode
spec[WORKER_MAP[backend]["decode"]].replicas = replicas
# scale prefill only for disagg
if deploy_type == "disagg":
spec[WORKER_MAP[backend]["prefill"]].replicas = replicas
def _set_tensor_parallel(deployment_spec, backend, deploy_type, tp_size):
"""Set tensor parallel size for worker components."""
spec = deployment_spec["spec"]
if backend in WORKER_MAP:
decode_worker = WORKER_MAP[backend]["decode"]
prefill_worker = WORKER_MAP[backend]["prefill"]
if deploy_type == "agg":
if hasattr(spec, "set_tensor_parallel"):
spec.set_tensor_parallel(tp_size, [decode_worker])
else:
spec[decode_worker].tensor_parallel_size = tp_size
elif deploy_type == "disagg":
spec[prefill_worker].tensor_parallel_size = tp_size
spec[decode_worker].tensor_parallel_size = tp_size
def _create_deployments_for_backend(backend):
"""Create all deployment specifications for a given backend."""
deployments = {}
# Define the yaml files for agg and disagg deployments
yaml_files = {
"agg": f"components/backends/{backend}/deploy/agg.yaml",
"disagg": f"components/backends/{backend}/deploy/disagg.yaml",
}
# Define the different configurations to test
configurations = [
{"tp": 1, "dp": 1},
{"tp": 1, "dp": 2},
{"tp": 2, "dp": 1},
{"tp": 4, "dp": 1},
]
for deploy_type in ["agg", "disagg"]:
for config in configurations:
tp_size = config["tp"]
dp_replicas = config["dp"]
# Skip creating disagg scenarios for TP > 1 if DP is also > 1 (uncommon case)
if deploy_type == "disagg" and tp_size > 1 and dp_replicas > 1:
continue
# Construct the scenario name
name_parts = [backend, deploy_type]
# Each Deployment Spec contains
# the dynamo deployment configuration
if deploy_type == "agg":
name_parts.append(f"tp-{tp_size}")
elif deploy_type == "disagg":
name_parts.append(f"prefill-tp-{tp_size}-decode-tp-{tp_size}")
name_parts.append(f"dp-{dp_replicas}")
scenario_name = "-".join(name_parts)
# Create and configure the deployment
deployment = _create_deployment_spec(
backend, deploy_type, yaml_files[deploy_type]
)
if tp_size > 1:
_set_tensor_parallel(deployment, backend, deploy_type, tp_size)
if dp_replicas > 1:
_set_replicas(deployment, backend, deploy_type, dp_replicas)
deployments[scenario_name] = deployment
return deployments
deployment_specs = {
"agg-tp-1-dp-1": (
DeploymentSpec("/workspace/components/backends/vllm/deploy/agg.yaml")
),
"disagg-tp-1-dp-1": (
DeploymentSpec("/workspace/components/backends/vllm/deploy/disagg.yaml")
),
}
# TP-2 scenarios
deployment_specs["agg-tp-2-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/agg.yaml"
)
deployment_specs["agg-tp-2-dp-1"].set_tensor_parallel(2, ["VllmDecodeWorker"])
deployment_specs["disagg-prefill-tp-2-decode-tp-2-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/disagg.yaml"
)
deployment_specs["disagg-prefill-tp-2-decode-tp-2-dp-1"][
"VllmPrefillWorker"
].tensor_parallel_size = 2
deployment_specs["disagg-prefill-tp-2-decode-tp-2-dp-1"][
"VllmDecodeWorker"
].tensor_parallel_size = 2
# TP-4 scenarios
deployment_specs["agg-tp-4-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/agg.yaml"
)
deployment_specs["agg-tp-4-dp-1"].set_tensor_parallel(4, ["VllmDecodeWorker"])
deployment_specs["disagg-prefill-tp-4-decode-tp-4-dp-1"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/disagg.yaml"
)
deployment_specs["disagg-prefill-tp-4-decode-tp-4-dp-1"][
"VllmPrefillWorker"
].tensor_parallel_size = 4
deployment_specs["disagg-prefill-tp-4-decode-tp-4-dp-1"][
"VllmDecodeWorker"
].tensor_parallel_size = 4
# Derivative Specs With Incremented Replicas
deployment_specs["agg-tp-1-dp-2"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/agg.yaml"
)
deployment_specs["agg-tp-1-dp-2"]["Frontend"].replicas = 2
deployment_specs["agg-tp-1-dp-2"]["VllmDecodeWorker"].replicas = 2
deployment_specs["disagg-tp-1-dp-2"] = DeploymentSpec(
"/workspace/components/backends/vllm/deploy/disagg.yaml"
)
deployment_specs["disagg-tp-1-dp-2"]["Frontend"].replicas = 2
deployment_specs["disagg-tp-1-dp-2"]["VllmDecodeWorker"].replicas = 2
deployment_specs["disagg-tp-1-dp-2"]["VllmPrefillWorker"].replicas = 2
# Create all deployment specifications
deployment_specs = {}
deployment_specs.update(_create_deployments_for_backend("vllm"))
deployment_specs.update(_create_deployments_for_backend("sglang"))
# Each failure scenario contains a list of failure injections
......@@ -114,25 +163,49 @@ deployment_specs["disagg-tp-1-dp-2"]["VllmPrefillWorker"].replicas = 2
#
# Example:
#
# "prefill_worker": [[30, [("dynamo_prefillworker", 1)]]],
# "prefill_worker": [Failure(30, "VllmPrefillWorker", "dynamo.vllm", "SIGKILL")],
#
# terminates 1 prefill worker after 30 seconds
failures = {
def _create_backend_failures(backend):
"""Generate backend-specific failure scenarios."""
workers = WORKER_MAP[backend]
decode_worker = workers["decode"]
prefill_worker = workers["prefill"]
process_name = f"dynamo.{backend}"
failures = {
"frontend": [Failure(30, "Frontend", "dynamo.frontend")],
"frontend_pod": [Failure(30, "Frontend", "delete_pod")],
"decode_worker": [Failure(30, "VllmDecodeWorker", "dynamo.vllm", "SIGKILL")],
"decode_worker_pod": [Failure(30, "VllmDecodeWorker", "delete_pod")],
"prefill_worker": [Failure(30, "VllmPrefillWorker", "dynamo.vllm", "SIGKILL")],
"prefill_worker_pod": [Failure(30, "VllmPrefillWorker", "delete_pod")],
"vllm_decode_engine_core": [
Failure(30, "VllmDecodeWorker", "VLLM::EngineCore", "SIGKILL")
],
"vllm_prefill_engine_core": [
Failure(30, "VllmPrefillWorker", "VLLM::EngineCore", "SIGKILL")
],
"decode_worker": [Failure(30, decode_worker, process_name, "SIGKILL")],
"decode_worker_pod": [Failure(30, decode_worker, "delete_pod")],
"prefill_worker": [Failure(30, prefill_worker, process_name, "SIGKILL")],
"prefill_worker_pod": [Failure(30, prefill_worker, "delete_pod")],
"none": [],
}
}
if backend == "vllm":
failures["vllm_decode_engine_core"] = [
Failure(30, decode_worker, "VLLM::EngineCore", "SIGKILL")
]
failures["vllm_prefill_engine_core"] = [
Failure(30, prefill_worker, "VLLM::EngineCore", "SIGKILL")
]
elif backend == "sglang":
failures["sglang_decode_scheduler"] = [
Failure(30, decode_worker, "sglang::scheduler", "SIGKILL")
]
failures["sglang_decode_detokenizer"] = [
Failure(30, decode_worker, "sglang::detokenizer", "SIGKILL")
]
failures["sglang_prefill_scheduler"] = [
Failure(30, prefill_worker, "sglang::scheduler", "SIGKILL")
]
failures["sglang_prefill_detokenizer"] = [
Failure(30, prefill_worker, "sglang::detokenizer", "SIGKILL")
]
return failures
load = Load()
......@@ -144,10 +217,34 @@ model = None
scenarios = {}
for deployment_name, deployment_spec in deployment_specs.items():
for failure_name, failure in failures.items():
# Map of backend to failure definitions
backend_failure_map = {
"vllm": _create_backend_failures("vllm"),
"sglang": _create_backend_failures("sglang"),
}
for deployment_name, deployment_info in deployment_specs.items():
backend = deployment_info["backend"]
# Validate backend
if backend not in backend_failure_map:
raise ValueError(
f"Unsupported backend: {backend}. Supported backends are: {list(backend_failure_map.keys())}"
)
# Get the appropriate failure set for this backend
failure_set = backend_failure_map[backend]
for failure_name, failure in failure_set.items():
# Skip prefill failures for aggregated deployments
if "prefill" in failure_name and "disagg" not in deployment_name:
continue
scenarios[f"{deployment_name}-{failure_name}"] = Scenario(
deployment=deployment_spec, load=load, failures=failure, model=model
scenario_name = f"{deployment_name}-{failure_name}"
scenarios[scenario_name] = Scenario(
deployment=deployment_info["spec"],
load=load,
failures=failure,
model=model,
backend=backend,
)
......@@ -147,7 +147,18 @@ async def test_fault_scenario(
scenario.deployment.set_model(scenario.model)
model = scenario.model
else:
# Get model from the appropriate worker based on backend
try:
if scenario.backend == "vllm":
model = scenario.deployment["VllmDecodeWorker"].model
elif scenario.backend == "sglang":
model = scenario.deployment["decode"].model
else:
model = None
except (KeyError, AttributeError):
model = None
# Fallback to default if still None
model = model or "Qwen/Qwen3-0.6B"
scenario.deployment.set_logging(True, "info")
......
......@@ -7,8 +7,8 @@ import os
import re
import shlex
import time
from dataclasses import dataclass
from typing import Any, Optional
from dataclasses import dataclass, field
from typing import Any, List, Optional
import kr8s
import kubernetes
......@@ -59,7 +59,7 @@ class ServiceSpec:
@property
def model(self) -> Optional[str]:
"""Model being served by this service"""
"""Model being served by this service (checks both --model and --model-path)"""
try:
args_list = self._spec["extraPodSpec"]["mainContainer"]["args"]
except KeyError:
......@@ -67,7 +67,7 @@ class ServiceSpec:
args_str = " ".join(args_list)
parts = shlex.split(args_str)
for i, part in enumerate(parts):
if part == "--model":
if part in ["--model", "--model-path"]:
return parts[i + 1] if i + 1 < len(parts) else None
return None
......@@ -82,9 +82,10 @@ class ServiceSpec:
args_str = " ".join(args_list)
parts = shlex.split(args_str)
# Update whichever of --model or --model-path appears first in the args
model_index = None
for i, part in enumerate(parts):
if part == "--model":
if part in ["--model", "--model-path"]:
model_index = i
break
......@@ -360,6 +361,7 @@ class ManagedDeployment:
_port_forward: Optional[Any] = None
_deployment_name: Optional[str] = None
_apps_v1: Optional[Any] = None
_active_port_forwards: List[Any] = field(default_factory=list)
def __post_init__(self):
self._deployment_name = self.deployment_spec.name
......@@ -673,40 +675,100 @@ class ManagedDeployment:
raise
def port_forward(self, pod, remote_port, max_connection_attempts=3):
"""Attempt to connect to a pod and return the port-forward object on success."""
"""Attempt to connect to a pod and return the port-forward object on success.
Note: Port forwards run in background threads. When pods are terminated,
the async cleanup may fail, which is expected and can be safely ignored.
"""
try:
# Create port forward - this runs in a background thread
# Use 127.0.0.1 (localhost) instead of 0.0.0.0 to prevent port conflicts
port_forward = pod.portforward(
remote_port=remote_port,
local_port=0,
address="0.0.0.0",
local_port=0, # Auto-assign an available port
address="127.0.0.1", # Use localhost for better isolation and conflict prevention
)
port_forward.start()
for _ in range(max_connection_attempts):
# Try to connect with exponential backoff
backoff_delay = 0.5 # Start with 500ms
for attempt in range(max_connection_attempts):
time.sleep(backoff_delay)
backoff_delay = min(
backoff_delay * 1.5, 5.0
) # Grow delay by 1.5x per attempt, max 5 seconds
# Check if port is assigned
if port_forward.local_port == 0:
time.sleep(1)
self._logger.debug(
f"Port not yet assigned for pod {pod.name} (attempt {attempt+1}/{max_connection_attempts})"
)
continue
# Try to connect to the port forwarded service
test_url = f"http://localhost:{port_forward.local_port}/"
try:
# Send HEAD request to test connection
response = requests.head(test_url, timeout=5)
if response.status_code in (200, 404): # 404 is acceptable
self._active_port_forwards.append(port_forward)
return port_forward
except (requests.ConnectionError, requests.Timeout) as e:
self._logger.warning(f"Connection test failed for pod {pod.name}: {e}")
self._logger.warning(
f"Connection test failed for pod {pod.name} (attempt {attempt+1}/{max_connection_attempts}): {e}"
)
# Retry port-forward
# Restart port-forward for next attempt (except on last attempt)
if attempt == max_connection_attempts - 1:
continue
try:
port_forward.stop()
port_forward.start()
time.sleep(1)
except Exception as e:
self._logger.debug(
f"Error restarting port forward for pod {pod.name}: {e}"
)
break
# All attempts failed
self._logger.warning(
f"Port forward failed after {max_connection_attempts} attempts for pod {pod.name}"
)
try:
port_forward.stop()
except Exception:
pass # Ignore errors during cleanup
return None
except Exception as e:
self._logger.warning(
f"Failed to create port forward for pod {pod.name}: {e}"
)
return None
async def _cleanup(self):
try:
# Collect logs/metrics first; any PFs opened here will be tracked and stopped below.
self._get_service_logs()
self._logger.info(
f"Cleaning up {len(self._active_port_forwards)} active port forwards"
)
for port_forward in self._active_port_forwards:
try:
port_forward.stop()
except RuntimeError as e:
# Expected error when pod is terminated:
# "anext(): asynchronous generator is already running"
if "anext()" in str(e) or "already running" in str(e):
self._logger.debug(f"Port forward cleanup: {e}")
else:
self._logger.warning(
f"Unexpected error stopping port forward: {e}"
)
except Exception as e:
self._logger.debug(f"Error stopping port forward: {e}")
self._active_port_forwards.clear()
finally:
await self._delete_deployment()
......