Commit 353ba5db (unverified), authored by hhzhang16, committed by GitHub

test: add tests for replica calculation and planner scaling (#2525)


Signed-off-by: Hannah Zhang <hannahz@nvidia.com>
parent 781fa062
...@@ -115,4 +115,4 @@ kubectl apply -f disagg_planner.yaml -n {$NAMESPACE}
``` ```
> [!NOTE]
> The SLA planner requires a frontend that reports metrics at the `/metrics` HTTP endpoint with the number of requests, ISL, OSL, TTFT, and ITL in the correct format. The dynamo frontend provides these metrics automatically.
\ No newline at end of file
# E2E test results - don't commit test artifacts to git
e2e_scaling_results/
# Temporary files
*.tmp
*.log
# Python cache
__pycache__/
*.pyc
*.pyo
\ No newline at end of file
...@@ -86,6 +86,7 @@ python benchmarks/sin_load_generator/sin_synth.py \
The dataset starts at 12 requests/s, increases to 36 requests/s at t=300s, decreases back to 12 requests/s at t=600s, and repeats.
The total duration is 30 minutes or 1800 seconds.
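As a rough illustration of the load shape described above, the request rate can be modeled as a sinusoid with a 600 s period that oscillates between 12 and 36 requests/s. This is a minimal sketch under that assumption; the function name `request_rate` and its parameters are hypothetical and are not taken from `sin_synth.py`, whose actual formula may differ.

```python
import math


def request_rate(t_s: float, low: float = 12.0, high: float = 36.0,
                 period_s: float = 600.0) -> float:
    """Requests/s at time t_s for a sinusoid that starts at `low`
    and reaches `high` half a period later (assumed shape)."""
    mean = (high + low) / 2
    amplitude = (high - low) / 2
    return mean - amplitude * math.cos(2 * math.pi * t_s / period_s)


# Sample the rate at quarter-period steps across one full cycle.
for t in (0, 150, 300, 450, 600):
    print(t, round(request_rate(t), 1))
```

With these defaults, an 1800 s dataset would contain three full cycles.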
## Planner Dry Run
Before testing the SLA planner on real deployments, you can use the dry run feature to test the autoscaling behavior on a given dataset. Specifically, in dry run mode,
...@@ -129,3 +130,75 @@ The second plot shows the actual ISL/OSL and the predicted ISL/OSL. The first tw
The third plot shows the actual prefill throughput, the number of prefill workers that the planner scales to, and the safe throughput limit for that number of prefill workers. If the actual throughput is below the safe throughput limit, the deployment has the capacity to adhere to the TTFT SLA. Note that in a real deployment, other factors such as queueing, load balancing, KV cache transfer latency, and ISL variance mean that the deployment is not guaranteed to adhere to the TTFT SLA.
The fourth plot, similar to the third plot, shows the actual decode throughput, the number of decode workers that the planner scales to, and the safe throughput limit for that number of decode workers. If the actual throughput is below the safe throughput limit, the deployment has the capacity to adhere to the ITL SLA. Note that in a real deployment, other factors such as load balancing and OSL variance mean that the deployment is not guaranteed to adhere to the ITL SLA.
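The capacity check that the third and fourth plots visualize can be sketched as a simple comparison. This is an illustrative sketch only: the helper name `has_capacity`, the assumption that the safe limit scales linearly with the worker count, and the sample numbers are all hypothetical and not taken from the planner code.

```python
def has_capacity(actual_thpt: float, num_workers: int,
                 safe_thpt_per_worker: float) -> bool:
    """Return True when the observed throughput (tokens/s) is below the
    safe limit for the current worker count (assumed linear in workers)."""
    safe_limit = num_workers * safe_thpt_per_worker
    return actual_thpt < safe_limit


# Illustrative numbers only: one worker is over the limit, two are not.
print(has_capacity(actual_thpt=50_000, num_workers=1, safe_thpt_per_worker=40_000))
print(has_capacity(actual_thpt=50_000, num_workers=2, safe_thpt_per_worker=40_000))
```

As the text above notes, passing this check indicates available capacity but does not guarantee that the SLA is met in a real deployment.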
## Scaling Tests
This directory contains comprehensive tests for validating the SLA planner's scaling behavior. The tests validate both the replica calculation logic and end-to-end scaling behavior. The scaling test uses a graduated load approach rather than dataset files, as it proved more reliable for metric generation and scaling triggers.
### Test Types
1. **Unit Tests** (`test_replica_calculation.py`) - Test the mathematical formulas for calculating prefill and decode replicas in isolation
2. **End-to-End Tests** (`scaling/run_scaling_test.sh`) - Test complete workflow including Kubernetes deployment, load generation, and pod scaling validation
### Quick Start
#### Run Unit Tests Only
Test the replica calculation logic without requiring Kubernetes:
```bash
# Set PYTHONPATH to include planner components
PYTHONPATH=components/planner/src python -m pytest tests/planner/test_replica_calculation.py -v
# Or from the tests/planner directory:
cd tests/planner
PYTHONPATH=../../components/planner/src python -m pytest test_replica_calculation.py -v
```
**Note**: The unit tests automatically mock external dependencies (prometheus_client, runtime modules) to ensure they can run in isolation without requiring the full Dynamo environment.
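For orientation, the arithmetic that the unit tests use to compute their expected replica counts can be sketched as follows. The function names and signatures here are hypothetical helpers written for illustration; they mirror the expected-value calculations in `test_replica_calculation.py`, not necessarily the planner's internal implementation.

```python
import math


def prefill_replicas(num_req, isl, interval_s, thpt_per_gpu,
                     gpus_per_engine=1, correction_factor=1.0, min_endpoint=1):
    """Prefill replicas: predicted prefill tokens/s divided by per-GPU throughput.
    The correction factor is clamped to at most 1, as in the tests."""
    load = num_req * isl / interval_s * min(1, correction_factor)
    return max(math.ceil(load / thpt_per_gpu / gpus_per_engine), min_endpoint)


def decode_replicas(num_req, osl, interval_s, thpt_per_gpu,
                    gpus_per_engine=1, min_endpoint=1):
    """Decode replicas: predicted decode tokens/s divided by per-GPU throughput."""
    load = num_req * osl / interval_s
    return max(math.ceil(load / thpt_per_gpu / gpus_per_engine), min_endpoint)


# Values from the parametrized high-load case: 500 requests per 60 s interval.
print(prefill_replicas(500, 3000, 60, 40000))  # 1
print(decode_replicas(500, 150, 60, 1000))     # 2
```

Note that `num_req` is a request count per adjustment interval, which is why both formulas divide by the interval length to obtain tokens per second.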
#### Run Full End-to-End Test
Test complete scaling behavior including Kubernetes deployment and load generation:
```bash
./scaling/run_scaling_test.sh
```
With custom namespace:
```bash
./scaling/run_scaling_test.sh --namespace production
```
To save results to `tests/planner/e2e_scaling_results` instead of `/tmp`:
```bash
./scaling/run_scaling_test.sh --save-results
```
**E2E Test Deployment Management:**
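- If no deployment exists: creates the deployment, runs the test, and cleans up the deployment
- If a ready deployment exists: uses the existing deployment and preserves it
- If a deployment exists but is not ready: reapplies it, runs the test, and then removes it
- This suits development workflows in which deployments stay running between test runs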
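The lifecycle rules above reduce to a single decision: the test only cleans up a deployment that it applied itself. The sketch below models that decision in Python for illustration; `LifecyclePlan` and `plan_lifecycle` are hypothetical names, and the real logic lives in the bash script `run_scaling_test.sh`.

```python
from dataclasses import dataclass


@dataclass
class LifecyclePlan:
    deploy: bool   # whether the test applies the deployment manifest
    cleanup: bool  # whether the test deletes the deployment afterwards


def plan_lifecycle(existing_deployment_ready: bool) -> LifecyclePlan:
    """A ready deployment is reused and preserved; in every other case the
    test deploys and later removes what it deployed."""
    deployed_by_test = not existing_deployment_ready
    return LifecyclePlan(deploy=deployed_by_test, cleanup=deployed_by_test)


print(plan_lifecycle(existing_deployment_ready=True))
print(plan_lifecycle(existing_deployment_ready=False))
```

One consequence worth keeping in mind: a deployment that exists but is not ready is treated as if the test had created it, so it is removed when the test finishes.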
**Test Scenario**
The main test scenario validates prefill scaling for H200 with 1P1D → 2P1D configuration:
- **Phase 1**: 8 req/s for 90s (baseline - maintains 1P1D)
- **Phase 2**: 15 req/s for 120s (moderate load - maintains 1P1D)
- **Phase 3**: 25 req/s for 180s (scaling trigger - scales to 2P1D)
- **ISL/OSL**: 4000/150 tokens (optimized for prefill bottleneck)
- **Transition delay**: 30s between phases
- **Total test duration**: ~7 minutes + scaling observation
- **Smart cleanup**: Only removes deployment if test created it (preserves existing deployments)
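The stated total duration follows from the phase lengths and transition delays listed above. The sketch below works through that arithmetic; it assumes the 30 s delay applies only between consecutive phases and that the load generator sustains each target rate exactly, so the request count is a nominal figure rather than a measured one.

```python
# (rate in req/s, duration in s) for each phase of the scenario
PHASES = [(8, 90), (15, 120), (25, 180)]
TRANSITION_DELAY_S = 30  # assumed to apply only between consecutive phases


def total_duration_s(phases, transition_delay_s):
    """Sum of phase durations plus one transition delay per gap between phases."""
    load_time = sum(duration for _, duration in phases)
    gaps = max(len(phases) - 1, 0)
    return load_time + gaps * transition_delay_s


def total_requests(phases):
    """Nominal number of requests sent if every phase hits its target rate."""
    return sum(rate * duration for rate, duration in phases)


print(total_duration_s(PHASES, TRANSITION_DELAY_S) / 60)  # 7.5 (minutes)
print(total_requests(PHASES))                             # 7020
```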
### Prerequisites
**For Unit Tests:**
- Python dependencies installed
- PYTHONPATH set to include `components/planner/src` (see unit test examples above)
**For E2E Tests:**
- Kubernetes cluster with GPU nodes
- kubectl configured and accessible
- genai-perf available in PATH
- Python dependencies installed
- PYTHONPATH properly configured for planner imports
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Local conftest.py for planner tests to disable automatic test logging.
This overrides the autouse logger fixture from the parent conftest.py.
"""
import pytest
@pytest.fixture(autouse=True)
def logger(request):
    """Dummy logger fixture that does nothing - overrides the parent one."""
    yield
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: vllm-disagg-planner
annotations:
nvidia.com/enable-grove: "false" # temporarily disable grove because current k8s connector does not work with grove
spec:
envs:
- name: DYNAMO_SERVICE_CONFIG
value: '{"Prometheus":{"global":{"scrape_interval":"5s"},"scrape_configs":[{"job_name":"prometheus","static_configs":[{"targets":["localhost:9090"]}]},{"job_name":"frontend","static_configs":[{"targets":["vllm-disagg-planner-frontend:8000"]}]}]}}'
- name: DYNAMO_NAMESPACE
value: "vllm-disagg-planner"
- name: PROMETHEUS_PORT
value: "8000"
services:
Frontend:
dynamoNamespace: vllm-disagg-planner
componentType: frontend
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-301.6
args:
- "python3 -m dynamo.frontend --http-port 8000"
Planner:
dynamoNamespace: vllm-disagg-planner
envFromSecret: hf-token-secret
componentType: planner
replicas: 1
livenessProbe:
httpGet:
path: /metrics
port: 9085
periodSeconds: 60
timeoutSeconds: 30
failureThreshold: 10
readinessProbe:
httpGet:
path: /metrics
port: 9085
initialDelaySeconds: 60
periodSeconds: 60
timeoutSeconds: 30
failureThreshold: 10
extraPodSpec:
mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-301.6
workingDir: /workspace/components/planner/src/dynamo/planner
ports:
- name: metrics
containerPort: 9085
command:
- /bin/sh
- -c
args:
- >-
python3 -m planner_sla
--environment=kubernetes
--backend=vllm
--adjustment-interval=60
--profile-results-dir=/workspace/tests/planner/profiling_results/H200_TP1P_TP1D
--prometheus-port=9085
--ttft=0.1
--itl=0.01
--load-predictor=constant
Prometheus: # NOTE: this is set on Prometheus to ensure a service is created for the Prometheus component. This is a workaround and should be managed differently.
dynamoNamespace: vllm-disagg-planner
componentType: prometheus
replicas: 1
envs:
- name: PYTHONPATH
value: "/workspace/components/planner/src"
livenessProbe:
httpGet:
path: /
port: 9090
periodSeconds: 60
timeoutSeconds: 30
failureThreshold: 10
readinessProbe:
httpGet:
path: /
port: 9090
initialDelaySeconds: 30
periodSeconds: 60
timeoutSeconds: 30
failureThreshold: 10
extraPodSpec:
mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-301.6
workingDir: /workspace/components/backends/vllm
command:
- /bin/sh
- -c
args:
- "python3 -m dynamo.planner.prometheus"
VllmDecodeWorker:
dynamoNamespace: vllm-disagg-planner
envFromSecret: hf-token-secret
componentType: worker
replicas: 1
resources:
limits:
gpu: "1"
extraPodSpec:
mainContainer:
startupProbe:
httpGet:
path: /health
port: 9090
periodSeconds: 30
failureThreshold: 60
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-301.6
workingDir: /workspace/components/backends/vllm
command:
- /bin/sh
- -c
args:
- "python3 -m dynamo.vllm --model nvidia/Llama-3.1-8B-Instruct-FP8 --migration-limit=3 --max-model-len=8192"
VllmPrefillWorker:
dynamoNamespace: vllm-disagg-planner
envFromSecret: hf-token-secret
componentType: worker
replicas: 1
resources:
limits:
gpu: "1"
extraPodSpec:
mainContainer:
startupProbe:
httpGet:
path: /health
port: 9090
periodSeconds: 30
failureThreshold: 60
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-301.6
workingDir: /workspace/components/backends/vllm
command:
- /bin/sh
- -c
args:
- python3 -m dynamo.vllm --model nvidia/Llama-3.1-8B-Instruct-FP8 --is-prefill-worker --migration-limit=3 --max-model-len=8192
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Run SLA planner scaling end-to-end test
# This script:
# 1. Deploys the disaggregated planner if not already running
# 2. Sets up port forwarding to localhost:8000
# 3. Waits for the deployment to be ready
# 4. Runs the hardcoded scaling test (graduated 8 -> 15 -> 25 req/s)
# 5. Cleans up port forwarding, and the deployment if this script created it
set -e
# Configuration
NAMESPACE=${NAMESPACE:-default}
YAML_FILE="disagg_planner.yaml"
FRONTEND_PORT=8000
LOCAL_PORT=8000
DEPLOYMENT_NAME="vllm-disagg-planner"
SAVE_RESULTS=false
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Check prerequisites
check_prerequisites() {
log_info "Checking prerequisites..."
if ! command -v kubectl &> /dev/null; then
log_error "kubectl not found. Please install kubectl."
exit 1
fi
if ! command -v python &> /dev/null && ! command -v python3 &> /dev/null; then
log_error "Python not found. Please install Python."
exit 1
fi
if ! kubectl cluster-info &> /dev/null; then
log_error "Cannot connect to Kubernetes cluster."
exit 1
fi
if [ ! -f "test_scaling_e2e.py" ]; then
log_error "test_scaling_e2e.py not found. Make sure you're in the tests/planner directory."
exit 1
fi
# Check for genai-perf
if ! command -v genai-perf &> /dev/null; then
log_warning "genai-perf not found. This tool is required for load generation."
echo -n "Would you like us to install it for you? (y/n): "
read -r response
if [[ "$response" =~ ^[Yy]$ ]]; then
log_info "Installing genai-perf and perf_analyzer..."
# Install with minimum version bounds (these are not exact pins, so newer releases may be pulled in)
if pip install 'nvidia-ml-py3>=12.0.0' 'genai-perf>=0.0.4' 'tritonclient[all]>=2.48.0'; then
log_success "genai-perf and perf_analyzer installed successfully"
else
log_error "Failed to install genai-perf. Please install it manually: pip install 'nvidia-ml-py3>=12.0.0' 'genai-perf>=0.0.4' 'tritonclient[all]>=2.48.0'"
exit 1
fi
else
log_error "genai-perf is required for the scaling test. Please install it: pip install 'nvidia-ml-py3>=12.0.0' 'genai-perf>=0.0.4' 'tritonclient[all]>=2.48.0'"
exit 1
fi
fi
log_success "Prerequisites check passed"
}
# Check if deployment already exists and is running
check_existing_deployment() {
log_info "Checking for existing deployment..."
# Check for the DynamoGraphDeployment custom resource
if kubectl get dynamographdeployment "$DEPLOYMENT_NAME" -n "$NAMESPACE" &> /dev/null; then
log_info "DynamoGraphDeployment $DEPLOYMENT_NAME already exists - checking whether it is ready"
# Check if the DynamoGraphDeployment is ready
local status
status=$(kubectl get dynamographdeployment "$DEPLOYMENT_NAME" -n "$NAMESPACE" -o jsonpath='{.status.state}')
if [ "$status" = "successful" ]; then
# Check if frontend pod is running
if kubectl get pods -n "$NAMESPACE" -l "nvidia.com/dynamo-component-type=frontend,nvidia.com/dynamo-namespace=vllm-disagg-planner" --field-selector=status.phase=Running | grep -q .; then
log_success "Existing deployment is ready"
return 0
else
log_warning "Existing deployment pods are not ready, will redeploy"
return 1
fi
else
log_warning "Existing deployment is not ready (status: $status), will redeploy"
return 1
fi
else
log_info "No existing deployment found"
return 1
fi
}
# Deploy the planner
deploy_planner() {
log_info "Deploying SLA planner..."
if [ ! -f "$YAML_FILE" ]; then
log_error "Deployment file $YAML_FILE not found"
exit 1
fi
# Apply the deployment
if kubectl apply -f "$YAML_FILE" -n "$NAMESPACE"; then
log_success "Deployment applied successfully"
else
log_error "Failed to apply deployment"
exit 1
fi
log_info "Waiting for DynamoGraphDeployment to be processed..."
if kubectl wait --for=condition=Ready dynamographdeployment/"$DEPLOYMENT_NAME" -n "$NAMESPACE" --timeout=600s; then
log_success "DynamoGraphDeployment is ready"
else
log_error "DynamoGraphDeployment failed to become ready within timeout"
exit 1
fi
log_info "Waiting for pods to be running (this may take several minutes for image pulls)..."
log_info "Waiting for frontend pod..."
if kubectl wait --for=condition=Ready pod -l "nvidia.com/dynamo-component-type=frontend,nvidia.com/dynamo-namespace=vllm-disagg-planner" -n "$NAMESPACE" --timeout=900s; then
log_success "Frontend pod is ready"
else
log_error "Frontend pod failed to become ready within timeout"
exit 1
fi
log_info "Waiting for all pods to be running..."
sleep 30
}
setup_port_forward() {
log_info "Setting up port forwarding..."
# Kill any existing port forward on the same port
if lsof -ti:$LOCAL_PORT &> /dev/null; then
log_warning "Port $LOCAL_PORT is already in use, attempting to free it..."
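# lsof may print several PIDs, one per line; the substitution is left unquoted so each PID is passed to kill separately
kill $(lsof -ti:$LOCAL_PORT) 2>/dev/null || true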
sleep 2
fi
local frontend_service="vllm-disagg-planner-frontend"
if ! kubectl get service "$frontend_service" -n "$NAMESPACE" &> /dev/null; then
log_error "Frontend service '$frontend_service' not found"
return 1
fi
log_info "Port forwarding to service: $frontend_service"
kubectl port-forward service/"$frontend_service" "$LOCAL_PORT:$FRONTEND_PORT" -n "$NAMESPACE" >/dev/null 2>&1 &
PORT_FORWARD_PID=$!
log_info "Waiting for port forwarding to be established..."
for i in {1..30}; do
if curl -s http://localhost:$LOCAL_PORT/health &> /dev/null; then
log_success "Port forwarding established and service is healthy"
return 0
fi
sleep 2
done
log_error "Failed to establish port forwarding or service is not healthy"
return 1
}
cleanup_port_forward() {
if [ -n "${PORT_FORWARD_PID:-}" ]; then
log_info "Cleaning up port forwarding..."
kill "$PORT_FORWARD_PID" 2>/dev/null || true
wait "$PORT_FORWARD_PID" 2>/dev/null || true
fi
}
cleanup_deployment() {
log_info "Cleaning up deployment..."
kubectl delete -f "$YAML_FILE" -n "$NAMESPACE" --ignore-not-found
log_info "Waiting for cleanup to complete..."
kubectl wait --for=delete dynamographdeployment/"$DEPLOYMENT_NAME" -n "$NAMESPACE" --timeout=120s || true
log_info "Cleanup complete"
}
run_test() {
log_info "Running scaling test (graduated 8->15->25 req/s)..."
local python_cmd="python3"
if ! command -v python3 &> /dev/null; then
python_cmd="python"
fi
local test_args="--namespace $NAMESPACE"
if [ "$SAVE_RESULTS" = true ]; then
test_args="$test_args --save-results"
log_info "Results will be saved to tests/planner/e2e_scaling_results"
fi
if $python_cmd test_scaling_e2e.py $test_args; then
log_success "Scaling test PASSED"
return 0
else
log_error "Scaling test FAILED"
return 1
fi
}
main() {
while [[ $# -gt 0 ]]; do
case $1 in
--namespace)
NAMESPACE="$2"
shift 2
;;
--save-results)
SAVE_RESULTS=true
shift
;;
--help)
echo "Usage: $0 [--namespace NS] [--save-results]"
echo ""
echo "Run SLA planner scaling test (graduated 8->15->25 req/s prefill scaling)"
echo ""
echo "Options:"
echo " --namespace NS Kubernetes namespace (default: default)"
echo " --save-results Save results to tests/planner/e2e_scaling_results instead of /tmp"
echo " --help Show this help"
exit 0
;;
*)
log_error "Unknown option: $1"
echo "Use --help for usage information"
exit 1
;;
esac
done
log_info "SLA Planner Scaling Test"
log_info "Namespace: $NAMESPACE"
log_info "Scenario: Graduated 8->15->25 req/s (1P1D -> 2P1D prefill scaling, ISL=4000/OSL=150)"
check_prerequisites
trap cleanup_port_forward EXIT
# Check if we need to deploy
local deployed_by_us=false
if ! check_existing_deployment; then
deploy_planner
deployed_by_us=true
fi
if ! setup_port_forward; then
log_error "Failed to setup port forwarding"
exit 1
fi
local test_result=0
if ! run_test; then
test_result=1
fi
# Only cleanup deployment if we deployed it
if [ "$deployed_by_us" = true ]; then
cleanup_deployment
fi
if [ $test_result -eq 0 ]; then
log_success "Test completed successfully!"
else
log_error "Test failed!"
fi
exit $test_result
}
main "$@"
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Unit tests for SLA planner replica calculation logic.
These tests focus specifically on the replica calculation formulas without
testing load prediction, interpolation, or correction factors.
"""
import argparse
import math
import os
import sys
from unittest.mock import MagicMock, Mock, patch
import pytest
# Dependencies are mocked below, before the actual Planner class is imported,
# so that its calculation logic can be tested in isolation.
# Create mock modules for dependencies that might not be available in test environment
mock_prometheus = MagicMock()
mock_prometheus.Gauge = MagicMock()
mock_prometheus.start_http_server = MagicMock()
mock_runtime = MagicMock()
mock_runtime.logging = MagicMock()
mock_runtime.logging.configure_dynamo_logging = MagicMock()
# Patch them into sys.modules before importing
sys.modules["prometheus_client"] = mock_prometheus
sys.modules["dynamo.runtime"] = mock_runtime
sys.modules["dynamo.runtime.logging"] = mock_runtime.logging
# Now import after mocking
from dynamo.planner.utils.planner_core import Metrics, Planner # noqa: E402
@pytest.fixture
def planner():
"""Set up test environment with mocked dependencies."""
# Create mock arguments
args = argparse.Namespace()
args.adjustment_interval = 60
args.prefill_engine_num_gpu = 1
args.decode_engine_num_gpu = 1
args.min_endpoint = 1
args.max_gpu_budget = 10
args.ttft = 80 # ms
args.itl = 10 # ms
args.backend = "vllm"
args.no_operation = True # Don't actually scale
args.prometheus_port = 0 # 0 means disabled
args.load_predictor = "constant"
args.load_prediction_window_size = 10
args.profile_results_dir = os.path.join(
os.path.dirname(__file__),
"profiling_results/H200_TP1P_TP1D",
)
args.environment = "kubernetes"
# Mock the runtime
mock_runtime = Mock()
# Patch Prometheus Gauge to avoid registry conflicts
with patch("dynamo.planner.utils.planner_core.Gauge") as mock_gauge:
mock_gauge.return_value = Mock()
# Create planner instance
planner = Planner(mock_runtime, args)
# Mock the interpolators to return fixed values for testing
planner.prefill_interpolator = Mock()
planner.decode_interpolator = Mock()
# Mock the predictors to return fixed values
planner.num_req_predictor = Mock()
planner.isl_predictor = Mock()
planner.osl_predictor = Mock()
# Mock the connector since we're not testing actual scaling
planner.connector = Mock()
# Mock prometheus client
planner.prometheus_api_client = Mock()
# Set up some baseline correction factors
planner.p_correction_factor = 1.0
planner.d_correction_factor = 1.0
# Store args for easy access in tests
planner.args = args
yield planner
# Cleanup is automatic with context manager
class TestReplicaCalculation:
"""Test replica calculation formulas in isolation."""
def test_prefill_replica_calculation_basic(self, planner):
"""Test basic prefill replica calculation."""
# Setup test data
next_num_req = 10
next_isl = 3000
prefill_thpt_per_gpu = 40000 # tokens/s/gpu (from the test data)
# Mock the predictor outputs
planner.num_req_predictor.predict_next.return_value = next_num_req
planner.isl_predictor.predict_next.return_value = next_isl
planner.osl_predictor.predict_next.return_value = 150
# Mock interpolator output
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = (
prefill_thpt_per_gpu
)
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
10000,
0.01,
0.5,
)
# Calculate expected result manually
pred_prefill_load_per_gpu = (
next_num_req
* next_isl
/ planner.args.adjustment_interval
* min(1, planner.p_correction_factor)
)
expected_prefill_replicas = math.ceil(
pred_prefill_load_per_gpu
/ prefill_thpt_per_gpu
/ planner.args.prefill_engine_num_gpu
)
# Set up valid metrics to trigger calculation
planner.last_metrics = Metrics(
num_req=10, isl=3000, osl=150, ttft=80.0, itl=10.0, request_duration=100.0
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls for correction factor calculation
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Run the calculation
import asyncio
asyncio.run(planner.make_adjustments())
# Extract the calculated values from the log calls or by checking the mock calls
# Since we mocked the connector, we can check what replicas were requested
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
prefill_component = "VllmPrefillWorker"
calculated_prefill_replicas = call_args.get(prefill_component, 1)
print(f"Expected prefill replicas: {expected_prefill_replicas}")
print(f"Calculated prefill replicas: {calculated_prefill_replicas}")
# The planner never goes below min_endpoint, so apply the same floor to the expected value
assert (
max(expected_prefill_replicas, planner.args.min_endpoint)
== calculated_prefill_replicas
)
def test_decode_replica_calculation_basic(self, planner):
"""Test basic decode replica calculation."""
# Setup test data
next_num_req = 10
next_osl = 150
decode_thpt_per_gpu = 10000 # tokens/s/gpu
# Mock the predictor outputs
planner.num_req_predictor.predict_next.return_value = next_num_req
planner.isl_predictor.predict_next.return_value = 3000
planner.osl_predictor.predict_next.return_value = next_osl
# Mock interpolator outputs
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = 40000
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
decode_thpt_per_gpu,
0.01,
0.5,
)
# Calculate expected result manually
expected_decode_replicas = math.ceil(
next_num_req
* next_osl
/ planner.args.adjustment_interval
/ decode_thpt_per_gpu
/ planner.args.decode_engine_num_gpu
)
# Set up valid metrics
planner.last_metrics = Metrics(
num_req=10, isl=3000, osl=150, ttft=80.0, itl=10.0, request_duration=100.0
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls for correction factor calculation
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Run the calculation
import asyncio
asyncio.run(planner.make_adjustments())
# Check the results
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
decode_component = "VllmDecodeWorker"
calculated_decode_replicas = call_args.get(decode_component, 1)
print(f"Expected decode replicas: {expected_decode_replicas}")
print(f"Calculated decode replicas: {calculated_decode_replicas}")
# The planner never goes below min_endpoint, so apply the same floor to the expected value
assert (
max(expected_decode_replicas, planner.args.min_endpoint)
== calculated_decode_replicas
)
@pytest.mark.parametrize(
"num_req,decode_thpt,expected_p,expected_d",
[
(10, 10000, 1, 1), # low load: 10 requests per adjustment interval
(500, 1000, 1, 2), # high load: 500 requests per adjustment interval (lower decode throughput)
],
)
def test_scaling_scenario_low_to_high_load(
self, planner, num_req, decode_thpt, expected_p, expected_d
):
"""Test scaling from low to high load scenarios."""
# Reset the planner state
planner.p_correction_factor = 1.0
planner.d_correction_factor = 1.0
# Mock predictor outputs for this case
planner.num_req_predictor.predict_next.return_value = num_req
planner.isl_predictor.predict_next.return_value = 3000
planner.osl_predictor.predict_next.return_value = 150
# Mock interpolator outputs (based on H200 1P1D profiling data)
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = (
40000 # tokens/s/gpu
)
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
decode_thpt,
0.01,
0.5,
)
# Set up metrics
planner.last_metrics = Metrics(
num_req=num_req,
isl=3000,
osl=150,
ttft=80.0,
itl=10.0,
request_duration=100.0,
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls for correction factor calculation
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Reset the mock
planner.connector.reset_mock()
# Run calculation
import asyncio
asyncio.run(planner.make_adjustments())
# Verify results
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
prefill_replicas = call_args.get("VllmPrefillWorker", 1)
decode_replicas = call_args.get("VllmDecodeWorker", 1)
print(f"Load {num_req} req/interval: P={prefill_replicas}, D={decode_replicas}")
assert (
prefill_replicas == expected_p
), f"Prefill replicas mismatch: expected {expected_p}, got {prefill_replicas}"
assert (
decode_replicas == expected_d
), f"Decode replicas mismatch: expected {expected_d}, got {decode_replicas}"
def test_gpu_budget_constraint(self, planner):
"""Test that GPU budget constraints are properly applied."""
# Set a low GPU budget
planner.args.max_gpu_budget = 3
# Mock predictor outputs that would normally require more GPUs
planner.num_req_predictor.predict_next.return_value = 50 # High load
planner.isl_predictor.predict_next.return_value = 3000
planner.osl_predictor.predict_next.return_value = 150
# Mock interpolator outputs
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = 40000
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
10000,
0.01,
0.5,
)
# Set up metrics
planner.last_metrics = Metrics(
num_req=50, isl=3000, osl=150, ttft=80.0, itl=10.0, request_duration=100.0
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Run calculation
import asyncio
asyncio.run(planner.make_adjustments())
# Verify that total GPU usage doesn't exceed budget
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
prefill_replicas = call_args.get("VllmPrefillWorker", 1)
decode_replicas = call_args.get("VllmDecodeWorker", 1)
total_gpus = (
prefill_replicas * planner.args.prefill_engine_num_gpu
+ decode_replicas * planner.args.decode_engine_num_gpu
)
print(
f"GPU budget test: P={prefill_replicas}, D={decode_replicas}, Total GPUs={total_gpus}"
)
assert (
total_gpus <= planner.args.max_gpu_budget
), "Total GPU usage exceeds budget"
def test_min_endpoint_constraint(self, planner):
"""Test that minimum endpoint constraints are respected."""
planner.args.min_endpoint = 2
# Mock predictor outputs that would normally require fewer workers
planner.num_req_predictor.predict_next.return_value = 1 # Very low load
planner.isl_predictor.predict_next.return_value = 100
planner.osl_predictor.predict_next.return_value = 10
# Mock interpolator outputs
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = 40000
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
10000,
0.01,
0.5,
)
# Set up metrics
planner.last_metrics = Metrics(
num_req=1, isl=100, osl=10, ttft=80.0, itl=10.0, request_duration=100.0
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Run calculation
import asyncio
asyncio.run(planner.make_adjustments())
# Verify minimum constraints are respected
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
prefill_replicas = call_args.get("VllmPrefillWorker", 1)
decode_replicas = call_args.get("VllmDecodeWorker", 1)
print(f"Min endpoint test: P={prefill_replicas}, D={decode_replicas}")
assert (
prefill_replicas >= planner.args.min_endpoint
), "Prefill replicas below minimum"
assert (
decode_replicas >= planner.args.min_endpoint
), "Decode replicas below minimum"
def test_prefill_correction_factor_clamping(self, planner):
"""Test that prefill correction factor > 1 is clamped to 1."""
# Set a high correction factor > 1
planner.p_correction_factor = 2.5
planner.d_correction_factor = 1.0
# Mock predictor outputs
planner.num_req_predictor.predict_next.return_value = 10
planner.isl_predictor.predict_next.return_value = 3000
planner.osl_predictor.predict_next.return_value = 150
# Mock interpolator outputs
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = 40000
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
10000,
0.01,
0.5,
)
# Set up metrics
planner.last_metrics = Metrics(
num_req=10, isl=3000, osl=150, ttft=80.0, itl=10.0, request_duration=100.0
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Calculate expected result manually with clamping
# Should use min(1, 2.5) = 1
pred_prefill_load_per_gpu = (
10 * 3000 / planner.args.adjustment_interval * min(1, 2.5) # Should be * 1
)
expected_prefill_replicas = math.ceil(
pred_prefill_load_per_gpu / 40000 / planner.args.prefill_engine_num_gpu
)
# Run calculation
import asyncio
asyncio.run(planner.make_adjustments())
# Verify that correction factor was effectively clamped
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
prefill_replicas = call_args.get("VllmPrefillWorker", 1)
print(
f"Correction factor clamping test: Expected={expected_prefill_replicas}, Got={prefill_replicas}"
)
assert prefill_replicas == max(
expected_prefill_replicas, planner.args.min_endpoint
), "Prefill correction factor should be clamped to 1"
def test_decode_correction_factor_zero_handling(self, planner):
"""Test handling of d_correction_factor <= 0."""
# Test both 0 and negative values
for correction_factor in [0.0, -1.0]:
with patch.object(planner, "connector") as mock_connector:
planner.p_correction_factor = 1.0
planner.d_correction_factor = correction_factor
# Mock predictor outputs
planner.num_req_predictor.predict_next.return_value = 10
planner.isl_predictor.predict_next.return_value = 3000
planner.osl_predictor.predict_next.return_value = 150
# Mock interpolator outputs
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = (
40000
)
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
10000,
0.01,
0.5,
)
# Set up metrics
planner.last_metrics = Metrics(
num_req=10,
isl=3000,
osl=150,
ttft=80.0,
itl=10.0,
request_duration=100.0,
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Run calculation
import asyncio
asyncio.run(planner.make_adjustments())
# Should handle gracefully without crashing
# The code should use args.itl directly instead of dividing by 0
if mock_connector.set_component_replicas.called:
call_args = mock_connector.set_component_replicas.call_args[0][0]
decode_replicas = call_args.get("VllmDecodeWorker", 1)
print(
f"Correction factor {correction_factor} test: Decode replicas={decode_replicas}"
)
# Should get a valid result (not crash)
assert (
decode_replicas >= 1
), f"Should handle correction factor {correction_factor} gracefully"
def test_multi_gpu_engines(self, planner):
"""Test replica calculation with multi-GPU engines."""
# Set multi-GPU configuration
planner.args.prefill_engine_num_gpu = 2
planner.args.decode_engine_num_gpu = 4
# Mock predictor outputs
planner.num_req_predictor.predict_next.return_value = 20
planner.isl_predictor.predict_next.return_value = 3000
planner.osl_predictor.predict_next.return_value = 150
# Mock interpolator outputs
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = 40000
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
5000,
0.01,
0.5,
) # Lower for scaling
# Set up metrics
planner.last_metrics = Metrics(
num_req=20, isl=3000, osl=150, ttft=80.0, itl=10.0, request_duration=100.0
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Calculate expected results manually
pred_prefill_load_per_gpu = 20 * 3000 / planner.args.adjustment_interval * 1.0
expected_prefill_replicas = math.ceil(
pred_prefill_load_per_gpu / 40000 / 2
) # 2 GPUs per engine
expected_decode_replicas = math.ceil(
20 * 150 / planner.args.adjustment_interval / 5000 / 4
) # 4 GPUs per engine
# Run calculation
import asyncio
asyncio.run(planner.make_adjustments())
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
prefill_replicas = call_args.get("VllmPrefillWorker", 1)
decode_replicas = call_args.get("VllmDecodeWorker", 1)
print(
f"Multi-GPU test: P={prefill_replicas} (expected ~{expected_prefill_replicas}), D={decode_replicas} (expected ~{expected_decode_replicas})"
)
# Verify calculations account for multiple GPUs per engine
assert prefill_replicas == max(
expected_prefill_replicas, planner.args.min_endpoint
)
assert decode_replicas == max(
expected_decode_replicas, planner.args.min_endpoint
)
def test_complex_gpu_budget_scaling(self, planner):
"""Test complex GPU budget scaling with proportional reduction and decode adjustment."""
# Set tight GPU budget that will trigger complex scaling
planner.args.max_gpu_budget = 5
planner.args.prefill_engine_num_gpu = 2
planner.args.decode_engine_num_gpu = 2
planner.args.min_endpoint = 1
# High load that would normally require more GPUs
planner.num_req_predictor.predict_next.return_value = 100
planner.isl_predictor.predict_next.return_value = 3000
planner.osl_predictor.predict_next.return_value = 150
# Lower throughput to trigger higher replica needs
planner.prefill_interpolator.interpolate_thpt_per_gpu.return_value = 10000
planner.decode_interpolator.find_best_throughput_per_gpu.return_value = (
1000,
0.01,
0.5,
)
# Set up metrics
planner.last_metrics = Metrics(
num_req=100, isl=3000, osl=150, ttft=80.0, itl=10.0, request_duration=100.0
)
# Mock workers info
async def mock_get_workers_info():
return (["prefill1"], ["decode1"])
planner.get_workers_info = mock_get_workers_info
# Mock interpolation calls
planner.prefill_interpolator.interpolate_ttft.return_value = 80.0
planner.decode_interpolator.interpolate_itl.return_value = 10.0
# Run calculation
import asyncio
asyncio.run(planner.make_adjustments())
if planner.connector.set_component_replicas.called:
call_args = planner.connector.set_component_replicas.call_args[0][0]
prefill_replicas = call_args.get("VllmPrefillWorker", 1)
decode_replicas = call_args.get("VllmDecodeWorker", 1)
# Verify total GPU usage doesn't exceed budget
total_gpus = (
prefill_replicas * planner.args.prefill_engine_num_gpu
+ decode_replicas * planner.args.decode_engine_num_gpu
)
print(
f"Complex GPU budget test: P={prefill_replicas}, D={decode_replicas}, Total GPUs={total_gpus}"
)
assert (
total_gpus <= planner.args.max_gpu_budget
), "Total GPU usage should not exceed budget"
assert (
prefill_replicas >= planner.args.min_endpoint
), "Should respect min_endpoint for prefill"
assert (
decode_replicas >= planner.args.min_endpoint
), "Should respect min_endpoint for decode"
# No need for unittest.main() with pytest!
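The expected values computed by hand in the tests above all follow the same arithmetic, which can be sketched as one helper. This is a minimal sketch reconstructed from the test expectations only, not the planner's actual implementation: `expected_replicas` and its parameter names are hypothetical, and the GPU budget reduction exercised by `test_complex_gpu_budget_scaling` is deliberately left out because the tests only assert its outcome, not its formula.

```python
import math
from typing import Tuple


def expected_replicas(
    num_req: float,
    isl: float,
    osl: float,
    adjustment_interval: float,
    prefill_thpt_per_gpu: float,
    decode_thpt_per_gpu: float,
    prefill_engine_num_gpu: int = 1,
    decode_engine_num_gpu: int = 1,
    p_correction_factor: float = 1.0,
    min_endpoint: int = 1,
) -> Tuple[int, int]:
    """Hypothetical helper mirroring the manual calculations in the tests."""
    # Predicted prefill load in tokens/s; a correction factor above 1 is clamped to 1.
    prefill_load = num_req * isl / adjustment_interval * min(1, p_correction_factor)
    prefill = math.ceil(prefill_load / prefill_thpt_per_gpu / prefill_engine_num_gpu)

    # Predicted decode load in tokens/s, spread over the GPUs of each engine.
    decode_load = num_req * osl / adjustment_interval
    decode = math.ceil(decode_load / decode_thpt_per_gpu / decode_engine_num_gpu)

    # Neither worker type may drop below the configured minimum.
    return max(prefill, min_endpoint), max(decode, min_endpoint)


# Assumed inputs for illustration: 600 requests per 60 s interval, 2 GPUs per engine.
print(expected_replicas(600, 3000, 150, 60, 10000, 1000, 2, 2))
```

With these assumed inputs the prefill load is 30000 tokens/s, which needs 1.5 two-GPU engines and rounds up to 2, while the decode load of 1500 tokens/s fits in a single engine.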
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
End-to-end test for SLA planner scaling behavior.
This test assumes a disaggregated planner deployment is already running
and accessible at localhost:8000. It monitors pod scaling and validates
that the planner correctly scales from 1P1D to 2P1D when load increases
through graduated phases: 8 req/s (baseline) → 15 req/s (moderate) → 25 req/s (prefill scaling trigger).
"""
import asyncio
import json
import logging
import subprocess
import time
import urllib.request
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from utils.load_generator import LoadGenerator
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Test configuration constants
HEALTH_CHECK_TIMEOUT = 10
PORT_FORWARD_SETUP_DELAY = 3
FINAL_STABILIZATION_DELAY = 60
MONITORING_INTERVAL = 15
BUFFER_DURATION = 90
@dataclass
class PodCounts:
"""Track pod counts at a specific time."""
timestamp: float
prefill_pods: int
decode_pods: int
total_pods: int
def __str__(self):
return f"P={self.prefill_pods}, D={self.decode_pods}, Total={self.total_pods}"
class KubernetesMonitor:
"""Monitor Kubernetes deployment and pod scaling."""
def __init__(
self, namespace: str = "default", deployment_name: str = "vllm-disagg-planner"
):
self.namespace = namespace
self.deployment_name = deployment_name
self.pod_history: List[PodCounts] = []
def _run_kubectl(self, cmd: List[str]) -> Tuple[bool, str]:
"""Run kubectl command and return success status and output."""
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
return result.returncode == 0, result.stdout.strip()
except subprocess.TimeoutExpired:
logger.error(f"kubectl command timed out: {' '.join(cmd)}")
return False, ""
except Exception as e:
logger.error(f"kubectl command failed: {e}")
return False, ""
def get_pod_counts(self) -> Optional[PodCounts]:
"""Get current pod counts for prefill and decode workers."""
cmd = [
"kubectl",
"get",
"pods",
"-n",
self.namespace,
"--selector",
f"nvidia.com/dynamo-namespace={self.deployment_name}",
"-o",
"json",
]
success, output = self._run_kubectl(cmd)
if not success:
logger.warning("Failed to get pod counts")
return None
try:
data = json.loads(output)
prefill_pods = 0
decode_pods = 0
total_pods = 0
for pod in data.get("items", []):
pod_phase = pod.get("status", {}).get("phase", "")
pod_labels = pod.get("metadata", {}).get("labels", {})
component = pod_labels.get("nvidia.com/dynamo-component", "")
# Only count Running pods
if pod_phase == "Running":
if component == "VllmPrefillWorker":
prefill_pods += 1
elif component == "VllmDecodeWorker":
decode_pods += 1
else:
continue
total_pods += 1
counts = PodCounts(
timestamp=time.time(),
prefill_pods=prefill_pods,
decode_pods=decode_pods,
total_pods=total_pods,
)
self.pod_history.append(counts)
return counts
except Exception as e:
logger.error(f"Failed to parse pod counts: {e}")
return None
async def monitor_scaling(
self, duration: int, interval: int = 10
) -> List[PodCounts]:
"""Monitor pod scaling for a given duration."""
logger.info(f"Monitoring pod scaling for {duration}s (interval: {interval}s)")
start_time = time.time()
monitoring_data = []
while time.time() - start_time < duration:
counts = self.get_pod_counts()
if counts:
monitoring_data.append(counts)
logger.info(f"Pod counts: {counts}")
await asyncio.sleep(interval)
return monitoring_data
def wait_for_deployment_ready(self, timeout: int = 300) -> bool:
"""Wait for deployment to be ready."""
logger.info(f"Waiting for deployment {self.deployment_name} to be ready...")
cmd = [
"kubectl",
"wait",
"--for=condition=available",
f"deployment/{self.deployment_name}",
"-n",
self.namespace,
f"--timeout={timeout}s",
]
success, output = self._run_kubectl(cmd)
if success:
logger.info("Deployment is ready")
return True
else:
logger.error(f"Deployment failed to become ready: {output}")
return False
def apply_deployment(self, yaml_file: str) -> bool:
"""Apply Kubernetes deployment from YAML file."""
logger.info(f"Applying deployment from {yaml_file}")
cmd = ["kubectl", "apply", "-f", yaml_file, "-n", self.namespace]
success, output = self._run_kubectl(cmd)
if success:
logger.info("Deployment applied successfully")
return True
else:
logger.error(f"Failed to apply deployment: {output}")
return False
def delete_deployment(self, yaml_file: str) -> bool:
"""Delete Kubernetes deployment."""
logger.info(f"Deleting deployment from {yaml_file}")
cmd = [
"kubectl",
"delete",
"-f",
yaml_file,
"-n",
self.namespace,
"--ignore-not-found",
]
success, output = self._run_kubectl(cmd)
if success:
logger.info("Deployment deleted successfully")
else:
logger.warning(f"Failed to delete deployment: {output}")
return success
def check_service_health(
self, service_name: Optional[str] = None, port: int = 8000
) -> bool:
"""Check if the frontend service is healthy."""
if service_name is None:
service_name = f"{self.deployment_name}-frontend"
# Port forward to check health
cmd = [
"kubectl",
"port-forward",
f"service/{service_name}",
f"{port}:{port}",
"-n",
self.namespace,
]
proc = None
try:
# Start port forwarding in background
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Give it a moment to establish connection
time.sleep(PORT_FORWARD_SETUP_DELAY)
# Try to check health endpoint
try:
response = urllib.request.urlopen(
f"http://localhost:{port}/health", timeout=HEALTH_CHECK_TIMEOUT
)
healthy = response.status == 200
logger.info(f"Service health check: {'OK' if healthy else 'FAILED'}")
except Exception as e:
logger.warning(f"Health check failed: {e}")
healthy = False
return healthy
except Exception as e:
logger.error(f"Failed to check service health: {e}")
return False
finally:
# Ensure port forwarding is terminated
if proc is not None:
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
class ScalingE2ETest:
"""End-to-end test for SLA planner scaling behavior."""
def __init__(
self,
namespace: str = "default",
base_url: str = "http://localhost:8000",
save_results: bool = False,
):
self.namespace = namespace
self.base_url = base_url
self.save_results = save_results
self.k8s_monitor = KubernetesMonitor(namespace)
self.load_generator = LoadGenerator(
base_url=base_url, save_results=save_results
)
self.test_results: Dict[str, Any] = {}
async def run_scaling_test(self) -> Dict:
"""
Run the complete scaling test.
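Hardcoded scenario (rates come from LoadGenerator.run_scaling_test):
- Phase 1 (8 req/s, baseline): Should maintain 1P1D
- Phase 2 (15 req/s, moderate load)
- Phase 3 (25 req/s, prefill scaling trigger): Should scale to 2P1D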
logger.info("Starting scaling integration test")
test_start_time = time.time()
# Record initial state
initial_counts = self.k8s_monitor.get_pod_counts()
logger.info(f"Test starting with: {initial_counts}")
# Start background monitoring
# Calculate based on actual phases from load generator
# Phase durations: baseline(90s) + transition(30s) + moderate(120s) + transition(30s) + trigger(180s) + buffer
total_test_duration = 90 + 30 + 120 + 30 + 180 + BUFFER_DURATION
monitoring_task = asyncio.create_task(
self.k8s_monitor.monitor_scaling(
total_test_duration, interval=MONITORING_INTERVAL
)
)
# Initialize results in case of exception
baseline_results = {}
moderate_results = {}
trigger_results = {}
try:
# Use the load generator's built-in scaling test
logger.info("Running scaling scenario (8 req/s -> 15 req/s -> 25 req/s)")
load_results = await self.load_generator.run_scaling_test()
# Extract load results for analysis (3-phase structure)
phase_results = load_results.get("phase_results", {})
baseline_results = phase_results.get("phase1_baseline", {})
moderate_results = phase_results.get("phase2_moderate", {})
trigger_results = phase_results.get("phase3_prefill_scaling_trigger", {})
# Check final pod counts
final_counts = self.k8s_monitor.get_pod_counts()
logger.info(f"Final pod counts: {final_counts}")
# Wait a bit more to capture any delayed scaling
logger.info("Waiting for potential delayed scaling...")
await asyncio.sleep(FINAL_STABILIZATION_DELAY)
# Re-check pod counts after the stabilization delay
final_final_counts = self.k8s_monitor.get_pod_counts()
logger.info(f"Final final pod counts: {final_final_counts}")
except Exception as e:
logger.error(f"Test execution failed: {e}")
raise
finally:
# Stop monitoring
monitoring_task.cancel()
try:
await monitoring_task
except asyncio.CancelledError:
pass
# Compile results
test_results: Dict[str, Any] = {
"test_duration": time.time() - test_start_time,
"config": {
# Document actual test configuration
"baseline_rps": 8.0,
"moderate_rps": 15.0,
"trigger_rps": 25.0,
"phase_durations": {"baseline": 90, "moderate": 120, "trigger": 180},
"transition_delay": 30,
},
"initial_pod_counts": initial_counts.__dict__ if initial_counts else None,
"baseline_results": baseline_results,
"moderate_results": moderate_results,
"trigger_results": trigger_results,
"final_pod_counts": final_counts.__dict__ if final_counts else None,
"final_final_pod_counts": final_final_counts.__dict__
if final_final_counts
else None,
"pod_history": [counts.__dict__ for counts in self.k8s_monitor.pod_history],
"scaling_analysis": self.analyze_scaling_behavior(),
}
return test_results
def analyze_scaling_behavior(self) -> Dict:
"""Analyze the scaling behavior from pod history."""
if len(self.k8s_monitor.pod_history) < 2:
return {"error": "Insufficient data for analysis"}
history = self.k8s_monitor.pod_history
# Find scaling events
scaling_events = []
for i in range(1, len(history)):
prev = history[i - 1]
curr = history[i]
if (
curr.prefill_pods != prev.prefill_pods
or curr.decode_pods != prev.decode_pods
):
scaling_events.append(
{
"timestamp": curr.timestamp,
"from": f"P={prev.prefill_pods}, D={prev.decode_pods}",
"to": f"P={curr.prefill_pods}, D={curr.decode_pods}",
"change": {
"prefill": curr.prefill_pods - prev.prefill_pods,
"decode": curr.decode_pods - prev.decode_pods,
},
}
)
# Check if expected scaling occurred
initial = history[0]
final = history[-1]
expected_scaling = {
"initial_1p1d": initial.prefill_pods == 1 and initial.decode_pods == 1,
"final_2p1d": final.prefill_pods == 2 and final.decode_pods == 1,
"scaling_occurred": len(scaling_events) > 0,
"correct_scaling": (
final.prefill_pods == 2
and final.decode_pods == 1
and initial.prefill_pods == 1
and initial.decode_pods == 1
),
}
return {
"scaling_events": scaling_events,
"initial_state": f"P={initial.prefill_pods}, D={initial.decode_pods}",
"final_state": f"P={final.prefill_pods}, D={final.decode_pods}",
"expected_scaling": expected_scaling,
"total_scaling_events": len(scaling_events),
}
def validate_test_results(self, results: Dict) -> Dict:
"""Validate that the test achieved expected scaling behavior."""
validation: Dict[str, Any] = {"test_passed": False, "issues": [], "summary": ""}
# Check if we have the expected data
if not results.get("scaling_analysis"):
validation["issues"].append("No scaling analysis data")
return validation
analysis = results["scaling_analysis"]
expected = analysis.get("expected_scaling", {})
# Validate initial state
if not expected.get("initial_1p1d"):
validation["issues"].append("Test did not start with 1P1D configuration")
# Validate final state
if not expected.get("final_2p1d"):
validation["issues"].append(
"Test did not end with expected 2P1D configuration"
)
# Validate scaling occurred
if not expected.get("scaling_occurred"):
validation["issues"].append("No scaling events detected")
# Check if correct scaling occurred
if expected.get("correct_scaling"):
validation["test_passed"] = True
validation[
"summary"
] = "✅ Test PASSED: Successfully scaled from 1P1D to 2P1D"
else:
validation[
"summary"
] = "❌ Test FAILED: Did not achieve expected 1P1D -> 2P1D scaling"
# Add performance validation across all phases
baseline = results.get("baseline_results", {})
moderate = results.get("moderate_results", {})
trigger = results.get("trigger_results", {})
if baseline.get("throughput", 0) > 0:
validation["baseline_throughput"] = f"{baseline['throughput']:.2f} req/s"
if moderate.get("throughput", 0) > 0:
validation["moderate_throughput"] = f"{moderate['throughput']:.2f} req/s"
if trigger.get("throughput", 0) > 0:
validation["trigger_throughput"] = f"{trigger['throughput']:.2f} req/s"
return validation
async def main():
"""Main function for running the e2e test."""
import argparse
parser = argparse.ArgumentParser(description="SLA Planner Scaling E2E Test")
parser.add_argument("--namespace", default="default", help="Kubernetes namespace")
parser.add_argument(
"--base-url", default="http://localhost:8000", help="Service URL"
)
parser.add_argument(
"--save-results",
action="store_true",
help="Save results to tests/planner/e2e_scaling_results instead of /tmp",
)
# No additional arguments needed - test is hardcoded
args = parser.parse_args()
test = ScalingE2ETest(
namespace=args.namespace, base_url=args.base_url, save_results=args.save_results
)
try:
# No health check is performed here; the deployment is assumed to be running already
logger.info(f"Using service at {args.base_url}")
# Run the scaling test
logger.info("Running scaling test...")
results = await test.run_scaling_test()
# Validate results
validation = test.validate_test_results(results)
# Save the summary JSON to /tmp (--save-results only affects load generator artifacts)
timestamp = int(time.time())
results_file = f"/tmp/scaling_test_results_{timestamp}.json"
with open(results_file, "w") as f:
json.dump({"results": results, "validation": validation}, f, indent=2)
# Print summary
logger.info("=" * 60)
logger.info("TEST SUMMARY")
logger.info("=" * 60)
logger.info(validation["summary"])
if validation["issues"]:
logger.info("\nIssues found:")
for issue in validation["issues"]:
logger.info(f" - {issue}")
if any(k.endswith("_throughput") for k in validation.keys()):
logger.info("\nPerformance:")
if "baseline_throughput" in validation:
logger.info(
f" Baseline (8 req/s): {validation['baseline_throughput']}"
)
if "moderate_throughput" in validation:
logger.info(
f" Moderate (15 req/s): {validation['moderate_throughput']}"
)
if "trigger_throughput" in validation:
logger.info(f" Trigger (25 req/s): {validation['trigger_throughput']}")
logger.info(f"\nDetailed results saved to: {results_file}")
logger.info("=" * 60)
return 0 if validation["test_passed"] else 1
except Exception as e:
logger.error(f"Test failed with error: {e}")
return 1
if __name__ == "__main__":
import sys
sys.exit(asyncio.run(main()))
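The pass/fail logic of the end-to-end test reduces to comparing consecutive pod-count samples and checking the first and last sample. A minimal standalone sketch of that idea follows, assuming the history is a plain list of `(prefill_pods, decode_pods)` tuples; `find_scaling_events` and `scaled_1p1d_to_2p1d` are hypothetical names used for illustration and are not part of the test module.

```python
from typing import Dict, List, Tuple

PodSample = Tuple[int, int]  # (prefill_pods, decode_pods)


def find_scaling_events(history: List[PodSample]) -> List[Dict[str, int]]:
    """Return the per-component change for every sample that differs from the previous one."""
    events = []
    for prev, curr in zip(history, history[1:]):
        if curr != prev:
            events.append({"prefill": curr[0] - prev[0], "decode": curr[1] - prev[1]})
    return events


def scaled_1p1d_to_2p1d(history: List[PodSample]) -> bool:
    """True when the run started at 1P1D and ended at 2P1D."""
    return len(history) >= 2 and history[0] == (1, 1) and history[-1] == (2, 1)


# Illustrative samples: one prefill worker is added midway through the run.
samples = [(1, 1), (1, 1), (2, 1), (2, 1)]
print(find_scaling_events(samples))
print(scaled_1p1d_to_2p1d(samples))
```

Checking only the first and last sample means a run that scales up and back down again would fail, which matches the `correct_scaling` check in `analyze_scaling_behavior`.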
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Load generation script for SLA planner scaling tests.
This script uses genai-perf to generate load at specific request rates
to test the planner's scaling behavior.
"""
import argparse
import asyncio
import json
import logging
import os
import tempfile
import time
from typing import Any, Dict, List, Optional
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class LoadGenerator:
"""Generate load using genai-perf to test planner scaling."""
def __init__(
self,
base_url: str = "http://localhost:8000",
model: str = "nvidia/Llama-3.1-8B-Instruct-FP8",
isl: int = 4000,
osl: int = 150,
save_results: bool = False,
):
self.base_url = base_url
self.model = model
self.isl = isl
self.osl = osl
self.save_results = save_results
def _calculate_genai_perf_params(
self,
req_per_sec: float,
) -> Dict[str, Any]:
"""
Calculate genai-perf parameters to approximate desired request rate.
Concurrency is a heuristic of three times the request rate, with a minimum of 1.
Args:
req_per_sec: Desired requests per second
Returns:
Dictionary with concurrency and request_rate parameters
"""
concurrency = max(1, int(req_per_sec * 3))
return {
"concurrency": concurrency,
"request_rate": req_per_sec,
}
async def generate_load(
self, req_per_sec: float, duration_sec: int, artifact_dir: Optional[str] = None
) -> Dict[str, Any]:
"""
Generate load at specified request rate for given duration.
Args:
req_per_sec: Target requests per second
duration_sec: Duration to generate load (seconds)
artifact_dir: Directory to store genai-perf artifacts
Returns:
Dictionary with load test results
"""
logger.info(f"Generating load: {req_per_sec} req/s for {duration_sec}s")
# Calculate genai-perf parameters
params = self._calculate_genai_perf_params(req_per_sec)
logger.info(f"Using request_rate={params['request_rate']} req/s")
# Create artifact directory if not provided
if artifact_dir is None:
artifact_dir = tempfile.mkdtemp(prefix="scaling_test_")
os.makedirs(artifact_dir, exist_ok=True)
# Drive test length by caller-provided duration
request_count = max(1, int(params["request_rate"] * duration_sec))
logger.info(
f"Adjusted parameters: duration={duration_sec}s, request_count={request_count}"
)
# Build the genai-perf command
cmd = [
"genai-perf",
"profile",
"--model",
self.model,
"--tokenizer",
self.model,
"--endpoint-type",
"chat",
"--url",
self.base_url.replace("http://", ""),
"--streaming",
"--synthetic-input-tokens-mean",
str(self.isl),
"--output-tokens-mean",
str(self.osl),
"--request-rate",
str(params["request_rate"]),
"--request-count",
str(request_count), # Use request count to limit test duration
"--stability-percentage",
"50",
"--num-dataset-entries",
str(
max(20, int(params["request_rate"] * 10))
), # Generate reasonable dataset size
"--artifact-dir",
artifact_dir,
"--",
"-v",
"-max-threads",
"64",
]
logger.info(f"Running command: {' '.join(cmd)}")
# Run genai-perf (async)
start_time = time.time()
timeout = max(duration_sec + 60, int(duration_sec * 1.5))
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
logger.error("genai-perf timed out")
raise RuntimeError("Load generation timed out")
end_time = time.time()
actual_duration = end_time - start_time
# Persist logs for debugging
try:
with open(
os.path.join(artifact_dir, "genai_perf.stdout.log"), "wb"
) as f:
f.write(stdout or b"")
with open(
os.path.join(artifact_dir, "genai_perf.stderr.log"), "wb"
) as f:
f.write(stderr or b"")
except Exception:
pass
if proc.returncode == 0:
logger.info("Load generation completed successfully")
logger.info(f"Actual duration: {actual_duration:.2f}s")
results = self._parse_genai_perf_results(artifact_dir)
results.update(
{
"requested_req_per_sec": req_per_sec,
"actual_duration": actual_duration,
"target_duration": duration_sec,
"genai_perf_params": params,
"artifact_dir": artifact_dir,
"success": True,
}
)
return results
else:
logger.error(f"genai-perf failed with return code {proc.returncode}")
raise RuntimeError("genai-perf failed; see logs in artifact dir")
except RuntimeError:
raise
except Exception as e:
logger.error(f"genai-perf execution error: {e}")
raise
def _parse_genai_perf_results(self, artifact_dir: str) -> Dict[str, Any]:
"""Parse genai-perf results from artifact directory."""
try:
# Look for the profile_export_genai_perf.json file
json_files = [f for f in os.listdir(artifact_dir) if f.endswith(".json")]
if not json_files:
logger.warning("No JSON results found in artifact directory")
return {}
# Main results file
results_file = None
for json_file in json_files:
if "profile_export" in json_file or "genai_perf" in json_file:
results_file = os.path.join(artifact_dir, json_file)
break
if not results_file:
results_file = os.path.join(artifact_dir, json_files[0])
logger.info(f"Parsing results from: {results_file}")
with open(results_file, "r") as f:
data = json.load(f)
results = {}
if "experiments" in data and data["experiments"]:
exp = data["experiments"][0]
if "perf_metrics" in exp:
metrics = exp["perf_metrics"]
results.update(
{
"throughput": metrics.get("throughput", {}).get("avg", 0),
"ttft_mean": metrics.get("ttft", {}).get("avg", 0),
"itl_mean": metrics.get("inter_token_latency", {}).get(
"avg", 0
),
"end_to_end_latency_mean": metrics.get(
"request_latency", {}
).get("avg", 0),
}
)
if not results and "profile_export_genai_perf" in data:
summary = data.get("summary", {})
results.update(
{
"throughput": summary.get("throughput", 0),
"ttft_mean": summary.get("time_to_first_token_ms", 0),
"itl_mean": summary.get("inter_token_latency_ms", 0),
}
)
logger.info(f"Parsed results: {results}")
return results
except Exception as e:
logger.warning(f"Failed to parse genai-perf results: {e}")
return {}
async def run_scaling_test(self) -> Dict[str, Any]:
"""
Run a multi-phase graduated scaling test for prefill scaling.
Uses a conservative graduated approach:
- Phase 1: 8 req/s for 90s (baseline, should work)
- Phase 2: 15 req/s for 120s (moderate load)
- Phase 3: 25 req/s for 180s (should trigger prefill scaling to 2P1D)
Returns:
Dictionary with complete test results
"""
logger.info(
"Starting graduated prefill scaling test scenario (targeting 1P1D -> 2P1D)"
)
logger.info("Using conservative graduated approach with metric generation")
# Graduated test parameters (optimized for prefill scaling)
phases: List[Dict[str, Any]] = [
{"rate": 8.0, "duration": 90, "name": "baseline"},
{"rate": 15.0, "duration": 120, "name": "moderate"},
{"rate": 25.0, "duration": 180, "name": "prefill_scaling_trigger"},
]
transition_delay = 30
# Create artifact directory
timestamp = int(time.time())
if self.save_results:
script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
base_dir = os.path.join(
script_dir, "e2e_scaling_results", f"scaling_test_{timestamp}"
)
else:
base_dir = f"/tmp/scaling_test_{timestamp}"
os.makedirs(base_dir, exist_ok=True)
logger.info(f"Saving results to: {base_dir}")
results = {
"test_timestamp": timestamp,
"config": {
"approach": "graduated_scaling",
"phases": phases,
"transition_delay": transition_delay,
"isl": self.isl,
"osl": self.osl,
"model": self.model,
},
}
try:
phase_results = {}
for i, phase in enumerate(phases):
phase_name = f"phase{i+1}_{phase['name']}"
logger.info(
f"Starting {phase_name}: {phase['rate']} req/s for {phase['duration']}s"
)
phase_dir = os.path.join(base_dir, phase_name)
phase_result = await self.generate_load(
req_per_sec=phase["rate"],
duration_sec=phase["duration"],
artifact_dir=phase_dir,
)
phase_results[phase_name] = phase_result
# Add transition delay except after last phase
if i < len(phases) - 1:
logger.info(f"Transition delay: {transition_delay}s")
await asyncio.sleep(transition_delay)
results["phase_results"] = phase_results
logger.info("Graduated scaling test completed successfully")
except Exception as e:
logger.error(f"Scaling test failed: {e}")
results["error"] = str(e)
raise
# Save results
results_file = os.path.join(base_dir, "scaling_test_results.json")
with open(results_file, "w") as f:
json.dump(results, f, indent=2)
logger.info(f"Test results saved to: {results_file}")
return results
async def main():
"""Main function for scaling test execution."""
parser = argparse.ArgumentParser(
description="SLA Planner Graduated Scaling Test - Optimized for 2P1D prefill scaling"
)
parser.add_argument(
"--base-url",
default="http://localhost:8000",
help="Service URL (default: http://localhost:8000)",
)
parser.add_argument(
"--model",
default="nvidia/Llama-3.1-8B-Instruct-FP8",
help="Model name (default: nvidia/Llama-3.1-8B-Instruct-FP8)",
)
parser.add_argument(
"--isl",
type=int,
default=4000,
help="Input sequence length - optimized for prefill scaling (default: 4000)",
)
parser.add_argument(
"--osl",
type=int,
default=150,
help="Output sequence length - optimized for prefill scaling (default: 150)",
)
parser.add_argument(
"--save-results",
action="store_true",
help="Save results to tests/planner/e2e_scaling_results instead of /tmp",
)
args = parser.parse_args()
generator = LoadGenerator(
base_url=args.base_url,
model=args.model,
isl=args.isl,
osl=args.osl,
save_results=args.save_results,
)
print("Starting SLA Planner Graduated Scaling Test...")
print(f"Parameters: ISL={args.isl}, OSL={args.osl}")
print(
"Test phases: 8 -> 15 -> 25 req/s (optimized for 1P1D -> 2P1D prefill scaling)"
)
results = await generator.run_scaling_test()
print("\n" + "=" * 60)
print("SCALING TEST COMPLETED")
print("=" * 60)
# Print results summary
phase_results = results.get("phase_results", {})
for phase_name, phase_data in phase_results.items():
ok = isinstance(phase_data, dict) and phase_data.get(
"success", bool(phase_data)
)
if ok:
duration = phase_data.get("actual_duration")
if isinstance(duration, (int, float)):
print(f"{phase_name}: {duration:.1f}s duration - SUCCESS")
else:
print(f"{phase_name}: SUCCESS")
else:
print(f"{phase_name}: FAILED")
print("\nResults saved to scaling test directory")
if __name__ == "__main__":
asyncio.run(main())
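For each phase, the load generator derives the request count, the dataset size, and the subprocess timeout from the target rate and duration. The sketch below restates that arithmetic as a pure function so the three hardcoded phases can be checked by hand. `plan_phase` is a hypothetical helper, not part of the script, and it does not invoke genai-perf.

```python
from typing import Dict


def plan_phase(req_per_sec: float, duration_sec: int) -> Dict[str, int]:
    """Hypothetical helper: derive per-phase limits the way generate_load does."""
    return {
        # Number of requests that approximates the desired duration at this rate.
        "request_count": max(1, int(req_per_sec * duration_sec)),
        # Dataset size passed as --num-dataset-entries, with a floor of 20.
        "num_dataset_entries": max(20, int(req_per_sec * 10)),
        # Timeout is the larger of duration + 60 s and 1.5x the duration.
        "timeout": max(duration_sec + 60, int(duration_sec * 1.5)),
    }


# The three phases used by run_scaling_test: (rate in req/s, duration in s).
for rate, duration in [(8.0, 90), (15.0, 120), (25.0, 180)]:
    print(rate, duration, plan_phase(rate, duration))
```

The fixed 60-second margin dominates for short phases, while the 1.5x factor takes over once a phase runs longer than 120 seconds.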