"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "1585244bc7004d4fc4f0f40d9c53994083251e25"
Unverified Commit 6401e34d authored by Tushar Sharma's avatar Tushar Sharma Committed by GitHub
Browse files

ci: Transition deploy tests to pytest framework (#5874)


Signed-off-by: Tushar Sharma <tusharma@nvidia.com>
parent 5092a5d0
...@@ -10,82 +10,19 @@ inputs: ...@@ -10,82 +10,19 @@ inputs:
description: 'Kubernetes namespace for deployment' description: 'Kubernetes namespace for deployment'
required: true required: true
# Deployment Configuration
deployment_file:
description: 'Path to the DynamoGraphDeployment YAML file (relative to examples/backends/<framework>)'
required: true
framework: framework:
description: 'Framework name (vllm, sglang, trtllm)' description: 'Framework name (vllm, sglang, trtllm)'
required: true required: true
framework_runtime_image: profile:
description: 'Full container image reference for the framework runtime' description: 'Deployment profile (e.g., disagg_router, agg)'
required: true required: true
image:
# Model Configuration description: 'Full container image reference for the framework runtime'
model_name:
description: 'Model name to test (e.g., Qwen/Qwen3-0.6B)'
required: false
default: 'Qwen/Qwen3-0.6B'
# Test Configuration
pod_ready_timeout:
description: 'Timeout for pods to become ready (kubectl wait format)'
required: false
default: '300s'
model_available_max_attempts:
description: 'Maximum attempts to wait for model availability'
required: false
default: '30'
model_available_retry_delay:
description: 'Delay between model availability checks (seconds)'
required: false
default: '5'
port_forward_delay:
description: 'Delay after port-forward to allow connection (seconds)'
required: false
default: '10'
test_identifier:
description: 'Unique identifier for test output (used for log file and artifact naming)'
required: true required: true
platform_arch:
description: 'Platform architecture (amd64, arm64)'
# Request Configuration
max_tokens:
description: 'Maximum tokens for test request'
required: false
default: '30'
temperature:
description: 'Temperature for test request'
required: false
default: '0.0'
test_prompt:
description: 'Test prompt to send (optional, uses default if not provided)'
required: false
default: ''
# Validation Configuration
min_response_length:
description: 'Minimum expected response content length'
required: false
default: '100'
skip_cleanup:
description: 'Skip cleanup step (useful for debugging)'
required: false required: false
default: 'false' default: 'amd64'
outputs:
graph_name:
description: 'Name of the deployed DynamoGraphDeployment'
value: ${{ steps.deploy.outputs.graph_name }}
test_result:
description: 'Test result (0=pass, 1=fail)'
value: ${{ steps.test.outputs.test_result }}
test_log_path:
description: 'Path to test output log'
value: ${{ steps.setup-test-names.outputs.test_output_log_file }}
artifact_name:
description: 'Name of the uploaded artifact'
value: ${{ steps.setup-test-names.outputs.artifact_name }}
runs: runs:
using: "composite" using: "composite"
...@@ -102,232 +39,38 @@ runs: ...@@ -102,232 +39,38 @@ runs:
kubectl config set-context --current --namespace=${{ inputs.namespace }} kubectl config set-context --current --namespace=${{ inputs.namespace }}
kubectl config get-contexts kubectl config get-contexts
- name: Setup Test Output Names - name: Set up Python
id: setup-test-names uses: actions/setup-python@v5
with:
python-version: '3.12'
cache: 'pip'
cache-dependency-path: 'container/deps/requirements.test.txt'
- name: Install test dependencies
shell: bash shell: bash
run: | run: |
TEST_IDENTIFIER="${{ inputs.test_identifier }}" python -m pip install --upgrade pip
echo "test_output_log_file=deploy_test_output_${TEST_IDENTIFIER}.log" >> $GITHUB_OUTPUT pip install -r container/deps/requirements.test.txt
# Replace underscores with dashes for artifact name (GitHub artifact naming convention)
ARTIFACT_NAME="test-results-${TEST_IDENTIFIER//_/-}"
echo "artifact_name=${ARTIFACT_NAME}" >> $GITHUB_OUTPUT
- name: Deploy and Test - name: Deploy and Test
id: deploy id: deploy
shell: bash shell: bash
working-directory: ${{ github.workspace }}/examples/backends/${{ inputs.framework }}
env:
NAMESPACE: ${{ inputs.namespace }}
FRAMEWORK: ${{ inputs.framework }}
FRAMEWORK_RUNTIME_IMAGE: ${{ inputs.framework_runtime_image }}
DEPLOYMENT_FILE: ${{ inputs.deployment_file }}
MODEL_NAME: ${{ inputs.model_name }}
POD_READY_TIMEOUT: ${{ inputs.pod_ready_timeout }}
run: |
set -x
export KUBECONFIG=${{ github.workspace }}/.kubeconfig
kubectl config set-context --current --namespace=$NAMESPACE
# Redirect all output to a log file while still showing it
exec > >(tee -a "${{ steps.setup-test-names.outputs.test_output_log_file }}") 2>&1
export KUBE_NS=$NAMESPACE
export GRAPH_NAME=$(yq e '.metadata.name' $DEPLOYMENT_FILE)
echo "graph_name=${GRAPH_NAME}" >> $GITHUB_OUTPUT
# Update the deployment file with the runtime image
# Use strenv() to ensure the image string is treated as plain string, not parsed as YAML
yq -i '.spec.services.[].extraPodSpec.mainContainer.image = strenv(FRAMEWORK_RUNTIME_IMAGE)' $DEPLOYMENT_FILE
echo "=== DEPLOYMENT CONFIGURATION ==="
echo "Framework: ${FRAMEWORK}"
echo "Runtime Image: ${FRAMEWORK_RUNTIME_IMAGE}"
echo "Graph Name: ${GRAPH_NAME}"
echo "Namespace: ${KUBE_NS}"
echo ""
echo "=== UPDATED DEPLOYMENT FILE ==="
cat $DEPLOYMENT_FILE
# Apply the deployment
kubectl apply -n $KUBE_NS -f $DEPLOYMENT_FILE
# Wait for pods to be ready
echo "=== WAITING FOR PODS ==="
sleep 20
echo "Waiting for pods with label nvidia.com/dynamo-graph-deployment-name: $GRAPH_NAME"
if ! kubectl wait --for=condition=ready pod \
-l "nvidia.com/dynamo-graph-deployment-name=$GRAPH_NAME" \
-n ${KUBE_NS} \
--timeout=${POD_READY_TIMEOUT}; then
echo "::error::Pods failed to become ready within timeout"
echo "deploy_failed=true" >> $GITHUB_OUTPUT
exit 1
fi
echo "=== FINAL POD STATUSES ==="
kubectl get pods -l "nvidia.com/dynamo-graph-deployment-name=$GRAPH_NAME" -n $KUBE_NS -o wide
echo ""
kubectl get all -n $KUBE_NS
- name: Debug Pod Failure
id: debug-failure
if: failure() && steps.deploy.outputs.deploy_failed == 'true'
shell: bash
env: env:
KUBECONFIG: ${{ github.workspace }}/.kubeconfig
NAMESPACE: ${{ inputs.namespace }} NAMESPACE: ${{ inputs.namespace }}
FRAMEWORK: ${{ inputs.framework }} FRAMEWORK: ${{ inputs.framework }}
DEPLOYMENT_FILE: ${{ inputs.deployment_file }} PROFILE: ${{ inputs.profile }}
GRAPH_NAME: ${{ steps.deploy.outputs.graph_name }} IMAGE: ${{ inputs.image }}
run: |
export KUBECONFIG=${{ github.workspace }}/.kubeconfig
echo "## ❌ Pod Readiness Failure: ${FRAMEWORK} / ${DEPLOYMENT_FILE}" >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
echo "**Graph Name:** \`${GRAPH_NAME}\`" >> "$GITHUB_STEP_SUMMARY"
echo "**Namespace:** \`${NAMESPACE}\`" >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
echo "### All relevant Pods in Namespace" >> "$GITHUB_STEP_SUMMARY"
echo '```' >> "$GITHUB_STEP_SUMMARY"
kubectl get pods -l "nvidia.com/dynamo-graph-deployment-name=$GRAPH_NAME" -n ${NAMESPACE} -o wide >> "$GITHUB_STEP_SUMMARY" 2>&1
echo '```' >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
# echo "### Pod Descriptions" >> "$GITHUB_STEP_SUMMARY"
# echo '```' >> "$GITHUB_STEP_SUMMARY"
# kubectl describe pods -l "nvidia.com/dynamo-graph-deployment-name=$GRAPH_NAME" -n ${NAMESPACE} >> "$GITHUB_STEP_SUMMARY" 2>&1
# echo '```' >> "$GITHUB_STEP_SUMMARY"
# echo "" >> "$GITHUB_STEP_SUMMARY"
echo "### Pod Logs (last 30 lines per container)" >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
# Get logs pod by pod for better readability
PODS=$(kubectl get pods -l "nvidia.com/dynamo-graph-deployment-name=$GRAPH_NAME" -n ${NAMESPACE} -o jsonpath='{.items[*].metadata.name}')
if [ -z "$PODS" ]; then
echo "_No pods found matching the deployment label_" >> "$GITHUB_STEP_SUMMARY"
else
for POD in $PODS; do
echo "#### Pod: \`${POD}\`" >> "$GITHUB_STEP_SUMMARY"
echo '```' >> "$GITHUB_STEP_SUMMARY"
kubectl logs --tail=30 --all-containers=true ${POD} -n ${NAMESPACE} >> "$GITHUB_STEP_SUMMARY" 2>&1 || echo "No logs available for ${POD}" >> "$GITHUB_STEP_SUMMARY"
echo '```' >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
done
fi
- name: Run Validation Tests
id: test
shell: bash
env:
NAMESPACE: ${{ inputs.namespace }}
FRAMEWORK: ${{ inputs.framework }}
MODEL_NAME: ${{ inputs.model_name }}
MAX_ATTEMPTS: ${{ inputs.model_available_max_attempts }}
RETRY_DELAY: ${{ inputs.model_available_retry_delay }}
PORT_FORWARD_DELAY: ${{ inputs.port_forward_delay }}
MAX_TOKENS: ${{ inputs.max_tokens }}
TEMPERATURE: ${{ inputs.temperature }}
MIN_RESPONSE_LENGTH: ${{ inputs.min_response_length }}
TEST_PROMPT: ${{ inputs.test_prompt }}
GRAPH_NAME: ${{ steps.deploy.outputs.graph_name }}
run: | run: |
set -x mkdir -p test-results
export KUBECONFIG=${{ github.workspace }}/.kubeconfig pytest tests/deploy/test_deploy.py \
--framework="${FRAMEWORK}" \
# Get frontend pod and setup port-forward --profile="${PROFILE}" \
FRONTEND_POD=$(kubectl get pods -n ${NAMESPACE} \ --image="${IMAGE}" \
-l nvidia.com/dynamo-component-type=frontend,nvidia.com/dynamo-graph-deployment-name=${GRAPH_NAME} \ --namespace="${NAMESPACE}" \
-o jsonpath='{.items[0].metadata.name}') -v -s \
--durations=10 \
CONTAINER_PORT=$(kubectl get pod $FRONTEND_POD -n ${NAMESPACE} \ --junitxml=test-results/pytest_deploy_${FRAMEWORK}_${PROFILE}_${{ inputs.platform_arch }}_${{ github.run_id }}_${{ job.check_run_id }}.xml \
-o jsonpath='{.spec.containers[0].ports[?(@.name=="http")].containerPort}') --log-cli-level=INFO
echo "Frontend Pod: ${FRONTEND_POD}"
echo "Container Port: ${CONTAINER_PORT}"
kubectl port-forward pod/$FRONTEND_POD 8000:${CONTAINER_PORT} -n ${NAMESPACE} &
PORT_FORWARD_PID=$!
LLM_URL="http://localhost:8000"
sleep ${PORT_FORWARD_DELAY}
echo "LLM URL: ${LLM_URL}"
echo "Model Name: ${MODEL_NAME}"
# Wait for model to be available
ATTEMPT=1
while [ $ATTEMPT -le $MAX_ATTEMPTS ]; do
MODELS_RESPONSE=$(curl -s --retry 5 --retry-delay 2 --retry-connrefused "${LLM_URL}/v1/models" || true)
if echo "$MODELS_RESPONSE" | jq -e --arg MODEL_NAME "$MODEL_NAME" '.data[]?.id == $MODEL_NAME' >/dev/null 2>&1; then
echo "Model $MODEL_NAME is available in /v1/models"
break
fi
echo "Waiting for model $MODEL_NAME... (attempt $ATTEMPT/$MAX_ATTEMPTS)"
sleep ${RETRY_DELAY}
ATTEMPT=$((ATTEMPT + 1))
done
if [ $ATTEMPT -gt $MAX_ATTEMPTS ]; then
echo "Model $MODEL_NAME not found after $MAX_ATTEMPTS attempts"
echo "Last response: $MODELS_RESPONSE"
echo "test_result=1" >> $GITHUB_OUTPUT
kill $PORT_FORWARD_PID 2>/dev/null || true
exit 1
fi
# Use default prompt if not provided
if [ -z "$TEST_PROMPT" ]; then
TEST_PROMPT="In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden."
fi
# Send test request
RESPONSE=$(curl -s -N --no-buffer --retry 10 --retry-delay 5 --retry-connrefused \
-X POST "${LLM_URL}/v1/chat/completions" \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"model": "'"${MODEL_NAME}"'",
"messages": [{"role": "user", "content": "'"${TEST_PROMPT}"'"}],
"stream": false,
"max_tokens": '"${MAX_TOKENS}"',
"temperature": '"${TEMPERATURE}"'
}' 2>&1)
echo "Response: $RESPONSE"
# Validate response
TEST_RESULT=0
if ! echo "$RESPONSE" | jq -e . >/dev/null 2>&1; then
echo "❌ Test failed: Response is not valid JSON"
echo "Got: $RESPONSE"
TEST_RESULT=1
elif ! echo "$RESPONSE" | jq -e '.choices[0].message.role == "assistant"' >/dev/null 2>&1; then
echo "❌ Test failed: Message role is not 'assistant'"
echo "Got: $(echo "$RESPONSE" | jq '.choices[0].message.role')"
TEST_RESULT=1
elif ! echo "$RESPONSE" | jq -e '.model == "'"${MODEL_NAME}"'"' >/dev/null 2>&1; then
echo "❌ Test failed: Model name mismatch"
echo "Expected: ${MODEL_NAME}"
echo "Got: $(echo "$RESPONSE" | jq '.model')"
TEST_RESULT=1
elif ! echo "$RESPONSE" | jq -e '.choices[0].message.content | length > '"${MIN_RESPONSE_LENGTH}"'' >/dev/null 2>&1; then
echo "❌ Test failed: Response too short (min: ${MIN_RESPONSE_LENGTH})"
echo "Got length: $(echo "$RESPONSE" | jq '.choices[0].message.content | length')"
TEST_RESULT=1
else
echo "✅ Test passed: Response matches expected format and content"
fi
echo "test_result=${TEST_RESULT}" >> $GITHUB_OUTPUT
# Cleanup port-forward
kill $PORT_FORWARD_PID 2>/dev/null || true
exit $TEST_RESULT
- name: Cleanup Deployment - name: Cleanup Deployment
if: always() && inputs.skip_cleanup != 'true' if: always() && inputs.skip_cleanup != 'true'
...@@ -357,6 +100,6 @@ runs: ...@@ -357,6 +100,6 @@ runs:
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f #v6 uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f #v6
if: always() if: always()
with: with:
name: ${{ steps.setup-test-names.outputs.artifact_name }} name: test-results-${{ inputs.framework }}-${{ inputs.profile }}-${{ inputs.platform_arch }}-${{ github.run_id }}-${{ job.check_run_id }}
path: ${{ steps.setup-test-names.outputs.test_output_log_file }} path: test-results/pytest_deploy_${{ inputs.framework }}_${{ inputs.profile }}_${{ inputs.platform_arch }}_${{ github.run_id }}_${{ job.check_run_id }}.xml
retention-days: 7 retention-days: 7
...@@ -318,7 +318,8 @@ jobs: ...@@ -318,7 +318,8 @@ jobs:
# End-to-end tests for each framework with various deployment profiles # End-to-end tests for each framework with various deployment profiles
# ============================================================================ # ============================================================================
deploy-test-vllm: deploy-test-vllm:
if: ( github.ref_name == 'main' || github.event.inputs.run_deploy_operator ) # Run if core, vllm, or deploy is changed
if: needs.changed-files.outputs.core == 'true' || needs.changed-files.outputs.vllm == 'true' || needs.changed-files.outputs.deploy == 'true'
runs-on: prod-default-small-v2 runs-on: prod-default-small-v2
needs: [deploy-operator, vllm-pipeline] needs: [deploy-operator, vllm-pipeline]
permissions: permissions:
...@@ -331,44 +332,6 @@ jobs: ...@@ -331,44 +332,6 @@ jobs:
- agg - agg
- agg_router - agg_router
- disagg - disagg
# - disagg_router
name: deploy-test-vllm (${{ matrix.profile }})
env:
FRAMEWORK: vllm
steps:
- name: Checkout code
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
- name: Run Dynamo Deploy Test
id: deploy-test
uses: ./.github/actions/dynamo-deploy-test
with:
kubeconfig_base64: ${{ secrets.AZURE_AKS_CI_KUBECONFIG_B64 }}
namespace: ${{ needs.deploy-operator.outputs.NAMESPACE }}
deployment_file: "deploy/${{ matrix.profile }}.yaml"
framework: ${{ env.FRAMEWORK }}
framework_runtime_image: ${{ secrets.AZURE_ACR_HOSTNAME }}/ai-dynamo/dynamo:${{ github.sha }}-vllm-cuda12-amd64
model_name: "Qwen/Qwen3-0.6B"
test_identifier: ${{ env.FRAMEWORK }}_${{ matrix.profile }}_amd64_${{ github.run_id }}_${{ github.run_attempt }}
# For now, this job is separated from the job matrix above for easier flow control handling
# Uncomment the disagg_router matrix entry from the above job and delete the below job
# when we want to run them under the same conditions.
# Current conditions:
# - Run vllm disagg_router on all commits
# - Run rest of jobs only on push to main or manual trigger
deploy-test-vllm-disagg-router:
runs-on: prod-default-small-v2
if: needs.changed-files.outputs.core == 'true' || needs.changed-files.outputs.vllm == 'true' || needs.changed-files.outputs.deploy == 'true'
needs: [changed-files, deploy-operator, vllm-pipeline]
permissions:
contents: read
strategy:
fail-fast: false
max-parallel: 1
matrix:
profile:
- disagg_router - disagg_router
name: deploy-test-vllm (${{ matrix.profile }}) name: deploy-test-vllm (${{ matrix.profile }})
env: env:
...@@ -384,15 +347,14 @@ jobs: ...@@ -384,15 +347,14 @@ jobs:
namespace: ${{ needs.deploy-operator.outputs.NAMESPACE }} namespace: ${{ needs.deploy-operator.outputs.NAMESPACE }}
deployment_file: "deploy/${{ matrix.profile }}.yaml" deployment_file: "deploy/${{ matrix.profile }}.yaml"
framework: ${{ env.FRAMEWORK }} framework: ${{ env.FRAMEWORK }}
framework_runtime_image: ${{ secrets.AZURE_ACR_HOSTNAME }}/ai-dynamo/dynamo:${{ github.sha }}-vllm-cuda12-amd64 profile: ${{ matrix.profile }}
model_name: "Qwen/Qwen3-0.6B" image: ${{ secrets.AZURE_ACR_HOSTNAME }}/ai-dynamo/dynamo:${{ github.sha }}-vllm-cuda12-amd64
test_identifier: ${{ env.FRAMEWORK }}_${{ matrix.profile }}_amd64_${{ github.run_id }}_${{ github.run_attempt }} platform_arch: amd64
deploy-test-sglang: deploy-test-sglang:
runs-on: prod-default-small-v2 runs-on: prod-default-small-v2
# Run if push to main, or manually triggered # Run if core, sglang, or deploy is changed
if: ( github.ref_name == 'main' || github.event.inputs.run_deploy_operator ) if: needs.changed-files.outputs.core == 'true' || needs.changed-files.outputs.sglang == 'true' || needs.changed-files.outputs.deploy == 'true'
needs: [changed-files, deploy-operator, sglang-pipeline] needs: [changed-files, deploy-operator, sglang-pipeline]
permissions: permissions:
contents: read contents: read
...@@ -417,15 +379,14 @@ jobs: ...@@ -417,15 +379,14 @@ jobs:
namespace: ${{ needs.deploy-operator.outputs.NAMESPACE }} namespace: ${{ needs.deploy-operator.outputs.NAMESPACE }}
deployment_file: "deploy/${{ matrix.profile }}.yaml" deployment_file: "deploy/${{ matrix.profile }}.yaml"
framework: ${{ env.FRAMEWORK }} framework: ${{ env.FRAMEWORK }}
framework_runtime_image: ${{ secrets.AZURE_ACR_HOSTNAME }}/ai-dynamo/dynamo:${{ github.sha }}-sglang-cuda12-amd64 profile: ${{ matrix.profile }}
model_name: "Qwen/Qwen3-0.6B" image: ${{ secrets.AZURE_ACR_HOSTNAME }}/ai-dynamo/dynamo:${{ github.sha }}-sglang-cuda12-amd64
test_identifier: ${{ env.FRAMEWORK }}_${{ matrix.profile }}_amd64_${{ github.run_id }}_${{ github.run_attempt }} platform_arch: amd64
deploy-test-trtllm: deploy-test-trtllm:
runs-on: prod-default-small-v2 runs-on: prod-default-small-v2
# Run if push to main, or manually triggered # Run if core, trtllm, or deploy is changed
if: ( github.ref_name == 'main' || github.event.inputs.run_deploy_operator ) if: needs.changed-files.outputs.core == 'true' || needs.changed-files.outputs.trtllm == 'true' || needs.changed-files.outputs.deploy == 'true'
needs: [changed-files, deploy-operator, trtllm-pipeline] needs: [changed-files, deploy-operator, trtllm-pipeline]
permissions: permissions:
contents: read contents: read
...@@ -452,9 +413,9 @@ jobs: ...@@ -452,9 +413,9 @@ jobs:
namespace: ${{ needs.deploy-operator.outputs.NAMESPACE }} namespace: ${{ needs.deploy-operator.outputs.NAMESPACE }}
deployment_file: "deploy/${{ matrix.profile }}.yaml" deployment_file: "deploy/${{ matrix.profile }}.yaml"
framework: ${{ env.FRAMEWORK }} framework: ${{ env.FRAMEWORK }}
framework_runtime_image: ${{ secrets.AZURE_ACR_HOSTNAME }}/ai-dynamo/dynamo:${{ github.sha }}-trtllm-cuda13-amd64 profile: ${{ matrix.profile }}
model_name: "Qwen/Qwen3-0.6B" image: ${{ secrets.AZURE_ACR_HOSTNAME }}/ai-dynamo/dynamo:${{ github.sha }}-trtllm-cuda13-amd64
test_identifier: ${{ env.FRAMEWORK }}_${{ matrix.profile }}_amd64_${{ github.run_id }}_${{ github.run_attempt }} platform_arch: amd64
# ============================================================================ # ============================================================================
# CLEANUP JOBS # CLEANUP JOBS
...@@ -486,7 +447,7 @@ jobs: ...@@ -486,7 +447,7 @@ jobs:
name: Cleanup AKS resources name: Cleanup AKS resources
runs-on: prod-default-small-v2 runs-on: prod-default-small-v2
if: always() if: always()
needs: [deploy-operator, deploy-test-sglang, deploy-test-trtllm, deploy-test-vllm, deploy-test-vllm-disagg-router] needs: [deploy-operator, deploy-test-sglang, deploy-test-trtllm, deploy-test-vllm]
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0 uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
......
...@@ -825,11 +825,6 @@ class WorkflowMetricsUploader: ...@@ -825,11 +825,6 @@ class WorkflowMetricsUploader:
job_name = job_data.get("name", "") job_name = job_data.get("name", "")
job_id = str(job_data["id"]) job_id = str(job_data["id"])
# Skip deployment test jobs (No pytest metadata files are created)
if job_name.lower().startswith("deploy"):
print(f"⏭️ Skipping test metrics for deployment job '{job_name}'")
return
print(f"🧪 Looking for test results for job '{job_name}'") print(f"🧪 Looking for test results for job '{job_name}'")
# Determine framework from job name to filter metadata files # Determine framework from job name to filter metadata files
......
...@@ -13,9 +13,14 @@ boto3==1.42.4 ...@@ -13,9 +13,14 @@ boto3==1.42.4
boto3-stubs[s3]==1.42.9 # Type stubs for boto3 S3 client boto3-stubs[s3]==1.42.9 # Type stubs for boto3 S3 client
# For IFEval dataset loading in kvbm tests # For IFEval dataset loading in kvbm tests
datasets==4.4.1 datasets==4.4.1
# For Kubernetes operations in deploy tests
kr8s==0.20.13
kubernetes==32.0.1
kubernetes_asyncio==32.0.0
# For NATS object store verification in router tests # For NATS object store verification in router tests
nats-py==2.12.0 nats-py==2.12.0
psutil<=7.0.0 # System package, may vary by platform (was >=5.0.0) psutil<=7.0.0 # System package, may vary by platform (was >=5.0.0)
pydantic==2.11.7
pyright==1.1.407 pyright==1.1.407
pytest==8.4.2 pytest==8.4.2
pytest-asyncio==1.3.0 pytest-asyncio==1.3.0
...@@ -29,8 +34,13 @@ pytest-mypy==1.0.1 ...@@ -29,8 +34,13 @@ pytest-mypy==1.0.1
pytest-order==1.3.0 pytest-order==1.3.0
pytest-timeout==2.4.0 pytest-timeout==2.4.0
pytest-xdist==3.8.0 pytest-xdist==3.8.0
pyyaml==6.0.3
requests==2.32.5
sniffio==1.3.1
tabulate==0.9.0
# Triton client to Dynamo gRPC server # Triton client to Dynamo gRPC server
tritonclient[grpc]<=2.62.0 # May have platform-specific builds tritonclient[grpc]<=2.62.0 # May have platform-specific builds
# add types library stub for PyYAML # add types library stub for PyYAML
types-PyYAML==6.0.12.20250915 types-PyYAML==6.0.12.20250915
types-requests==2.32.4.20250913 types-requests==2.32.4.20250913
websocket-client==1.9.0
...@@ -241,6 +241,7 @@ markers = [ ...@@ -241,6 +241,7 @@ markers = [
"custom_build: marks tests that require custom builds or special setup (e.g., MoE models)", "custom_build: marks tests that require custom builds or special setup (e.g., MoE models)",
"k8s: marks tests as requiring Kubernetes", "k8s: marks tests as requiring Kubernetes",
"fault_tolerance: marks tests as fault tolerance tests", "fault_tolerance: marks tests as fault tolerance tests",
"deploy: marks tests as deployment tests",
# Built-in markers # Built-in markers
"skip: skip this test", "skip: skip this test",
"skipif: skip if condition is true", "skipif: skip if condition is true",
......
...@@ -60,6 +60,7 @@ def pytest_configure(config): ...@@ -60,6 +60,7 @@ def pytest_configure(config):
"custom_build: marks tests that require custom builds or special setup (e.g., MoE models)", "custom_build: marks tests that require custom builds or special setup (e.g., MoE models)",
"k8s: marks tests as requiring Kubernetes", "k8s: marks tests as requiring Kubernetes",
"fault_tolerance: marks tests as fault tolerance tests", "fault_tolerance: marks tests as fault tolerance tests",
"deploy: marks tests as deployment tests",
# Third-party plugin markers # Third-party plugin markers
"timeout: test timeout in seconds (pytest-timeout plugin)", "timeout: test timeout in seconds (pytest-timeout plugin)",
] ]
...@@ -67,6 +68,37 @@ def pytest_configure(config): ...@@ -67,6 +68,37 @@ def pytest_configure(config):
config.addinivalue_line("markers", marker) config.addinivalue_line("markers", marker)
def pytest_addoption(parser: pytest.Parser) -> None:
    """Register the command-line options shared by several test suites.

    Only options used by more than one suite live here. Options that belong
    to a single suite (e.g., deploy, fault-tolerance) are registered by the
    conftest.py in that suite's own subdirectory.

    Args:
        parser: The pytest option parser the flags are added to.
    """
    # Each entry is (flag, keyword arguments for parser.addoption), registered
    # in the order listed.
    shared_deployment_options = (
        (
            "--image",
            {
                "type": str,
                "default": None,
                "help": "Container image to use for deployment (overrides YAML default)",
            },
        ),
        (
            "--namespace",
            {
                "type": str,
                # Deliberately no default: each subdirectory provides its own.
                "default": None,
                "help": "Kubernetes namespace for deployment",
            },
        ),
        (
            "--skip-service-restart",
            {
                "action": "store_true",
                # None (flag absent) means "use the fixture's default behavior".
                "default": None,
                "help": (
                    "Skip restarting NATS and etcd services before deployment. "
                    "Default: deploy tests skip (for speed), fault-tolerance tests restart (for clean state)."
                ),
            },
        ),
    )

    for flag, settings in shared_deployment_options:
        parser.addoption(flag, **settings)
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s" LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S" DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
......
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Pytest configuration for deployment tests.
This module provides dynamic test discovery and fixtures for running deployment tests
against Kubernetes deployments. This currently only covers deployments in the examples directory.
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional
import pytest
from tests.utils.managed_deployment import DeploymentSpec, _get_workspace_dir
# Shared CLI options (--image, --namespace, --skip-service-restart) are defined in tests/conftest.py.
# Only deploy-specific options are defined here.
def pytest_addoption(parser: pytest.Parser) -> None:
    """Register the options that select which deployments are tested.

    Both options are optional strings that default to None. The shared
    options (--image, --namespace, --skip-service-restart) are registered
    in tests/conftest.py, not here.

    Args:
        parser: The pytest option parser the flags are added to.
    """
    # Flag -> help text; both flags share type=str and default=None.
    selector_help = {
        "--framework": (
            "Framework to test (e.g., vllm, sglang, trtllm). "
            "If not specified, runs all discovered frameworks."
        ),
        "--profile": (
            "Deployment profile to test (e.g., agg, disagg, disagg_router). "
            "If not specified, runs all profiles for the selected framework."
        ),
    }

    for flag, description in selector_help.items():
        parser.addoption(flag, type=str, default=None, help=description)
@dataclass(frozen=True)
class DeploymentTarget:
    """A single deployment YAML selected for testing.

    Instances are immutable (frozen dataclass), so they can be compared and
    used directly as pytest parameters.

    Attributes:
        yaml_path: Path to the deployment YAML file
        framework: The inference framework (vllm, sglang, trtllm, etc.)
        profile: The deployment profile name (agg, disagg, etc.)
        source: Where this target came from (e.g., examples)
    """

    yaml_path: Path
    framework: str
    profile: str
    source: str = "examples"

    @property
    def test_id(self) -> str:
        """Readable pytest ID of the form ``<framework>-<profile>``."""
        return "{}-{}".format(self.framework, self.profile)

    def exists(self) -> bool:
        """Return True when the deployment YAML file is present on disk."""
        yaml_file = self.yaml_path
        return yaml_file.exists()
def discover_example_targets(
    workspace: Optional[Path] = None,
) -> List[DeploymentTarget]:
    """Find deployment YAMLs under examples/backends/{framework}/deploy/.

    Each ``*.yaml`` file sitting directly inside a framework's ``deploy``
    directory becomes one target: the framework is the backend directory
    name and the profile is the file name without its extension.
    Files in subdirectories (e.g., lora/) are excluded.

    Args:
        workspace: Workspace root directory. If None, it is obtained from
            ``_get_workspace_dir()``.

    Returns:
        One DeploymentTarget per discovered YAML file, in directory-listing
        order; an empty list when ``examples/backends`` does not exist.
    """
    root = Path(_get_workspace_dir()) if workspace is None else workspace
    backends_dir = root / "examples" / "backends"
    if not backends_dir.exists():
        return []

    found: List[DeploymentTarget] = []
    for framework_dir in backends_dir.iterdir():
        deploy_dir = framework_dir / "deploy"
        # Skip stray files and backends that ship no deploy directory.
        if not (framework_dir.is_dir() and deploy_dir.exists()):
            continue

        found.extend(
            DeploymentTarget(
                yaml_path=manifest,
                framework=framework_dir.name,
                profile=manifest.stem,
                source="examples",
            )
            for manifest in deploy_dir.glob("*.yaml")
            # Only accept files directly in deploy/, not in subdirectories.
            if manifest.parent == deploy_dir
        )

    return found
def _collect_all_targets() -> List[DeploymentTarget]:
    """Gather the deployment targets from every known source.

    The examples directory is currently the only source.

    Returns:
        A new list of targets ordered by (source, framework, profile) so
        that test ordering is stable from run to run.
    """
    gathered: List[DeploymentTarget] = list(discover_example_targets())
    gathered.sort(key=lambda item: (item.source, item.framework, item.profile))
    return gathered
def _build_test_matrix(targets: List[DeploymentTarget]) -> Dict[str, List[str]]:
    """Index targets as a framework -> profiles mapping for CLI validation.

    Args:
        targets: List of deployment targets to index

    Returns:
        Dictionary keyed by framework name (in first-seen order) whose
        values are the sorted, de-duplicated profile names of that framework.
    """
    profiles_by_framework: Dict[str, List[str]] = {}
    for entry in targets:
        known_profiles = profiles_by_framework.setdefault(entry.framework, [])
        if entry.profile not in known_profiles:
            known_profiles.append(entry.profile)

    return {
        name: sorted(profile_names)
        for name, profile_names in profiles_by_framework.items()
    }
# Discover all targets and build the framework -> profiles matrix once, when
# this module is imported, so both are available to pytest_generate_tests
# during test collection.
ALL_DEPLOYMENT_TARGETS = _collect_all_targets()
DEPLOY_TEST_MATRIX = _build_test_matrix(ALL_DEPLOYMENT_TARGETS)
def _filter_targets(
    targets: List[DeploymentTarget],
    framework: Optional[str] = None,
    profile: Optional[str] = None,
) -> List[DeploymentTarget]:
    """Narrow a list of targets to those matching the CLI selectors.

    A selector that is None or empty places no restriction on its field.

    Args:
        targets: List of targets to filter
        framework: Optional framework filter
        profile: Optional profile filter

    Returns:
        The input list itself when neither selector is set; otherwise a new
        list holding the matching targets in their original order.
    """
    if not framework and not profile:
        return targets

    return [
        candidate
        for candidate in targets
        if (not framework or candidate.framework == framework)
        and (not profile or candidate.profile == profile)
    ]
def _find_target(
    framework: str, profile: str, targets: List[DeploymentTarget]
) -> Optional[DeploymentTarget]:
    """Look up the first target with the given framework and profile.

    Args:
        framework: Framework name to match
        profile: Profile name to match
        targets: List of targets to search

    Returns:
        The first matching DeploymentTarget, or None if no target matches
    """
    matches = (
        candidate
        for candidate in targets
        if candidate.framework == framework and candidate.profile == profile
    )
    return next(matches, None)
def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
    """Dynamically parametrize tests based on CLI options or the full matrix.

    Only tests that request ``deployment_target`` are touched. The discovered
    targets are narrowed by the --framework and --profile options (either,
    both, or neither may be given) and the survivors become the parameter set,
    one test per target, identified by the target's ``test_id``.

    When both options are given and nothing matches, collection is skipped
    with a message saying whether the framework or the profile is unknown.
    In every other case the argument is parametrized even if the list is
    empty, so pytest applies its empty-parameter-set handling instead of
    leaving ``deployment_target`` unparametrized.

    Args:
        metafunc: Pytest object describing the test function being collected.
    """
    if "deployment_target" not in metafunc.fixturenames:
        return

    framework_opt = metafunc.config.getoption("--framework")
    profile_opt = metafunc.config.getoption("--profile")

    # Filter targets based on CLI options
    filtered_targets = _filter_targets(
        ALL_DEPLOYMENT_TARGETS,
        framework=framework_opt,
        profile=profile_opt,
    )

    # Validate that the requested combination exists. pytest.skip() raises,
    # so nothing after a call to it on the same path is executed.
    if framework_opt and profile_opt and not filtered_targets:
        if framework_opt not in DEPLOY_TEST_MATRIX:
            pytest.skip(f"Framework '{framework_opt}' not found in discovered profiles")
        if profile_opt not in DEPLOY_TEST_MATRIX.get(framework_opt, []):
            pytest.skip(
                f"Profile '{profile_opt}' not found for framework '{framework_opt}'"
            )

    # Always parametrize. With an empty list pytest marks the test according
    # to its empty_parameter_set_mark setting (skip by default); leaving the
    # argument unparametrized would presumably surface as a missing-fixture
    # error, unless a fixture named deployment_target exists elsewhere.
    metafunc.parametrize(
        "deployment_target",
        filtered_targets,
        ids=[t.test_id for t in filtered_targets],
    )
@pytest.fixture
def image(request: pytest.FixtureRequest) -> Optional[str]:
    """Expose the value of the --image CLI option (custom container image)."""
    cli_image = request.config.getoption("--image")
    return cli_image
@pytest.fixture
def namespace(request: pytest.FixtureRequest) -> str:
    """Expose the value of the --namespace CLI option (Kubernetes namespace)."""
    cli_namespace = request.config.getoption("--namespace")
    return cli_namespace
@pytest.fixture
def skip_service_restart(request: pytest.FixtureRequest) -> bool:
    """Decide whether the NATS and etcd restart is skipped for deploy tests.

    Deploy tests skip the restart by default (for speed): when the
    --skip-service-restart option carries no value (``None``) the fixture
    yields True, otherwise it yields the option's value unchanged.

    Returns:
        True when the option is unset; the option's own value otherwise.
    """
    flag = request.config.getoption("--skip-service-restart")
    if flag is None:
        # Option not supplied: deploy tests skip the restart by default.
        return True
    return flag
@pytest.fixture
def framework(deployment_target: DeploymentTarget) -> str:
    """Provide the target's framework name (kept for backward compatibility)."""
    framework_name = deployment_target.framework
    return framework_name
@pytest.fixture
def profile(deployment_target: DeploymentTarget) -> str:
    """Provide the target's profile name (kept for backward compatibility)."""
    profile_name = deployment_target.profile
    return profile_name
@pytest.fixture
def deployment_yaml(deployment_target: DeploymentTarget) -> Path:
    """Return the target's deployment YAML path after checking it exists.

    The test is failed immediately (via ``pytest.fail``) when the file is
    missing, so later fixtures never receive a dangling path.
    """
    yaml_path = deployment_target.yaml_path
    if yaml_path.exists():
        return yaml_path
    # pytest.fail() raises, so this is the terminal statement on this path.
    pytest.fail(f"Deployment YAML not found: {yaml_path}")
@pytest.fixture
def deployment_spec(
    deployment_yaml: Path,
    image: Optional[str],
    namespace: str,
) -> DeploymentSpec:
    """Build the DeploymentSpec for a test from its YAML file.

    Args:
        deployment_yaml: Path to the deployment YAML file.
        image: Container image override; ignored when None or empty.
        namespace: Kubernetes namespace assigned to the spec.

    Returns:
        The DeploymentSpec with the namespace set and, when an image was
        given, the image overridden through ``set_image``.
    """
    configured = DeploymentSpec(str(deployment_yaml))
    configured.namespace = namespace

    if not image:
        # No override requested: keep the image declared in the YAML.
        return configured

    configured.set_image(image)
    return configured
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Deployment tests for Kubernetes-based LLM deployments.
These tests verify that deployments can be created, become ready, and respond
to chat completion requests correctly.
"""
import logging
from typing import Any, Dict
import pytest
import requests
from tests.deploy.conftest import DeploymentTarget
from tests.utils.client import send_request, wait_for_model_availability
from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment
logger = logging.getLogger(__name__)
# Test prompt designed to validate model capabilities:
# - Long enough to test context handling (multiple sentences, ~90 words)
# - Descriptive content requiring multi-sentence responses
# - Consistent across test runs for reproducibility
# This prompt is maintained from the original shell-based deployment tests.
TEST_PROMPT = """In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, \
lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried \
beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, \
known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at \
the city's location. Your journey will take you through treacherous deserts, enchanted forests, \
and across perilous mountain ranges. Describe your first steps into the ruins of Aeloria."""
# Sent as "max_tokens" in the chat completion request built by test_deployment.
DEFAULT_MAX_TOKENS = 30
# Sent as "temperature" in the same request.
DEFAULT_TEMPERATURE = 0.0
# Passed (as a float) to send_request's timeout argument; presumably seconds.
DEFAULT_REQUEST_TIMEOUT = 120
# Minimum response content length to validate that the model is generating meaningful output.
# Compared against len(content), i.e. measured in characters, not tokens.
# This matches the validation threshold from the original shell-based deployment tests.
MIN_RESPONSE_CONTENT_LENGTH = 100
def validate_chat_response(
    response: requests.Response,
    expected_model: str,
    min_content_length: int = MIN_RESPONSE_CONTENT_LENGTH,
) -> Dict[str, Any]:
    """Validate the structure and content of a chat completion response.

    Checks, in order: HTTP status 200, a JSON object body, a non-empty
    ``choices`` list, an assistant ``message`` on the first choice, string
    ``content`` of at least ``min_content_length`` characters, and a ``model``
    field equal to ``expected_model``.

    Args:
        response: HTTP response from the chat completion endpoint
        expected_model: Expected model name in the response
        min_content_length: Minimum required length (in characters) for the
            response content

    Returns:
        Parsed response JSON on success

    Raises:
        AssertionError: If validation fails
    """
    # Check HTTP status
    assert response.status_code == 200, (
        f"Expected status 200, got {response.status_code}. "
        f"Response: {response.text[:500]}"
    )

    try:
        data = response.json()
    except ValueError as e:
        pytest.fail(f"Response is not valid JSON: {e}. Response: {response.text[:500]}")

    # A JSON array or scalar would otherwise blow up with TypeError on the
    # key lookups below instead of producing a readable assertion message.
    assert isinstance(
        data, dict
    ), f"Expected a JSON object, got {type(data).__name__}: {data}"

    assert "choices" in data, f"Response missing 'choices' field: {data}"
    assert len(data["choices"]) > 0, f"Response has empty 'choices': {data}"

    choice = data["choices"][0]
    assert "message" in choice, f"Choice missing 'message' field: {choice}"

    message = choice["message"]
    assert (
        message.get("role") == "assistant"
    ), f"Expected role 'assistant', got '{message.get('role')}'"
    assert "content" in message, f"Message missing 'content' field: {message}"

    content = message["content"]
    # Chat completion APIs may return "content": null; len(None) would raise
    # TypeError rather than the AssertionError this function documents.
    assert isinstance(
        content, str
    ), f"Expected string 'content', got {type(content).__name__}: {message}"
    assert len(content) >= min_content_length, (
        f"Response content too short: {len(content)} chars (min: {min_content_length}). "
        f"Content: {content[:200]}"
    )

    assert "model" in data, f"Response missing 'model' field: {data}"
    assert (
        data["model"] == expected_model
    ), f"Expected model '{expected_model}', got '{data['model']}'"

    logger.info(
        f"Response validation passed: model={data['model']}, "
        f"content_length={len(content)}"
    )
    return data
@pytest.mark.k8s
@pytest.mark.deploy
@pytest.mark.post_merge
@pytest.mark.e2e
async def test_deployment(
    deployment_target: DeploymentTarget,
    deployment_spec: DeploymentSpec,
    namespace: str,
    skip_service_restart: bool,
    request,
) -> None:
    """Test Kubernetes deployment end-to-end.

    This test:
    1. Deploys the specified configuration to Kubernetes
    2. Looks up a frontend pod of the deployment
    3. Port-forwards to that frontend pod
    4. Waits for the model to be available
    5. Sends a test chat completion request
    6. Validates the response structure and content

    Args:
        deployment_target: The deployment target containing path and metadata
        deployment_spec: Configured DeploymentSpec from fixture
        namespace: Kubernetes namespace for the deployment
        skip_service_restart: Whether to skip restarting NATS/etcd services
            (the deploy-test fixture defaults this to True).
            NOTE(review): earlier wording pointed at a --restart-services
            flag; the fixture shown in this change reads
            --skip-service-restart only - confirm which flag exists.
        request: Pytest request object for accessing test metadata
    """
    # Extract identifying information from the target
    framework = deployment_target.framework
    profile = deployment_target.profile
    # Use the model of the first service in the spec that declares one.
    model = next((s.model for s in deployment_spec.services if s.model), None)
    if not model:
        pytest.fail(
            f"Could not determine model name from deployment spec for "
            f"{framework}/{profile}"
        )

    logger.info(
        f"Starting deployment test for {deployment_target.test_id} "
        f"(source: {deployment_target.source}, model: {model}, namespace: {namespace})"
    )
    logger.info(f"Log directory: {request.node.name}")

    # Deploy and test; the pytest node name doubles as the log directory.
    async with ManagedDeployment(
        log_dir=request.node.name,
        deployment_spec=deployment_spec,
        namespace=namespace,
        skip_service_restart=skip_service_restart,
    ) as deployment:
        # Get frontend pod for port forwarding
        frontend_pods = deployment.get_pods([deployment.frontend_service_name])
        frontend_pod_list = frontend_pods.get(deployment.frontend_service_name, [])
        assert (
            len(frontend_pod_list) > 0
        ), f"No frontend pods found for deployment {deployment_spec.name}"
        # Only the first frontend pod is used for the request.
        frontend_pod = frontend_pod_list[0]
        logger.info(f"Found frontend pod: {frontend_pod.name}")

        # Setup port forwarding
        port = deployment_spec.port
        port_forward = deployment.port_forward(frontend_pod, port)
        assert (
            port_forward is not None
        ), f"Failed to establish port forward to {frontend_pod.name}:{port}"
        # Requests go to the local end of the forward, not to the pod port.
        base_url = f"http://localhost:{port_forward.local_port}"
        logger.info(f"Port forwarding established: {base_url}")

        # Wait for model to be available (up to 30 probe attempts).
        endpoint = deployment_spec.endpoint
        model_ready = wait_for_model_availability(
            url=base_url,
            endpoint=endpoint,
            model=model,
            logger=logger,
            max_attempts=30,
        )
        assert (
            model_ready
        ), f"Model '{model}' did not become available within the timeout period"

        # Send test request
        url = f"{base_url}{endpoint}"
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": TEST_PROMPT}],
            "max_tokens": DEFAULT_MAX_TOKENS,
            "temperature": DEFAULT_TEMPERATURE,
            "stream": False,
        }
        response = send_request(
            url, payload, timeout=float(DEFAULT_REQUEST_TIMEOUT), method="POST"
        )

        # Validate response
        validate_chat_response(
            response=response,
            expected_model=model,
            min_content_length=MIN_RESPONSE_CONTENT_LENGTH,
        )

    logger.info(
        f"Deployment test PASSED for {deployment_target.test_id} "
        f"(source: {deployment_target.source}, model: {model}, namespace: {namespace})"
    )
...@@ -24,9 +24,9 @@ import time ...@@ -24,9 +24,9 @@ import time
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
import requests
from kr8s.objects import Pod from kr8s.objects import Pod
from tests.utils.client import wait_for_model_availability
from tests.utils.managed_deployment import ManagedDeployment from tests.utils.managed_deployment import ManagedDeployment
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s" LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
...@@ -110,80 +110,7 @@ def get_frontend_port( ...@@ -110,80 +110,7 @@ def get_frontend_port(
return pod_name, port, selected_pod return pod_name, port, selected_pod
def wait_for_model_availability(  # wait_for_model_availability has been moved to tests.utils.client
    url: str,
    endpoint: str,
    model: str,
    logger: logging.Logger,
    max_attempts: int = 15,
    attempt_timeouts: Optional[List[float]] = None,
) -> bool:
    """
    Wait for model to be available before running AI-Perf.

    Posts a one-token chat request to ``url + endpoint`` until it answers
    with HTTP 200 or ``max_attempts`` is exhausted. After a success the
    function sleeps 5s for stabilization. Between failed attempts it waits
    10s for the first five attempts and 5s afterwards.

    Args:
        url: Base URL for the service
        endpoint: API endpoint path
        model: Model name to test
        logger: Logger instance
        max_attempts: Maximum number of attempts to check availability
        attempt_timeouts: List of timeout values for each attempt; the last
            value is reused once the list is exhausted. None or an empty
            list selects the default schedule.

    Returns:
        True if model is available, False otherwise
    """
    if not attempt_timeouts:
        # Default: Start with 60s timeout, then gradually decrease.
        # An empty list is treated like None: indexing it would raise
        # IndexError on every attempt, which the broad handler below would
        # swallow, so no request would ever be sent.
        attempt_timeouts = [60, 60, 45, 30, 30, 20, 20, 15, 15, 15, 10, 10, 10, 10, 10]

    test_url = f"{url}{endpoint}"
    # The probe payload is identical for every attempt, so build it once.
    test_payload = {
        "model": model,
        "messages": [{"role": "user", "content": "test"}],
        "max_tokens": 1,
        "stream": False,
    }

    for attempt in range(max_attempts):
        try:
            # Reuse the last configured timeout once the schedule runs out.
            timeout_val = attempt_timeouts[min(attempt, len(attempt_timeouts) - 1)]
            logger.info(
                f"Testing model availability at {test_url} (attempt {attempt+1}/{max_attempts}, timeout={timeout_val}s)"
            )

            response = requests.post(test_url, json=test_payload, timeout=timeout_val)

            if response.status_code == 200:
                logger.info(f"Model '{model}' is available and responding")
                # Give a bit more time for stabilization
                logger.info("Model ready, waiting 5s for stabilization...")
                time.sleep(5)
                return True
            elif response.status_code == 404:
                logger.warning(
                    f"Model '{model}' not found (404). Response: {response.text[:200]}"
                )
            elif response.status_code == 400:
                logger.warning(f"Bad request (400). Response: {response.text[:200]}")
            else:
                logger.warning(
                    f"Unexpected status code {response.status_code}: {response.text[:200]}"
                )
        except requests.Timeout as e:
            logger.warning(
                f"Model availability test timed out (attempt {attempt+1}): {e}"
            )
        except Exception as e:
            # Best effort: any other failure counts as "not ready yet".
            logger.warning(f"Model availability test failed (attempt {attempt+1}): {e}")

        # No pause after the final attempt.
        if attempt < max_attempts - 1:
            wait_time = 10 if attempt < 5 else 5
            logger.info(f"Waiting {wait_time}s before retry...")
            time.sleep(wait_time)

    logger.warning("Could not confirm model availability after all attempts")
    return False
def validate_aiperf_results( def validate_aiperf_results(
......
...@@ -18,9 +18,9 @@ import pytest ...@@ -18,9 +18,9 @@ import pytest
from tests.fault_tolerance.deploy.scenarios import scenarios from tests.fault_tolerance.deploy.scenarios import scenarios
# Shared CLI options (--image, --namespace, --skip-service-restart) are defined in tests/conftest.py.
# Only fault_tolerance-specific options are defined here.
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption("--image", type=str, default=None)
parser.addoption("--namespace", type=str, default="fault-tolerance-test")
parser.addoption( parser.addoption(
"--client-type", "--client-type",
type=str, type=str,
...@@ -35,13 +35,6 @@ def pytest_addoption(parser): ...@@ -35,13 +35,6 @@ def pytest_addoption(parser):
help="Include tests that require custom builds (e.g., MoE models). " help="Include tests that require custom builds (e.g., MoE models). "
"By default, these tests are excluded.", "By default, these tests are excluded.",
) )
parser.addoption(
"--skip-service-restart",
action="store_true",
default=False,
help="Skip restarting NATS and etcd services before deployment. "
"By default, these services are restarted.",
)
def pytest_generate_tests(metafunc): def pytest_generate_tests(metafunc):
...@@ -109,7 +102,9 @@ def image(request): ...@@ -109,7 +102,9 @@ def image(request):
@pytest.fixture @pytest.fixture
def namespace(request): def namespace(request):
return request.config.getoption("--namespace") """Get Kubernetes namespace from CLI option, with fault-tolerance-specific default."""
value = request.config.getoption("--namespace")
return value if value is not None else "fault-tolerance-test"
@pytest.fixture @pytest.fixture
...@@ -120,5 +115,14 @@ def client_type(request): ...@@ -120,5 +115,14 @@ def client_type(request):
@pytest.fixture @pytest.fixture
def skip_service_restart(request): def skip_service_restart(request):
"""Get skip restart services flag from command line.""" """Whether to skip restarting NATS and etcd services.
return request.config.getoption("--skip-service-restart")
Fault tolerance tests default to RESTARTING services (for clean state).
The --skip-service-restart flag can override this behavior.
Returns:
If --skip-service-restart is passed: True (skip restart)
If flag not passed: False (FT tests restart by default)
"""
value = request.config.getoption("--skip-service-restart")
return value if value is not None else False # Default: restart for FT tests
...@@ -143,3 +143,83 @@ def send_request( ...@@ -143,3 +143,83 @@ def send_request(
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error("Request failed: %s", e) logger.error("Request failed: %s", e)
raise raise
def wait_for_model_availability(
    url: str,
    endpoint: str,
    model: str,
    logger: logging.Logger,
    max_attempts: int = 15,
    attempt_timeouts: list[float] | None = None,
) -> bool:
    """
    Wait for model to be available by sending test requests.

    Polls the specified endpoint with a one-token chat request until the
    model responds with HTTP 200 or max attempts are reached. Used to ensure
    a deployed model is ready before running tests. After a success the
    function sleeps 5s for stabilization. Between failed attempts it waits
    10s for the first five attempts and 5s afterwards.

    Args:
        url: Base URL for the service (e.g., "http://localhost:8000")
        endpoint: API endpoint path (e.g., "/v1/chat/completions")
        model: Model name to test
        logger: Logger instance for output
        max_attempts: Maximum number of attempts to check availability (default: 15)
        attempt_timeouts: List of timeout values for each attempt (default:
            decreasing from 60s). The last value is reused once the list is
            exhausted; None or an empty list selects the default schedule.

    Returns:
        True if model is available and responding, False otherwise
    """
    if not attempt_timeouts:
        # Default: Start with 60s timeout, then gradually decrease.
        # An empty list is treated like None: indexing it would raise
        # IndexError on every attempt, which the broad handler below would
        # swallow, so no request would ever be sent.
        attempt_timeouts = [60, 60, 45, 30, 30, 20, 20, 15, 15, 15, 10, 10, 10, 10, 10]

    test_url = f"{url}{endpoint}"
    # The probe payload is identical for every attempt, so build it once.
    test_payload = {
        "model": model,
        "messages": [{"role": "user", "content": "test"}],
        "max_tokens": 1,
        "stream": False,
    }

    for attempt in range(max_attempts):
        try:
            # Reuse the last configured timeout once the schedule runs out.
            timeout_val = attempt_timeouts[min(attempt, len(attempt_timeouts) - 1)]
            logger.debug(
                f"Testing model availability at {test_url} (attempt {attempt+1}/{max_attempts}, timeout={timeout_val}s)"
            )

            response = requests.post(test_url, json=test_payload, timeout=timeout_val)

            if response.status_code == 200:
                logger.info(f"Model '{model}' is available and responding")
                # Give a bit more time for stabilization
                logger.info("Model ready, waiting 5s for stabilization...")
                time.sleep(5)
                return True
            elif response.status_code == 404:
                logger.warning(
                    f"Model '{model}' not found (404). Response: {response.text[:200]}"
                )
            elif response.status_code == 400:
                logger.warning(f"Bad request (400). Response: {response.text[:200]}")
            else:
                logger.warning(
                    f"Unexpected status code {response.status_code}: {response.text[:200]}"
                )
        except requests.Timeout as e:
            logger.warning(
                f"Model availability test timed out (attempt {attempt+1}): {e}"
            )
        except Exception as e:
            # Best effort: any other failure counts as "not ready yet".
            logger.warning(f"Model availability test failed (attempt {attempt+1}): {e}")

        # No pause after the final attempt.
        if attempt < max_attempts - 1:
            wait_time = 10 if attempt < 5 else 5
            logger.info(f"Waiting {wait_time}s before retry...")
            time.sleep(wait_time)

    logger.warning("Could not confirm model availability after all attempts")
    return False
...@@ -501,7 +501,8 @@ class ManagedDeployment: ...@@ -501,7 +501,8 @@ class ManagedDeployment:
_in_cluster: bool = False _in_cluster: bool = False
_logger: logging.Logger = logging.getLogger() _logger: logging.Logger = logging.getLogger()
_port_forward: Optional[Any] = None _port_forward: Optional[Any] = None
_deployment_name: Optional[str] = None # Initialized from deployment_spec.name in __post_init__; placeholder needed for dataclass ordering
_deployment_name: str = field(default="")
_apps_v1: Optional[Any] = None _apps_v1: Optional[Any] = None
_active_port_forwards: List[Any] = field(default_factory=list) _active_port_forwards: List[Any] = field(default_factory=list)
...@@ -509,14 +510,38 @@ class ManagedDeployment: ...@@ -509,14 +510,38 @@ class ManagedDeployment:
self._deployment_name = self.deployment_spec.name self._deployment_name = self.deployment_spec.name
async def _init_kubernetes(self): async def _init_kubernetes(self):
"""Initialize kubernetes client""" """Initialize kubernetes client.
try:
# Try in-cluster config first (for pods with service accounts) Priority order:
config.load_incluster_config() 1. KUBECONFIG environment variable (CI scenario with proper RBAC)
self._in_cluster = True 2. In-cluster config (for pods without explicit kubeconfig)
except Exception: 3. Default kubeconfig (~/.kube/config)
# Fallback to kube config file (for local development) """
await config.load_kube_config() kubeconfig_path = os.environ.get("KUBECONFIG")
if kubeconfig_path and os.path.exists(kubeconfig_path):
# Explicit kubeconfig provided (CI scenario) - use it first
self._logger.info(f"Loading kubeconfig from KUBECONFIG: {kubeconfig_path}")
await config.load_kube_config(config_file=kubeconfig_path)
self._in_cluster = False
self._logger.info("Successfully loaded kubeconfig from KUBECONFIG")
else:
try:
# Try in-cluster config (for pods without explicit kubeconfig)
self._logger.info("Attempting in-cluster kubernetes config")
config.load_incluster_config()
self._in_cluster = True
self._logger.info("Successfully loaded in-cluster kubernetes config")
except Exception as e:
# Fallback to default kube config file (for local development)
self._logger.warning(
f"In-cluster config failed ({type(e).__name__}: {e}), "
f"falling back to default kubeconfig (~/.kube/config)"
)
await config.load_kube_config()
self._in_cluster = False
self._logger.info("Successfully loaded default kubeconfig")
k8s_client = client.ApiClient() k8s_client = client.ApiClient()
self._custom_api = client.CustomObjectsApi(k8s_client) self._custom_api = client.CustomObjectsApi(k8s_client)
self._core_api = client.CoreV1Api(k8s_client) self._core_api = client.CoreV1Api(k8s_client)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment