Unverified Commit ab5a31b5 authored by Alec's avatar Alec Committed by GitHub
Browse files

test(planner): isolate planner-family suites [DYN-2534] (#7723)

parent cc22114d
......@@ -37,7 +37,7 @@ spec:
memory: "100Gi"
extraPodSpec:
mainContainer:
image: my-registry/vllm-runtime:my-tag
image: my-registry/dynamo-frontend:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- /bin/sh
......
......@@ -37,7 +37,7 @@ spec:
memory: "100Gi"
extraPodSpec:
mainContainer:
image: my-registry/vllm-runtime:my-tag
image: my-registry/dynamo-frontend:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- /bin/sh
......
......@@ -40,7 +40,7 @@ spec:
memory: "100Gi"
extraPodSpec:
mainContainer:
image: my-registry/vllm-runtime:my-tag
image: my-registry/dynamo-frontend:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- /bin/sh
......@@ -72,17 +72,17 @@ spec:
failureThreshold: 10
extraPodSpec:
mainContainer:
image: my-registry/vllm-runtime:my-tag
image: my-registry/dynamo-planner:my-tag
ports:
- name: metrics
containerPort: 9085
command:
- python3
- -m
- dynamo.planner
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment": "kubernetes", "backend": "vllm", "ttft": 200, "itl": 10, "profile_results_dir": "/workspace/tests/planner/profiling_results/H200_TP1P_TP1D/", "throughput_adjustment_interval": 60, "metric_reporting_prometheus_port": 9085, "no_correction": true}'
- '{"environment": "kubernetes", "backend": "vllm", "ttft": 200, "itl": 10, "profile_results_dir": "/workspace/components/src/dynamo/planner/tests/data/profiling_results/H200_TP1P_TP1D/", "throughput_adjustment_interval": 60, "metric_reporting_prometheus_port": 9085, "no_correction": true}'
VllmDecodeWorker:
envFromSecret: hf-token-secret
componentType: worker
......
......@@ -37,7 +37,7 @@ spec:
memory: "100Gi"
extraPodSpec:
mainContainer:
image: my-registry/vllm-runtime:my-tag
image: my-registry/dynamo-frontend:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- /bin/sh
......
......@@ -17,25 +17,25 @@ spec:
app: vllm-runtime-image-cache
spec:
imagePullSecrets:
- name: nvcr-imagepullsecret
- name: nvcr-imagepullsecret
containers:
- name: image-cache
image: my-registry/vllm-runtime:my-tag
command:
- /bin/sh
- -c
- "sleep infinity"
resources:
requests:
cpu: "10m"
memory: "64Mi"
limits:
cpu: "100m"
memory: "128Mi"
- name: image-cache
image: my-registry/vllm-runtime:my-tag
command:
- /bin/sh
- -c
- "sleep infinity"
resources:
requests:
cpu: "10m"
memory: "64Mi"
limits:
cpu: "100m"
memory: "128Mi"
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
......@@ -12,7 +12,7 @@ spec:
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
image: nvcr.io/nvidia/ai-dynamo/dynamo-frontend:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- python3
......@@ -26,11 +26,11 @@ spec:
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
image: nvcr.io/nvidia/ai-dynamo/dynamo-planner:my-tag
command:
- python3
- -m
- dynamo.planner
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment": "kubernetes", "backend": "vllm", "enable_load_scaling": true, "enable_throughput_scaling": false, "pre_deployment_sweeping_mode": "none", "load_adjustment_interval": 5, "load_min_observations": 5}'
......
......@@ -12,20 +12,20 @@ spec:
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
image: nvcr.io/nvidia/ai-dynamo/dynamo-frontend:my-tag
Planner:
componentType: planner
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
image: nvcr.io/nvidia/ai-dynamo/dynamo-planner:my-tag
command:
- python3
- -m
- dynamo.planner
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment": "kubernetes", "backend": "vllm", "throughput_adjustment_interval": 60, "profile_results_dir": "/workspace/tests/planner/profiling_results/H200_TP1P_TP1D", "no_correction": true}'
- '{"environment": "kubernetes", "backend": "vllm", "throughput_adjustment_interval": 60, "profile_results_dir": "/workspace/components/src/dynamo/planner/tests/data/profiling_results/H200_TP1P_TP1D", "no_correction": true}'
VllmDecodeWorker:
envFromSecret: hf-token-secret
componentType: worker
......
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Run SLA planner scaling end-to-end test
# This script:
# 1. Deploys the disaggregated planner if not already running
# 2. Sets up port forwarding to localhost:8000
# 3. Waits for the deployment to be ready
# 4. Runs the scaling test (8 req/s -> 18 req/s)
# 5. Cleans up
#
# Supports two modes:
# --mode throughput (default) Uses throughput-based planner
# --mode load Uses load-based planner with regression scaling
set -e
# Configuration
NAMESPACE=${NAMESPACE:-default}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_FILE="$SCRIPT_DIR/../test_scaling_e2e.py"
TEST_FILE="$SCRIPT_DIR/scaling_e2e.py"
FRONTEND_PORT=8000
LOCAL_PORT=8000
LOCAL_PORT=""
DEPLOYMENT_NAME="vllm-disagg-planner"
SAVE_RESULTS=false
MODE="throughput"
DEPLOYED_BY_US=false
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
NC='\033[0m'
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
......@@ -49,7 +37,21 @@ log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Check prerequisites
# Print an unused local TCP port number on stdout.
#
# A short Python snippet binds 127.0.0.1 on port 0, which lets the kernel pick
# an ephemeral port, and prints the port that was assigned. python3 is used
# when it is on PATH; otherwise the function falls back to `python`.
#
# NOTE: the socket is closed before this function returns, so the port is only
# "very likely free" by the time the caller binds it.
find_free_local_port() {
    local python_cmd="python3"
    if ! command -v python3 &> /dev/null; then
        python_cmd="python"
    fi

    # The heredoc body is Python source: it must start at column 0 and the
    # statements under `with` must be indented, independent of shell indentation.
    "$python_cmd" - <<'PY'
import socket

with socket.socket() as sock:
    sock.bind(("127.0.0.1", 0))
    print(sock.getsockname()[1])
PY
}
check_prerequisites() {
log_info "Checking prerequisites..."
......@@ -68,48 +70,40 @@ check_prerequisites() {
exit 1
fi
# Check for aiperf
if ! command -v aiperf &> /dev/null; then
log_error "aiperf not found. This tool is required for load generation."
log_error "Please install the required dependencies by following the instructions in tests/planner/README.md"
log_error "Follow components/src/dynamo/planner/tests/manual/README.md for setup."
exit 1
fi
log_success "Prerequisites check passed"
}
# Check if deployment already exists and is running
check_existing_deployment() {
log_info "Checking for existing deployment..."
# Check for the DynamoGraphDeployment custom resource
if kubectl get dynamographdeployment "$DEPLOYMENT_NAME" -n "$NAMESPACE" &> /dev/null; then
log_info "DynamoGraphDeployment $DEPLOYMENT_NAME already exists - skipping redeployment"
# Check if the DynamoGraphDeployment is ready
local status
status=$(kubectl get dynamographdeployment "$DEPLOYMENT_NAME" -n "$NAMESPACE" -o jsonpath='{.status.state}')
if [ "$status" = "successful" ]; then
# Check if frontend pod is running
# Note: operator automatically prefixes k8s namespace to dynamo-namespace
if kubectl get pods -n "$NAMESPACE" -l "nvidia.com/dynamo-component-type=frontend,nvidia.com/dynamo-namespace=${NAMESPACE}-vllm-disagg-planner" --field-selector=status.phase=Running | grep -q .; then
log_success "Existing deployment is ready"
return 0
else
log_warning "Existing deployment pods are not ready, will redeploy"
return 1
fi
else
log_warning "Existing deployment is not ready (status: $status), will redeploy"
log_warning "Existing deployment pods are not ready, will redeploy"
return 1
fi
else
log_info "No existing deployment found"
log_warning "Existing deployment is not ready (status: $status), will redeploy"
return 1
fi
log_info "No existing deployment found"
return 1
}
# Deploy the planner
deploy_planner() {
log_info "Deploying SLA planner..."
......@@ -118,55 +112,28 @@ deploy_planner() {
exit 1
fi
# Apply the deployment
if kubectl apply -f "$YAML_FILE" -n "$NAMESPACE"; then
log_success "Deployment applied successfully"
else
log_error "Failed to apply deployment"
exit 1
fi
kubectl apply -f "$YAML_FILE" -n "$NAMESPACE"
log_success "Deployment applied successfully"
log_info "Waiting for DynamoGraphDeployment to be processed..."
if kubectl wait --for=condition=Ready dynamographdeployment/"$DEPLOYMENT_NAME" -n "$NAMESPACE" --timeout=600s; then
log_success "DynamoGraphDeployment is ready"
else
log_error "DynamoGraphDeployment failed to become ready within timeout"
exit 1
fi
log_info "Waiting for pods to be running (this may take several minutes for image pulls)..."
kubectl wait --for=condition=Ready dynamographdeployment/"$DEPLOYMENT_NAME" -n "$NAMESPACE" --timeout=600s
log_success "DynamoGraphDeployment is ready"
log_info "Waiting for frontend pod..."
# Note: operator automatically prefixes k8s namespace to dynamo-namespace
if kubectl wait --for=condition=Ready pod -l "nvidia.com/dynamo-component-type=frontend,nvidia.com/dynamo-namespace=${NAMESPACE}-vllm-disagg-planner" -n "$NAMESPACE" --timeout=900s; then
log_success "Frontend pod is ready"
else
log_error "Frontend pod failed to become ready within timeout"
exit 1
fi
kubectl wait --for=condition=Ready pod -l "nvidia.com/dynamo-component-type=frontend,nvidia.com/dynamo-namespace=${NAMESPACE}-vllm-disagg-planner" -n "$NAMESPACE" --timeout=900s
log_success "Frontend pod is ready"
log_info "Waiting for all pods to be running..."
log_info "Waiting for planner pod..."
kubectl wait --for=condition=Ready pod -l "nvidia.com/dynamo-component-type=planner,nvidia.com/dynamo-namespace=${NAMESPACE}-vllm-disagg-planner" -n "$NAMESPACE" --timeout=900s
sleep 30
}
setup_port_forward() {
log_info "Setting up port forwarding..."
# Kill any existing port forward on the same port
if lsof -ti:$LOCAL_PORT &> /dev/null; then
log_warning "Port $LOCAL_PORT is already in use, attempting to free it..."
kill "$(lsof -ti:$LOCAL_PORT)" 2>/dev/null || true
sleep 2
fi
LOCAL_PORT=$(find_free_local_port)
log_info "Using local port $LOCAL_PORT for frontend port-forward"
local frontend_service="vllm-disagg-planner-frontend"
if ! kubectl get service "$frontend_service" -n "$NAMESPACE" &> /dev/null; then
log_error "Frontend service '$frontend_service' not found"
return 1
fi
log_info "Port forwarding to service: $frontend_service"
kubectl port-forward service/"$frontend_service" "$LOCAL_PORT:$FRONTEND_PORT" -n "$NAMESPACE" >/dev/null 2>&1 &
PORT_FORWARD_PID=$!
......@@ -194,11 +161,14 @@ cleanup_port_forward() {
# Delete the resources described by $YAML_FILE from $NAMESPACE, then wait up to
# 120s for the DynamoGraphDeployment named $DEPLOYMENT_NAME to be removed.
cleanup_deployment() {
log_info "Cleaning up deployment..."
# --ignore-not-found: resources that are already gone are not reported as errors.
kubectl delete -f "$YAML_FILE" -n "$NAMESPACE" --ignore-not-found
log_info "Waiting for cleanup to complete..."
# Best effort: `|| true` keeps a failed or timed-out wait from aborting the
# script, which runs with `set -e`.
kubectl wait --for=delete dynamographdeployment/"$DEPLOYMENT_NAME" -n "$NAMESPACE" --timeout=120s || true
}
log_info "Cleanup complete"
# Tear-down entry point, registered in main via `trap cleanup EXIT`.
# Always stops the port-forward; removes the deployment only when this run
# created it.
cleanup() {
cleanup_port_forward
# DEPLOYED_BY_US starts as false and is set to true after deploy_planner runs,
# so a deployment that already existed is left in place.
if [ "$DEPLOYED_BY_US" = true ]; then
cleanup_deployment
fi
}
run_test() {
......@@ -209,19 +179,13 @@ run_test() {
python_cmd="python"
fi
local test_args="--namespace $NAMESPACE --mode $MODE"
local test_args="--namespace $NAMESPACE --mode $MODE --base-url http://localhost:$LOCAL_PORT"
if [ "$SAVE_RESULTS" = true ]; then
test_args="$test_args --save-results"
log_info "Results will be saved to tests/planner/e2e_scaling_results"
log_info "Results will be saved to components/src/dynamo/planner/tests/e2e_scaling_results"
fi
if $python_cmd "$TEST_FILE" $test_args; then
log_success "Scaling test PASSED"
return 0
else
log_error "Scaling test FAILED"
return 1
fi
$python_cmd "$TEST_FILE" $test_args
}
main() {
......@@ -245,70 +209,31 @@ main() {
;;
--help)
echo "Usage: $0 [--namespace NS] [--mode MODE] [--save-results]"
echo ""
echo "Run SLA planner scaling test (graduated 8->18 req/s prefill scaling)"
echo ""
echo "Options:"
echo " --namespace NS Kubernetes namespace (default: default)"
echo " --mode MODE Scaling mode: 'throughput' (default) or 'load'"
echo " --save-results Save results to tests/planner/e2e_scaling_results instead of /tmp"
echo " --help Show this help"
exit 0
;;
*)
log_error "Unknown option: $1"
echo "Use --help for usage information"
exit 1
;;
esac
done
# Select YAML based on mode
if [ "$MODE" = "load" ]; then
YAML_FILE="$SCRIPT_DIR/disagg_planner_load.yaml"
else
YAML_FILE="$SCRIPT_DIR/disagg_planner_throughput.yaml"
fi
log_info "SLA Planner Scaling Test"
log_info "Namespace: $NAMESPACE"
log_info "Mode: $MODE"
log_info "YAML: $YAML_FILE"
log_info "Scenario: Graduated 8->18 req/s (1P1D -> 2P1D prefill scaling, ISL=4000/OSL=150)"
check_prerequisites
trap cleanup EXIT
trap cleanup_port_forward EXIT
# Check if we need to deploy
local deployed_by_us=false
if ! check_existing_deployment; then
deploy_planner
deployed_by_us=true
fi
if ! setup_port_forward; then
log_error "Failed to setup port forwarding"
exit 1
fi
local test_result=0
if ! run_test; then
test_result=1
fi
# Only cleanup deployment if we deployed it
if [ "$deployed_by_us" = true ]; then
cleanup_deployment
fi
if [ $test_result -eq 0 ]; then
log_success "Test completed successfully!"
else
log_error "Test failed!"
DEPLOYED_BY_US=true
fi
exit $test_result
setup_port_forward
run_test
}
main "$@"
\ No newline at end of file
main "$@"
......@@ -2,31 +2,28 @@
# SPDX-License-Identifier: Apache-2.0
"""
End-to-end test for SLA planner scaling behavior.
Manual end-to-end scaling check for the SLA planner.
This test assumes a disaggregated planner deployment is already running
and accessible at localhost:8000. It monitors pod scaling and validates
that the planner correctly scales from 1P1D to 2P1D when load increases
through graduated phases: 8 req/s (baseline) → 15 req/s (moderate) → 25 req/s (prefill scaling trigger).
This script intentionally lives outside the automated test tree so it can be kept in the
planner image without being collected by pytest.
"""
import asyncio
import json
import logging
import subprocess
import sys
import time
import urllib.request
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from utils.load_generator import LoadGenerator
from dynamo.planner.tests.unit.load_generator import LoadGenerator
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Test configuration constants
HEALTH_CHECK_TIMEOUT = 10
PORT_FORWARD_SETUP_DELAY = 3
FINAL_STABILIZATION_DELAY = 60
......@@ -58,19 +55,17 @@ class KubernetesMonitor:
self.pod_history: List[PodCounts] = []
def _run_kubectl(self, cmd: List[str]) -> Tuple[bool, str]:
"""Run kubectl command and return success status and output."""
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
return result.returncode == 0, result.stdout.strip()
except subprocess.TimeoutExpired:
logger.error(f"kubectl command timed out: {' '.join(cmd)}")
logger.error("kubectl command timed out: %s", " ".join(cmd))
return False, ""
except Exception as e:
logger.error(f"kubectl command failed: {e}")
except OSError as exc:
logger.error("kubectl command failed: %s", exc)
return False, ""
def get_pod_counts(self) -> Optional[PodCounts]:
"""Get current pod counts for prefill and decode workers."""
cmd = [
"kubectl",
"get",
......@@ -101,7 +96,6 @@ class KubernetesMonitor:
"nvidia.com/dynamo-sub-component-type", ""
)
# Only count Running pods
if pod_phase == "Running":
if sub_component == "prefill":
prefill_pods += 1
......@@ -117,19 +111,18 @@ class KubernetesMonitor:
decode_pods=decode_pods,
total_pods=total_pods,
)
self.pod_history.append(counts)
return counts
except Exception as e:
logger.error(f"Failed to parse pod counts: {e}")
except json.JSONDecodeError as exc:
logger.error("Failed to parse pod counts: %s", exc)
return None
async def monitor_scaling(
self, duration: int, interval: int = 10
) -> List[PodCounts]:
"""Monitor pod scaling for a given duration."""
logger.info(f"Monitoring pod scaling for {duration}s (interval: {interval}s)")
logger.info(
"Monitoring pod scaling for %ss (interval: %ss)", duration, interval
)
start_time = time.time()
monitoring_data = []
......@@ -138,123 +131,14 @@ class KubernetesMonitor:
counts = self.get_pod_counts()
if counts:
monitoring_data.append(counts)
logger.info(f"Pod counts: {counts}")
logger.info("Pod counts: %s", counts)
await asyncio.sleep(interval)
return monitoring_data
def wait_for_deployment_ready(self, timeout: int = 300) -> bool:
    """Block until the deployment reports the ``available`` condition.

    Runs ``kubectl wait --for=condition=available`` against
    ``deployment/<self.deployment_name>`` in ``self.namespace``.

    Args:
        timeout: Seconds ``kubectl wait`` is allowed to block.

    Returns:
        True when kubectl exits with status 0; False when it exits non-zero,
        exceeds the timeout, or cannot be started.
    """
    logger.info("Waiting for deployment %s to be ready...", self.deployment_name)

    cmd = [
        "kubectl",
        "wait",
        "--for=condition=available",
        f"deployment/{self.deployment_name}",
        "-n",
        self.namespace,
        f"--timeout={timeout}s",
    ]

    # kubectl is invoked directly rather than through ``_run_kubectl``: that
    # helper caps every command at 30 seconds, which would abort the wait long
    # before ``timeout`` elapses. The extra 10 seconds give kubectl room to
    # report its own timeout before the subprocess is killed.
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout + 10
        )
    except subprocess.TimeoutExpired:
        logger.error("kubectl command timed out: %s", " ".join(cmd))
        return False
    except OSError as exc:
        logger.error("kubectl command failed: %s", exc)
        return False

    if result.returncode == 0:
        logger.info("Deployment is ready")
        return True

    # kubectl writes failure details to stderr; fall back to stdout if empty.
    details = result.stderr.strip() or result.stdout.strip()
    logger.error("Deployment failed to become ready: %s", details)
    return False
def apply_deployment(self, yaml_file: str) -> bool:
    """Run ``kubectl apply -f <yaml_file>`` in ``self.namespace``.

    Args:
        yaml_file: Path of the manifest handed to ``kubectl apply -f``.

    Returns:
        True when ``_run_kubectl`` reports success, otherwise False (the
        command output is logged as an error).
    """
    logger.info(f"Applying deployment from {yaml_file}")

    applied, kubectl_output = self._run_kubectl(
        ["kubectl", "apply", "-f", yaml_file, "-n", self.namespace]
    )
    if not applied:
        logger.error(f"Failed to apply deployment: {kubectl_output}")
        return False

    logger.info("Deployment applied successfully")
    return True
def delete_deployment(self, yaml_file: str) -> bool:
    """Run ``kubectl delete -f <yaml_file>`` in ``self.namespace``.

    ``--ignore-not-found`` is passed, so resources that no longer exist do not
    make the command fail.

    Args:
        yaml_file: Path of the manifest handed to ``kubectl delete -f``.

    Returns:
        The success flag reported by ``_run_kubectl``; a failure is logged as a
        warning rather than an error.
    """
    logger.info(f"Deleting deployment from {yaml_file}")

    deleted, kubectl_output = self._run_kubectl(
        [
            "kubectl",
            "delete",
            "-f",
            yaml_file,
            "-n",
            self.namespace,
            "--ignore-not-found",
        ]
    )
    if not deleted:
        logger.warning(f"Failed to delete deployment: {kubectl_output}")
    else:
        logger.info("Deployment deleted successfully")
    return deleted
def check_service_health(
    self, service_name: str | None = None, port: int = 8000
) -> bool:
    """Probe the frontend ``/health`` endpoint through a temporary port-forward.

    Starts ``kubectl port-forward service/<service_name> <port>:<port>`` in
    ``self.namespace``, waits ``PORT_FORWARD_SETUP_DELAY`` seconds, requests
    ``http://localhost:<port>/health`` and always stops the port-forward
    before returning.

    Args:
        service_name: Service to forward to; defaults to
            ``"<self.deployment_name>-frontend"``.
        port: Port used both locally and on the service.

    Returns:
        True when the health endpoint answers with HTTP status 200, False on
        any other status or on any error.
    """
    if service_name is None:
        service_name = f"{self.deployment_name}-frontend"

    cmd = [
        "kubectl",
        "port-forward",
        f"service/{service_name}",
        f"{port}:{port}",
        "-n",
        self.namespace,
    ]

    proc = None
    try:
        # kubectl's output is never read here, so discard it instead of piping
        # it: an undrained pipe can fill up and stall the child process.
        proc = subprocess.Popen(
            cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )

        # Fixed delay to give the port-forward time to establish.
        time.sleep(PORT_FORWARD_SETUP_DELAY)

        try:
            # The context manager closes the HTTP response and its socket.
            with urllib.request.urlopen(
                f"http://localhost:{port}/health", timeout=HEALTH_CHECK_TIMEOUT
            ) as response:
                healthy = response.status == 200
            logger.info("Service health check: %s", "OK" if healthy else "FAILED")
        except Exception as exc:
            # Deliberately broad: any failure to reach the endpoint simply
            # means "not healthy" for this best-effort probe.
            logger.warning("Health check failed: %s", exc)
            healthy = False

        return healthy

    except Exception as exc:
        logger.error("Failed to check service health: %s", exc)
        return False
    finally:
        # Ensure the port-forward process is stopped and reaped.
        if proc is not None:
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
class ScalingE2ETest:
"""End-to-end test for SLA planner scaling behavior."""
"""Manual end-to-end scaling validation for the SLA planner."""
def __init__(
self,
......@@ -267,88 +151,56 @@ class ScalingE2ETest:
self.base_url = base_url
self.save_results = save_results
self.mode = mode
self.k8s_monitor = KubernetesMonitor(namespace)
self.load_generator = LoadGenerator(
base_url=base_url, save_results=save_results
)
self.test_results: Dict[str, Any] = {}
async def run_scaling_test(self) -> Dict:
"""
Run the complete scaling test.
Hardcoded scenario:
- Phase 1 (8 req/s): Should maintain 1P1D
- Phase 2 (18 req/s): Should scale to 2P1D
"""
logger.info(f"Starting scaling integration test (mode={self.mode})")
async def run_scaling_test(self) -> Dict[str, Any]:
logger.info("Starting manual scaling integration test (mode=%s)", self.mode)
test_start_time = time.time()
# Record initial state
initial_counts = self.k8s_monitor.get_pod_counts()
logger.info(f"Test starting with: {initial_counts}")
logger.info("Test starting with: %s", initial_counts)
# Start background monitoring
# Calculate based on actual phases from load generator
if self.mode == "load":
# Load-based: baseline(120s) + transition(30s) + trigger(120s) + buffer
total_test_duration = 120 + 30 + 120 + BUFFER_DURATION
else:
# Throughput: baseline(90s) + transition(30s) + trigger(120s) + buffer
total_test_duration = 90 + 30 + 120 + BUFFER_DURATION
total_test_duration = (
120 + 30 + 120 + BUFFER_DURATION
if self.mode == "load"
else 90 + 30 + 120 + BUFFER_DURATION
)
monitoring_task = asyncio.create_task(
self.k8s_monitor.monitor_scaling(
total_test_duration, interval=MONITORING_INTERVAL
)
)
# Initialize results in case of exception
baseline_results = {}
trigger_results = {}
baseline_results: Dict[str, Any] = {}
trigger_results: Dict[str, Any] = {}
try:
# Use the load generator's built-in scaling test
logger.info(
f"Running scaling scenario (8 req/s -> 18 req/s, mode={self.mode})"
)
load_results = await self.load_generator.run_scaling_test(mode=self.mode)
# Extract load results for analysis (2-phase structure)
phase_results = load_results.get("phase_results", {})
baseline_results = phase_results.get("phase1_baseline", {})
trigger_results = phase_results.get("phase2_prefill_scaling_trigger", {})
# Check final pod counts
final_counts = self.k8s_monitor.get_pod_counts()
logger.info(f"Final pod counts: {final_counts}")
logger.info("Final pod counts: %s", final_counts)
# Wait a bit more to capture any delayed scaling
logger.info("Waiting for potential delayed scaling...")
await asyncio.sleep(FINAL_STABILIZATION_DELAY)
# Get final final counts
final_final_counts = self.k8s_monitor.get_pod_counts()
logger.info(f"Final final pod counts: {final_final_counts}")
except Exception as e:
logger.error(f"Test execution failed: {e}")
raise
logger.info("Final final pod counts: %s", final_final_counts)
finally:
# Stop monitoring
monitoring_task.cancel()
try:
await monitoring_task
except asyncio.CancelledError:
pass
# Compile results
test_results: Dict[str, Any] = {
return {
"test_duration": time.time() - test_start_time,
"config": {
# Document actual test configuration
"baseline_rps": 8.0,
"trigger_rps": 18.0,
"phase_durations": {"baseline": 90, "trigger": 120},
......@@ -365,16 +217,11 @@ class ScalingE2ETest:
"scaling_analysis": self.analyze_scaling_behavior(),
}
return test_results
def analyze_scaling_behavior(self) -> Dict:
"""Analyze the scaling behavior from pod history."""
def analyze_scaling_behavior(self) -> Dict[str, Any]:
if len(self.k8s_monitor.pod_history) < 2:
return {"error": "Insufficient data for analysis"}
history = self.k8s_monitor.pod_history
# Find scaling events
scaling_events = []
for i in range(1, len(history)):
prev = history[i - 1]
......@@ -396,10 +243,8 @@ class ScalingE2ETest:
}
)
# Check if expected scaling occurred
initial = history[0]
final = history[-1]
expected_scaling = {
"initial_1p1d": initial.prefill_pods == 1 and initial.decode_pods == 1,
"final_2p1d": final.prefill_pods == 2 and final.decode_pods == 1,
......@@ -420,57 +265,41 @@ class ScalingE2ETest:
"total_scaling_events": len(scaling_events),
}
def validate_test_results(self, results: Dict) -> Dict:
"""Validate that the test achieved expected scaling behavior."""
def validate_test_results(self, results: Dict[str, Any]) -> Dict[str, Any]:
validation: Dict[str, Any] = {"test_passed": False, "issues": [], "summary": ""}
# Check if we have the expected data
if not results.get("scaling_analysis"):
analysis = results.get("scaling_analysis")
if not analysis:
validation["issues"].append("No scaling analysis data")
return validation
analysis = results["scaling_analysis"]
expected = analysis.get("expected_scaling", {})
# Validate initial state
if not expected.get("initial_1p1d"):
validation["issues"].append("Test did not start with 1P1D configuration")
# Validate final state
if not expected.get("final_2p1d"):
validation["issues"].append(
"Test did not end with expected 2P1D configuration"
)
# Validate scaling occurred
if not expected.get("scaling_occurred"):
validation["issues"].append("No scaling events detected")
# Check if correct scaling occurred
if expected.get("correct_scaling"):
validation["test_passed"] = True
validation[
"summary"
] = "✅ Test PASSED: Successfully scaled from 1P1D to 2P1D"
validation["summary"] = "PASS: Successfully scaled from 1P1D to 2P1D"
else:
validation[
"summary"
] = "❌ Test FAILED: Did not achieve expected 1P1D -> 2P1D scaling"
] = "FAIL: Did not achieve expected 1P1D -> 2P1D scaling"
# Add performance validation across all phases
baseline = results.get("baseline_results", {})
trigger = results.get("trigger_results", {})
if baseline.get("throughput", 0) > 0:
validation["baseline_throughput"] = f"{baseline['throughput']:.2f} req/s"
if trigger.get("throughput", 0) > 0:
validation["trigger_throughput"] = f"{trigger['throughput']:.2f} req/s"
return validation
async def main():
"""Main function for running the e2e test."""
import argparse
parser = argparse.ArgumentParser(description="SLA Planner Scaling E2E Test")
......@@ -481,7 +310,10 @@ async def main():
parser.add_argument(
"--save-results",
action="store_true",
help="Save results to tests/planner/e2e_scaling_results instead of /tmp",
help=(
"Save results to components/src/dynamo/planner/tests/e2e_scaling_results "
"instead of /tmp"
),
)
parser.add_argument(
"--mode",
......@@ -491,7 +323,6 @@ async def main():
)
args = parser.parse_args()
test = ScalingE2ETest(
namespace=args.namespace,
base_url=args.base_url,
......@@ -500,57 +331,28 @@ async def main():
)
try:
# Check that service is accessible
logger.info(f"Checking service availability at {args.base_url}...")
# Run the scaling test
logger.info("Running scaling test...")
results = await test.run_scaling_test()
# Validate results
validation = test.validate_test_results(results)
# Save results
timestamp = int(time.time())
results_file = f"/tmp/scaling_test_results_{timestamp}.json"
with open(results_file, "w") as f:
json.dump({"results": results, "validation": validation}, f, indent=2)
with open(results_file, "w") as handle:
json.dump({"results": results, "validation": validation}, handle, indent=2)
# Print summary
logger.info("=" * 60)
logger.info("TEST SUMMARY")
logger.info("=" * 60)
logger.info(validation["summary"])
if validation["issues"]:
logger.info("\nIssues found:")
for issue in validation["issues"]:
logger.info(f" - {issue}")
if any(k.endswith("_throughput") for k in validation.keys()):
logger.info("\nPerformance:")
if "baseline_throughput" in validation:
logger.info(
f" Baseline (8 req/s): {validation['baseline_throughput']}"
)
if "moderate_throughput" in validation:
logger.info(
f" Moderate (15 req/s): {validation['moderate_throughput']}"
)
if "trigger_throughput" in validation:
logger.info(f" Trigger (25 req/s): {validation['trigger_throughput']}")
logger.info(f"\nDetailed results saved to: {results_file}")
for issue in validation["issues"]:
logger.info("Issue: %s", issue)
logger.info("Detailed results saved to: %s", results_file)
logger.info("=" * 60)
return 0 if validation["test_passed"] else 1
except Exception as e:
logger.error(f"Test failed with error: {e}")
return 1
except Exception:
logger.exception("Test failed unexpectedly")
raise
if __name__ == "__main__":
import sys
sys.exit(asyncio.run(main()))
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# This is a simple manual load-test helper for planner validation.
# To validate:
# 1. Run a 1P1D disaggregated deployment.
# 2. Start planner with the desired config.
# 3. Run ./load_test.sh <num_requests>.
# Expected behavior is scale up and then back down after the burst.
#
# Every request is a streaming chat completion sent by a background curl whose
# output is discarded; the script returns once all requests are launched and
# does not wait for them to finish.

if [ $# -ne 1 ]; then
    echo "Usage: $0 <number_of_executions>"
    exit 1
fi

# Accept only a plain decimal count. The value is used in an arithmetic
# context below, where an arbitrary string would be evaluated as an expression
# (a non-numeric word silently becomes 0, and crafted input can run commands).
if ! [[ "$1" =~ ^[0-9]+$ ]]; then
    echo "Error: <number_of_executions> must be a non-negative integer, got '$1'" >&2
    echo "Usage: $0 <number_of_executions>" >&2
    exit 1
fi

# Force base 10 so a value with a leading zero (e.g. "08") is not parsed as octal.
executions=$((10#$1))

echo "Starting $executions non-blocking executions..."
for (( i = 1; i <= executions; i++ )); do
    curl localhost:8000/v1/chat/completions \
        -H "Content-Type: application/json" \
        -d '{
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"messages": [
{
"role": "user",
"content": "Generate a long response to produce sustained planner load."
}
],
"stream": true,
"max_tokens": 500
}' > /dev/null 2>&1 &
done
echo "All $executions executions have been launched!"
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
......
......@@ -353,7 +353,7 @@ async def main():
parser.add_argument(
"--save-results",
action="store_true",
help="Save results to tests/planner/e2e_scaling_results instead of /tmp",
help="Save results to components/src/dynamo/planner/tests/data instead of /tmp",
)
args = parser.parse_args()
......
......@@ -22,6 +22,13 @@ from kubernetes import client
from dynamo.planner.connectors.kubernetes_api import KubernetesAPI
from dynamo.planner.errors import DynamoGraphDeploymentNotFoundError
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
pytest.mark.planner,
]
@pytest.fixture
def mock_config():
......@@ -322,28 +329,26 @@ async def test_wait_for_graph_deployment_ready_on_second_attempt(
)
@pytest.mark.asyncio
async def test_get_graph_deployment(k8s_api, mock_custom_api):
def test_get_graph_deployment(k8s_api, mock_custom_api):
"""Test get_graph_deployment"""
mock_deployment = {"metadata": {"name": "parent-dgd"}}
with patch.object(
k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment
) as mock_get:
result = await k8s_api.get_graph_deployment("parent-dgd")
result = k8s_api.get_graph_deployment("parent-dgd")
assert result == mock_deployment
mock_get.assert_called_once_with("parent-dgd")
@pytest.mark.asyncio
async def test_get_graph_deployment_not_found(k8s_api, mock_custom_api):
def test_get_graph_deployment_not_found(k8s_api, mock_custom_api):
"""Test get_graph_deployment when deployment is not found"""
k8s_api.custom_api.get_namespaced_custom_object.side_effect = client.ApiException(
status=404
)
with pytest.raises(DynamoGraphDeploymentNotFoundError) as exc_info:
await k8s_api.get_graph_deployment("parent-dgd")
k8s_api.get_graph_deployment("parent-dgd")
exception = exc_info.value
assert exception.deployment_name == "parent-dgd"
......
......@@ -34,6 +34,13 @@ from dynamo.planner.monitoring.dgd_services import (
get_service_from_sub_component_type_or_name,
)
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
pytest.mark.planner,
]
@pytest.fixture
def mock_kube_api():
......@@ -64,8 +71,9 @@ def kubernetes_connector(mock_kube_api_class, monkeypatch):
def test_kubernetes_connector_no_env_var():
with pytest.raises(DeploymentValidationError) as exc_info:
KubernetesConnector("test-dynamo-namespace")
with patch("dynamo.planner.connectors.kubernetes.KubernetesAPI"):
with pytest.raises(DeploymentValidationError) as exc_info:
KubernetesConnector("test-dynamo-namespace")
exception = exc_info.value
assert set(exception.errors) == {
......
......@@ -256,6 +256,7 @@ def _build_load_config(**overrides) -> PlannerConfig:
profile_results_dir=os.path.join(
os.path.dirname(__file__),
"..",
"data",
"profiling_results",
"H200_TP1P_TP1D",
),
......
......@@ -14,7 +14,7 @@ import tempfile
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from utils.load_generator import LoadGenerator
from load_generator import LoadGenerator
pytestmark = [
pytest.mark.gpu_0,
......
......@@ -31,7 +31,6 @@ pytestmark = [
pytest.mark.pre_merge,
pytest.mark.unit,
pytest.mark.planner,
pytest.mark.vllm,
]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment