Unverified commit 273252e6, authored by Biswa Panda, committed by GitHub
Browse files

feat(frontend): three-layer frontend perf sweep with local and k8s support (#7700)

parent 023a299c
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
LocalExecutor -- wraps run_perf.sh for local sweep execution.
This executor delegates each run to run_perf.sh, which handles service
lifecycle (mocker + frontend), observability captures, and aiperf load.
"""
from __future__ import annotations
import json
import os
import signal
import socket
import subprocess
import time
from pathlib import Path
from typing import Optional
from sweep_core.models import DeployDimension, RunResult, RunSpec, SweepConfig
# Grandparent directory of this module's resolved path; run_perf.sh is
# looked up directly inside it.
SCRIPT_DIR = Path(__file__).resolve().parent.parent
class LocalExecutor:
    """Executor that delegates each sweep run to ``run_perf.sh``.

    ``prepare()`` must be called before ``execute_run()``.  Each run spawns
    the script in its own session (process group) so that the whole tree can
    be signalled if the run exceeds ``RUN_TIMEOUT_SEC``.
    """

    # Wall-clock limit for a single run_perf.sh invocation, in seconds.
    RUN_TIMEOUT_SEC = 600

    def __init__(self) -> None:
        """Create an executor with no sweep configuration attached yet."""
        self._config: Optional[SweepConfig] = None
        self._frontend_port: int = 8000

    def prepare(self, config: SweepConfig) -> None:
        """Store config for use during runs."""
        self._config = config
        self._frontend_port = 8000  # local mode always uses 8000

    def apply_deploy(
        self,
        deploy: DeployDimension,
        prev: Optional[DeployDimension],
    ) -> None:
        """Wait for the frontend port to be free before the next run.

        In local mode, run_perf.sh handles its own service lifecycle, so
        ``deploy`` and ``prev`` are not used here.
        """
        _wait_port_free(self._frontend_port)

    def execute_run(self, run_spec: RunSpec, run_dir: Path) -> RunResult:
        """Execute a single run via run_perf.sh.

        Args:
            run_spec: Deploy and aiperf parameters for this run.
            run_dir: Directory passed to the script as ``--output-dir`` and
                later scanned for aiperf results.

        Returns:
            A RunResult whose ``status`` is "ok" when the script exits 0 and
            "fail" otherwise (non-zero exit, timeout, or spawn error).

        Raises:
            RuntimeError: If ``prepare()`` has not been called.
        """
        if self._config is None:
            raise RuntimeError("prepare() must be called before execute_run()")
        config = self._config
        deploy = run_spec.deploy
        aiperf = run_spec.aiperf
        result = RunResult(run_spec=run_spec, run_dir=str(run_dir))

        cmd = [
            str(SCRIPT_DIR / "run_perf.sh"),
            "--model",
            config.model,
            "--isl",
            str(aiperf.isl),
            "--osl",
            str(aiperf.osl),
            "--concurrency",
            str(aiperf.concurrency),
            "--workers",
            str(deploy.workers),
            "--speedup-ratio",
            str(config.speedup_ratio),
            "--num-models",
            str(deploy.num_models),
            "--aiperf-targets",
            config.aiperf_targets,
            "--output-dir",
            str(run_dir),
        ]
        if aiperf.benchmark_duration:
            cmd.extend(["--benchmark-duration", str(aiperf.benchmark_duration)])
        if aiperf.num_requests:
            cmd.extend(["--num-requests", str(aiperf.num_requests)])
        if aiperf.request_rate:
            cmd.extend(["--request-rate", str(aiperf.request_rate)])
        if deploy.tokenizer in ("fast", "fastokens"):
            cmd.append("--fast-tokens")
        # TODO: when run_perf.sh gains --backend vllm support, pass it here
        if deploy.backend == "vllm":
            print(
                " WARNING: vllm backend not yet supported by run_perf.sh; using mocker"
            )
        # Passthrough args (e.g., --skip-bpf --skip-nsys)
        cmd.extend(config.passthrough_args)
        print(f" cmd: {' '.join(cmd[:6])}...")

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                # Own session => own process group, so a timeout can signal
                # the script together with everything it spawned.
                start_new_session=True,
            )
            stdout, _ = proc.communicate(timeout=self.RUN_TIMEOUT_SEC)
            if proc.returncode == 0:
                result.status = "ok"
            else:
                result.status = "fail"
                print(f" run_perf.sh failed (rc={proc.returncode})")
                lines = (stdout or "").strip().split("\n")
                for line in lines[-5:]:
                    print(f" {line}")
        except subprocess.TimeoutExpired:
            result.status = "fail"
            print(f" TIMEOUT after {self.RUN_TIMEOUT_SEC}s")
            self._terminate_group(proc)
        except Exception as e:
            # Best-effort boundary: any spawn/IO failure marks the run failed.
            result.status = "fail"
            print(f" ERROR: {e}")

        # Parse aiperf results (partial results may exist even on failure)
        _parse_aiperf_into_result(result, run_dir)
        return result

    @staticmethod
    def _terminate_group(proc: subprocess.Popen) -> None:
        """SIGTERM then SIGKILL the child's process group and reap the child."""
        try:
            pgid = os.getpgid(proc.pid)
            os.killpg(pgid, signal.SIGTERM)
            time.sleep(2)
            os.killpg(pgid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # the group exited on its own (or after SIGTERM)
        # communicate() does not kill or reap on timeout; finish it here so
        # the child is not left as a zombie and its pipe is closed.
        try:
            proc.communicate(timeout=10)
        except (subprocess.TimeoutExpired, OSError, ValueError):
            pass

    def cleanup(self) -> None:
        """No persistent state to clean up in local mode."""
        pass
# ── Helpers ──────────────────────────────────────────────────────────────────
def _parse_aiperf_json(json_path: Path) -> dict:
    """Parse aiperf profile_export_aiperf.json into a flat metrics dict.

    Args:
        json_path: Path to the aiperf export file.

    Returns:
        A dict with ``req_per_sec``, ``output_tok_per_sec`` and
        ``duration_sec`` always present, plus ``ttft_*``/``itl_*`` percentile
        keys when the corresponding section is a JSON object (or absent).
        Returns ``{}`` when the file is missing, unreadable, not valid JSON,
        or its top level is not a JSON object.
    """
    try:
        data = json.loads(json_path.read_text())
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: bad JSON or encoding.
        return {}
    if not isinstance(data, dict):
        return {}

    def _section(*keys: str):
        """Return the first section present under any of ``keys``, else {}."""
        for key in keys:
            if key in data:
                return data[key]
        return {}

    def _stat(section, stat: str):
        """Read one statistic from a section; non-dict or null values -> 0."""
        if not isinstance(section, dict):
            return 0
        return section.get(stat, 0) or 0

    result = {
        "req_per_sec": _stat(_section("request_throughput"), "avg"),
        "output_tok_per_sec": _stat(_section("output_token_throughput"), "avg"),
    }
    # Latency sections may appear under a long or a short key.
    for prefix, section in (
        ("ttft", _section("time_to_first_token", "ttft")),
        ("itl", _section("inter_token_latency", "itl")),
    ):
        if isinstance(section, dict):
            result[f"{prefix}_p50_ms"] = _stat(section, "p50")
            result[f"{prefix}_p99_ms"] = _stat(section, "p99")
    # benchmark_duration is accepted either as a stats object or a scalar.
    duration = data.get("benchmark_duration", 0)
    result["duration_sec"] = (
        _stat(duration, "avg") if isinstance(duration, dict) else (duration or 0)
    )
    return result
def _parse_aiperf_into_result(result: RunResult, run_dir: Path) -> None:
    """Copy aiperf metrics found under ``run_dir`` onto ``result``.

    Looks for ``aiperf/profile_export_aiperf.json`` first; if that file is
    absent, falls back to the first (sorted) ``aiperf/<subdir>/`` export,
    which is where multi-model runs place their results.  ``result`` is left
    untouched when no metrics could be parsed.
    """
    aiperf_root = run_dir / "aiperf"
    report = aiperf_root / "profile_export_aiperf.json"
    if not report.exists():
        # Multi-model: results are in aiperf/<model-name>/
        per_model = sorted(aiperf_root.glob("*/profile_export_aiperf.json"))
        if per_model:
            report = per_model[0]

    metrics = _parse_aiperf_json(report)
    if not metrics:
        return
    for field in (
        "req_per_sec",
        "output_tok_per_sec",
        "ttft_p50_ms",
        "ttft_p99_ms",
        "itl_p50_ms",
        "itl_p99_ms",
        "duration_sec",
    ):
        setattr(result, field, metrics.get(field, 0))
def _port_free(port: int) -> bool:
    """Return True when nothing accepts TCP connections on 127.0.0.1:port."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 on success, i.e. something is listening.
        status = probe.connect_ex(("127.0.0.1", port))
    finally:
        probe.close()
    return status != 0
def _kill_port(port: int) -> None:
    """Kill any process holding a TCP port (best effort).

    Sends SIGTERM via ``fuser -k``, waits two seconds, then sends SIGKILL.
    A missing ``fuser`` binary or a hung invocation is reported and ignored
    so that this cleanup step never aborts the sweep.
    """
    target = f"{port}/tcp"
    for signal_flag, pause in (("-TERM", 2), ("-KILL", 0)):
        try:
            # Argument list instead of a shell string: no shell involved.
            subprocess.run(
                ["fuser", "-k", signal_flag, target],
                capture_output=True,
                timeout=5,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            print(f" WARNING: fuser {signal_flag} on port {port} failed: {exc}")
        if pause:
            # Give processes a chance to exit on SIGTERM before SIGKILL.
            time.sleep(pause)
def _wait_port_free(port: int, timeout: int = 30) -> None:
    """Block until ``port`` is free, force-killing its holder after ``timeout``.

    Polls once per second for up to ``timeout`` attempts.  If the port is
    still busy afterwards, the holder is killed and a short settle delay
    follows; the port is not re-checked after that.
    """
    attempt = 0
    while attempt < timeout:
        if _port_free(port):
            return
        if attempt == 0:
            print(f" Waiting for port {port} to free...")
        time.sleep(1)
        attempt += 1

    print(f" Forcing port {port} release...")
    _kill_port(port)
    time.sleep(2)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""sweep_k8s -- Kubernetes subprocess helpers for DGD-based sweeps."""
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
K8s aiperf Job launcher.
Runs aiperf as a k8s Job inside the same namespace as the DGD, using the
in-cluster service DNS endpoint. Uses python:3.12-slim with pip-installed
aiperf (same pattern as recipes/qwen3-235b-a22b-fp8/trtllm/agg/perf.yaml).
Artifacts are written by the Job pod to the model-cache PVC, then copied back
to the local host via kubectl cp through a temporary helper pod.
"""
from __future__ import annotations
import json
import subprocess
import time
from pathlib import Path
from typing import Optional
from sweep_k8s.kubectl import run_kubectl
# Default name of the Secret whose HF_TOKEN key is exposed to the aiperf Job
# container as the HF_TOKEN environment variable.
DEFAULT_HF_TOKEN_SECRET_NAME = "hf-token-secret"
def _build_aiperf_script(
    model_name: str,
    endpoint: str,
    concurrency: int,
    isl: int,
    osl: int = 256,
    benchmark_duration: Optional[int] = None,
    num_requests: Optional[int] = None,
    request_rate: Optional[int] = None,
    warmup_duration: Optional[int] = None,
    warmup_count: Optional[int] = None,
    export_level: str = "summary",
) -> str:
    """Render the bash script executed inside the aiperf Job container.

    The script installs aiperf, polls ``/v1/models`` until ``model_name`` is
    listed, then runs ``aiperf profile`` with the requested load shape.

    Load control: any of ``benchmark_duration`` / ``num_requests`` /
    ``request_rate`` that are truthy are passed through; if none are, a
    request count of ``max(concurrency * 20, 640)`` is used.  Warmup prefers
    ``warmup_duration``, then ``warmup_count``, then ``concurrency`` requests.
    """
    # Load-control flags, in the order they appear on the command line.
    load_flags = []
    if benchmark_duration:
        load_flags.append(f"--benchmark-duration {benchmark_duration}")
    if num_requests:
        load_flags.append(f"--request-count {num_requests}")
    if request_rate:
        load_flags.append(f"--request-rate {request_rate}")
    if not load_flags:
        load_flags.append(f"--request-count {max(concurrency * 20, 640)}")
    load_clause = " ".join(load_flags)

    # Exactly one warmup flag is always emitted.
    if warmup_duration:
        warmup_clause = f"--warmup-duration {warmup_duration}"
    else:
        warmup_clause = f"--warmup-request-count {warmup_count or concurrency}"

    return f"""set -e
apt-get update -qq && apt-get install -y -qq curl jq git procps 2>/dev/null
pip install --quiet git+https://github.com/ai-dynamo/aiperf.git@54cd6dc820bff8bfebc875da104e59d745e14f75
echo "aiperf installed"
# Wait for model
echo "Waiting for model '{model_name}' at http://{endpoint}/v1/models..."
while ! curl -sf "http://{endpoint}/v1/models" 2>/dev/null | \\
jq -e --arg m "{model_name}" '.data[]? | select(.id == $m)' >/dev/null 2>&1; do
echo " Model not ready, sleeping 5s..."
sleep 5
done
echo "Model ready!"
# Write artifacts to PVC so they persist after pod completion
ARTIFACT_DIR="${{ARTIFACT_PVC_DIR:-/model-cache/perf/${{JOB_NAME}}}}"
mkdir -p "$ARTIFACT_DIR"
echo "Running aiperf: c={concurrency} isl={isl} osl={osl}"
echo "Artifact dir: $ARTIFACT_DIR"
aiperf profile \\
--artifact-dir "$ARTIFACT_DIR" \\
--model "{model_name}" \\
--tokenizer "{model_name}" \\
--endpoint-type chat \\
--endpoint /v1/chat/completions \\
--streaming \\
--url "http://{endpoint}" \\
--synthetic-input-tokens-mean {isl} \\
--synthetic-input-tokens-stddev 0 \\
--output-tokens-mean {osl} \\
--output-tokens-stddev 0 \\
--extra-inputs "max_tokens:{osl}" \\
--extra-inputs "min_tokens:{osl}" \\
--extra-inputs "ignore_eos:true" \\
--extra-inputs "repetition_penalty:1.0" \\
--extra-inputs "temperature:0.0" \\
--concurrency {concurrency} \\
{load_clause} \\
{warmup_clause} \\
--num-dataset-entries 12800 \\
--random-seed 100 \\
--workers-max {concurrency} \\
--record-processors 32 \\
--export-level {export_level} \\
--ui simple
echo "aiperf done. Artifacts:"
ls -la "$ARTIFACT_DIR"/
"""
def _indent(text: str, spaces: int) -> str:
    """Prefix every line of ``text`` (including empty ones) with N spaces."""
    pad = " " * spaces
    # Padding the start and every position after a newline is the same as
    # padding each element of text.split("\n").
    return pad + text.replace("\n", "\n" + pad)
def _build_job_yaml(
    job_name: str,
    namespace: str,
    script: str,
    image_pull_secret: str = "",
    hf_token_secret_name: str = DEFAULT_HF_TOKEN_SECRET_NAME,
) -> str:
    """Build the aiperf k8s Job YAML.

    Uses python:3.12-slim with pip-installed aiperf (same pattern as
    recipes/qwen3-235b-a22b-fp8/trtllm/agg/perf.yaml).

    Args:
        job_name: Job name; also used as the ``job-name`` pod label and in
            the artifact directory on the PVC.
        namespace: Namespace written into the Job metadata.
        script: Bash script run by the container; embedded as a YAML literal
            block scalar.
        image_pull_secret: Optional imagePullSecrets entry for the pod.
        hf_token_secret_name: Secret providing the ``HF_TOKEN`` key.

    Returns:
        The Job manifest as a YAML string.
    """
    # Rendered at pod-spec level (6 spaces); empty when no secret is given.
    image_pull_secret_block = ""
    if image_pull_secret:
        image_pull_secret_block = f"""
      imagePullSecrets:
        - name: {image_pull_secret}"""
    # The script sits under the "- |" item at 12 spaces, so its content must
    # be indented by 14 to belong to that block scalar.
    return f"""apiVersion: batch/v1
kind: Job
metadata:
  name: {job_name}
  namespace: {namespace}
  labels:
    app: sweep-aiperf
spec:
  backoffLimit: 0
  completions: 1
  parallelism: 1
  ttlSecondsAfterFinished: 600
  template:
    metadata:
      labels:
        app: sweep-aiperf
        job-name: {job_name}
    spec:
      restartPolicy: Never
{image_pull_secret_block}
      securityContext:
        sysctls:
          - name: net.ipv4.ip_local_port_range
            value: "1024 65000"
      containers:
        - name: aiperf
          image: python:3.12-slim
          imagePullPolicy: IfNotPresent
          securityContext:
            allowPrivilegeEscalation: false
          command:
            - /bin/bash
            - -c
            - |
{_indent(script, 14)}
          env:
            - name: HF_HOME
              value: /model-cache
            - name: HF_TOKEN
              valueFrom:
                secretKeyRef:
                  name: {hf_token_secret_name}
                  key: HF_TOKEN
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: AIPERF_HTTP_CONNECTION_LIMIT
              value: "512"
            - name: JOB_NAME
              value: {job_name}
            - name: ARTIFACT_PVC_DIR
              value: /model-cache/perf/{job_name}
          volumeMounts:
            - name: model-cache
              mountPath: /model-cache
      volumes:
        - name: model-cache
          persistentVolumeClaim:
            claimName: model-cache
"""
def _wait_for_job(
    job_name: str,
    namespace: str,
    timeout: int = 600,
) -> bool:
    """Poll a Job every 5 seconds until it completes, fails or times out.

    Returns True only when the Job reports a ``Complete`` condition with
    status ``"True"``.  On a ``Failed`` condition or on timeout the tail of
    the Job logs is printed and False is returned.  Polling errors are
    treated as transient and retried.
    """
    elapsed = 0
    while elapsed < timeout:
        report_progress = True
        try:
            probe = run_kubectl(
                ["get", "job", job_name, "-o", "json"],
                namespace=namespace,
                check=False,
            )
            if probe.returncode == 0:
                status = json.loads(probe.stdout).get("status", {})
                for cond in status.get("conditions", []):
                    kind = cond.get("type")
                    active = cond.get("status") == "True"
                    if kind == "Complete" and active:
                        print(f" aiperf Job completed (waited {elapsed}s)")
                        return True
                    if kind == "Failed" and active:
                        print(f" aiperf Job FAILED (waited {elapsed}s)")
                        _print_job_logs(job_name, namespace)
                        return False
            else:
                # Job not retrievable yet: wait quietly, no progress line.
                report_progress = False
        except (json.JSONDecodeError, subprocess.SubprocessError, OSError) as e:
            print(f" Transient error polling job {job_name} in {namespace}: {e}")

        time.sleep(5)
        elapsed += 5
        if report_progress and elapsed % 30 == 0:
            print(f" aiperf Job running ({elapsed}s / {timeout}s)...")

    print(f" aiperf Job timed out after {timeout}s")
    _print_job_logs(job_name, namespace)
    return False
def _print_job_logs(job_name: str, namespace: str, tail: int = 20) -> None:
    """Print the last ``tail`` log lines of the Job; silent if there are none."""
    logs = run_kubectl(
        ["logs", f"job/{job_name}", f"--tail={tail}"],
        namespace=namespace,
        check=False,
    ).stdout
    if not logs:
        return
    print(f" --- Last {tail} lines of aiperf logs ---")
    for entry in logs.strip().split("\n"):
        print(f" {entry}")
def _get_job_pod_name(job_name: str, namespace: str) -> Optional[str]:
    """Return the name of the first pod labelled ``job-name=<job_name>``.

    Returns None when kubectl prints nothing (e.g. no matching pod).
    """
    query = [
        "get",
        "pods",
        "-l",
        f"job-name={job_name}",
        "-o",
        "jsonpath={.items[0].metadata.name}",
    ]
    pod = run_kubectl(query, namespace=namespace, check=False).stdout.strip()
    return pod or None
def _copy_artifacts_from_pvc(
    job_name: str,
    namespace: str,
    local_dir: Path,
) -> bool:
    """Copy aiperf artifacts from the model-cache PVC to the local filesystem.

    Spins up a temporary busybox pod that mounts the PVC read-only, uses
    kubectl cp to extract ``/model-cache/perf/<job_name>``, then deletes the
    pod.

    Returns:
        True if artifacts were copied and a non-empty
        profile_export_aiperf.json exists in ``local_dir``, False otherwise.
        Failures are reported as warnings, never raised.
    """
    local_dir.mkdir(parents=True, exist_ok=True)
    artifacts_ok = False
    helper_name = f"copy-{job_name[-20:]}"
    pvc_path = f"/model-cache/perf/{job_name}"
    try:
        # Create a helper pod to access the PVC
        helper_yaml = f"""apiVersion: v1
kind: Pod
metadata:
  name: {helper_name}
  namespace: {namespace}
spec:
  restartPolicy: Never
  containers:
    - name: copy
      image: busybox:latest
      command: ["sh", "-c", "echo ready && sleep 300"]
      volumeMounts:
        - name: model-cache
          mountPath: /model-cache
          readOnly: true
  volumes:
    - name: model-cache
      persistentVolumeClaim:
        claimName: model-cache
"""
        run_kubectl(["apply", "-f", "-"], namespace=namespace, input_data=helper_yaml)

        # Wait for helper pod to be ready
        for _ in range(30):
            result = run_kubectl(
                ["get", "pod", helper_name, "-o", "jsonpath={.status.phase}"],
                namespace=namespace,
                check=False,
            )
            if result.stdout.strip() == "Running":
                break
            time.sleep(2)
        else:
            # exec/cp against a pod that never started can only fail; stop
            # here with a clear message instead of a later kubectl cp error.
            raise RuntimeError(f"helper pod {helper_name} did not reach Running")

        # List what's on the PVC
        result = run_kubectl(
            ["exec", helper_name, "--", "ls", "-la", pvc_path],
            namespace=namespace,
            check=False,
        )
        if result.stdout:
            print(f" PVC artifacts ({pvc_path}):")
            for line in result.stdout.strip().split("\n")[:6]:
                print(f" {line}")

        # Copy artifacts locally
        subprocess.run(
            [
                "kubectl",
                "cp",
                f"{namespace}/{helper_name}:{pvc_path}/",
                str(local_dir) + "/",
            ],
            capture_output=True,
            text=True,
            check=True,
            timeout=120,
        )
        files = list(local_dir.glob("*"))
        print(f" Copied {len(files)} artifact files to local")
        for f in sorted(files)[:5]:
            print(f" {f.name} ({f.stat().st_size} bytes)")

        expected = local_dir / "profile_export_aiperf.json"
        if expected.exists() and expected.stat().st_size > 0:
            artifacts_ok = True
        else:
            print(f" WARNING: expected artifact missing or empty: {expected.name}")
    except Exception as e:
        # Best-effort boundary: the caller decides based on the bool result.
        print(f" WARNING: artifact copy failed: {e}")
    finally:
        # Cleanup helper pod; a cleanup failure must not mask the outcome.
        try:
            run_kubectl(
                ["delete", "pod", helper_name, "--ignore-not-found", "--grace-period=0"],
                namespace=namespace,
                check=False,
            )
        except (subprocess.SubprocessError, OSError) as e:
            print(f" WARNING: failed to delete helper pod {helper_name}: {e}")
    return artifacts_ok
def run_aiperf(
    artifact_dir: Path,
    endpoint: str,
    model_name: str,
    concurrency: int,
    isl: int,
    namespace: str,
    image: str,
    run_id: str,
    osl: int = 256,
    benchmark_duration: Optional[int] = None,
    num_requests: Optional[int] = None,
    request_rate: Optional[int] = None,
    warmup_duration: Optional[int] = None,
    warmup_count: Optional[int] = None,
    export_level: str = "summary",
    image_pull_secret: str = "",
    hf_token_secret_name: str = DEFAULT_HF_TOKEN_SECRET_NAME,
    timeout: int = 600,
) -> bool:
    """Run aiperf as a k8s Job inside the namespace.

    Creates a Job with python:3.12-slim, installs aiperf via pip, runs the
    benchmark against the in-cluster service endpoint, then copies artifacts
    back to the local filesystem.

    Args:
        artifact_dir: Local directory for aiperf artifacts.
        endpoint: In-cluster frontend endpoint (service:port).
        model_name: Model name for aiperf --model.
        concurrency: Concurrency level.
        isl: Input sequence length.
        namespace: K8s namespace.
        image: Container image (unused -- uses python:3.12-slim).
        run_id: Unique run identifier (used in Job name).
        osl: Output sequence length.
        benchmark_duration: Optional benchmark duration in seconds.
        num_requests: Optional request count.
        request_rate: Optional request rate limit.
        warmup_duration: Optional warmup duration in seconds.
        warmup_count: Optional warmup request count.
        export_level: aiperf export level (summary, records, raw).
        image_pull_secret: Optional image pull secret for the Job pod.
        hf_token_secret_name: Secret name that stores HF_TOKEN.
        timeout: Job timeout in seconds.

    Returns:
        True if aiperf succeeded and its artifacts were copied, False
        otherwise.
    """
    import re  # local import: only needed for name sanitization

    # Sanitize run_id for k8s naming: lowercase, every character outside
    # [a-z0-9-] becomes "-", at most 40 chars, no leading/trailing "-".
    # With the "aiperf-" prefix and 6-digit suffix the name stays <= 63.
    safe_id = re.sub(r"[^a-z0-9-]", "-", run_id.lower())[:40].strip("-") or "run"
    ts = str(int(time.time()))[-6:]
    job_name = f"aiperf-{safe_id}-{ts}"
    print(f" Creating aiperf Job: {job_name} (c={concurrency} isl={isl})")

    script = _build_aiperf_script(
        model_name=model_name,
        endpoint=endpoint,
        concurrency=concurrency,
        isl=isl,
        osl=osl,
        benchmark_duration=benchmark_duration,
        num_requests=num_requests,
        request_rate=request_rate,
        warmup_duration=warmup_duration,
        warmup_count=warmup_count,
        export_level=export_level,
    )
    job_yaml = _build_job_yaml(
        job_name=job_name,
        namespace=namespace,
        script=script,
        image_pull_secret=image_pull_secret,
        hf_token_secret_name=hf_token_secret_name,
    )

    # Create the Job
    try:
        run_kubectl(
            ["apply", "-f", "-"],
            namespace=namespace,
            input_data=job_yaml,
        )
    except Exception as e:
        print(f" ERROR: Failed to create aiperf Job: {e}")
        return False

    # Wait for completion
    success = _wait_for_job(job_name, namespace, timeout=timeout)

    # Copy artifacts from PVC regardless of success (partial results may exist)
    artifacts_ok = _copy_artifacts_from_pvc(job_name, namespace, artifact_dir)
    if success and not artifacts_ok:
        print(" Job succeeded but artifacts missing -- marking as failure")
        success = False

    # Print logs on failure
    if not success:
        _print_job_logs(job_name, namespace, tail=30)

    # Clean up the Job
    run_kubectl(
        ["delete", "job", job_name, "--ignore-not-found"],
        namespace=namespace,
        check=False,
    )
    return success
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
DynamoGraphDeployment helpers -- backend switch, restart, readiness.
Ported from sweep.sh functions: dgd_switch_backend, dgd_restart_frontend,
dgd_restart_graph, dgd_wait_all_ready.
"""
from __future__ import annotations
import json
import random
import subprocess
import time
import urllib.error
import urllib.request
from sweep_k8s.kubectl import (
delete_pod,
get_json,
get_pod_name,
patch_json,
patch_merge,
run_kubectl,
wait_for_pod_deletion,
wait_pod,
)
# Maps user-facing tokenizer names to the value written into the DGD
# DYN_TOKENIZER_BACKEND env var; names not listed here are passed through
# unchanged by the caller.
TOKENIZER_BACKEND_MAP = {
    **dict.fromkeys(("hf", "default"), "default"),
    **dict.fromkeys(("fast", "fastokens"), "fast"),
}
def dgd_label_selector(dgd_name: str, component_type: str) -> str:
    """Return the comma-joined label selector for one DGD component type."""
    labels = (
        f"nvidia.com/dynamo-graph-deployment-name={dgd_name}",
        f"nvidia.com/dynamo-component-type={component_type}",
    )
    return ",".join(labels)
def wait_model_ready(
    endpoint: str,
    model_name: str,
    max_wait: int = 300,
    namespace: str = "",
) -> None:
    """Block until ``model_name`` is listed at the frontend /v1/models endpoint.

    Each round tries a direct HTTP GET first; when ``namespace`` is given and
    the direct check did not succeed, an in-cluster check via kubectl is
    attempted as well.  Rounds are 5 seconds apart.

    Raises:
        TimeoutError: If the model is not listed within ``max_wait`` seconds
            of accumulated sleep time.
    """
    models_url = f"http://{endpoint}/v1/models"
    print(f" Waiting for model '{model_name}' at {models_url}...")
    elapsed = 0
    while True:
        # Try direct HTTP (works if endpoint is port-forwarded or localhost)
        try:
            request = urllib.request.Request(
                models_url,
                headers={"Accept": "application/json"},
            )
            with urllib.request.urlopen(request, timeout=10) as response:
                payload = json.loads(response.read().decode())
                listed = payload.get("data", [])
                if any(entry.get("id") == model_name for entry in listed):
                    print(f" Model ready (waited {elapsed}s)")
                    return
        except (urllib.error.URLError, json.JSONDecodeError, OSError, ValueError):
            pass

        # Fallback: kubectl-based check for in-cluster endpoints
        if namespace and _check_model_via_kubectl(endpoint, model_name, namespace):
            print(f" Model ready via kubectl (waited {elapsed}s)")
            return

        time.sleep(5)
        elapsed += 5
        if elapsed >= max_wait:
            print(f"ERROR: Model not ready after {max_wait}s")
            raise TimeoutError(f"Model '{model_name}' not ready after {max_wait}s")
        if elapsed % 15 == 0:
            print(f" Still waiting ({elapsed}s / {max_wait}s)...")
def _check_model_via_kubectl(
    endpoint: str,
    model_name: str,
    namespace: str,
) -> bool:
    """Check model readiness by running curl from inside the cluster.

    Launches a throwaway ``curlimages/curl`` pod with ``kubectl run --rm``
    and inspects the JSON it prints.

    Returns:
        True only if the response lists ``model_name``; False on any kubectl
        failure, timeout, or malformed response.
    """
    pod_name = f"model-check-{int(time.time())}-{random.randint(0, 9999)}"
    try:
        result = subprocess.run(
            [
                "kubectl",
                "run",
                pod_name,
                "--rm",
                "-i",
                "--restart=Never",
                "-n",
                namespace,
                "--quiet",
                "--image=curlimages/curl:latest",
                "--",
                "-sf",
                f"http://{endpoint}/v1/models",
            ],
            capture_output=True,
            text=True,
            timeout=20,
        )
        if result.returncode == 0 and result.stdout.strip():
            data = json.loads(result.stdout)
            models = data.get("data", []) if isinstance(data, dict) else []
            return any(
                isinstance(m, dict) and m.get("id") == model_name for m in models
            )
    except subprocess.TimeoutExpired:
        # kubectl was killed before --rm could remove the pod; delete it
        # ourselves so repeated polling does not leave probe pods behind.
        try:
            subprocess.run(
                [
                    "kubectl",
                    "delete",
                    "pod",
                    pod_name,
                    "-n",
                    namespace,
                    "--ignore-not-found",
                    "--wait=false",
                ],
                capture_output=True,
                text=True,
                timeout=20,
            )
        except (subprocess.SubprocessError, OSError):
            pass  # best effort; the readiness answer is False either way
    except (subprocess.SubprocessError, json.JSONDecodeError, OSError):
        pass
    return False
def dgd_wait_all_ready(
    dgd_name: str,
    namespace: str,
    endpoint: str,
    model_name: str,
    max_wait: int = 300,
) -> None:
    """Wait for DGD worker pods to be Ready, then for the model endpoint.

    A failing ``kubectl wait`` (CalledProcessError) is retried up to three
    times, 5 seconds apart; a subprocess timeout is not retried and
    propagates to the caller.

    Raises:
        RuntimeError: If the worker wait still fails on the third attempt.
    """
    print(" Waiting for all worker pods to be Ready...")
    max_attempts = 3
    attempt = 1
    while True:
        try:
            wait_pod(
                dgd_label_selector(dgd_name, "worker"),
                namespace,
                timeout=max_wait,
            )
        except subprocess.CalledProcessError as err:
            if attempt >= max_attempts:
                raise RuntimeError(
                    f"Worker pods not ready after {max_attempts} retries: {err}"
                ) from err
            print(
                f" kubectl error (attempt {attempt}/{max_attempts}), retrying..."
            )
            time.sleep(5)
            attempt += 1
        else:
            break

    wait_model_ready(endpoint, model_name, max_wait, namespace=namespace)
def dgd_switch_backend(
    dgd_name: str,
    namespace: str,
    endpoint: str,
    model_name: str,
    backend: str,
) -> None:
    """Switch tokenizer backend on a DynamoGraphDeployment.

    Patches the DGD spec so the Frontend container's DYN_TOKENIZER_BACKEND
    env var holds the mapped backend name (replacing the existing entry, or
    appending one when none was found), then waits for the frontend pod to
    be replaced and for the whole deployment to be ready again.
    """
    mapped_backend = TOKENIZER_BACKEND_MAP.get(backend, backend)
    print(
        f"\n--- Switching DGD tokenizer backend -> {mapped_backend} (dgd={dgd_name}) ---"
    )

    # Locate DYN_TOKENIZER_BACKEND in the Frontend env array; any lookup
    # failure is treated as "not present".
    try:
        env_entries = (
            get_json("dgd", dgd_name, namespace)
            .get("spec", {})
            .get("services", {})
            .get("Frontend", {})
            .get("extraPodSpec", {})
            .get("mainContainer", {})
            .get("env", [])
        )
        idx = next(
            (
                pos
                for pos, entry in enumerate(env_entries)
                if entry.get("name") == "DYN_TOKENIZER_BACKEND"
            ),
            None,
        )
    except Exception:
        idx = None

    # Capture the current frontend pod name BEFORE patching so we track
    # the right pod for deletion (avoids racing with the operator).
    frontend_selector = dgd_label_selector(dgd_name, "frontend")
    old_pod = get_pod_name(frontend_selector, namespace)

    env_path = "/spec/services/Frontend/extraPodSpec/mainContainer/env"
    if idx is None:
        operation = {
            "op": "add",
            "path": f"{env_path}/-",
            "value": {"name": "DYN_TOKENIZER_BACKEND", "value": mapped_backend},
        }
    else:
        operation = {
            "op": "replace",
            "path": f"{env_path}/{idx}/value",
            "value": mapped_backend,
        }
    patch_json("dgd", dgd_name, namespace, [operation])

    print(" DGD patched -- waiting for frontend pod replacement...")
    if old_pod:
        print(f" Waiting for old pod {old_pod} to terminate...")
        wait_for_pod_deletion(old_pod, namespace, timeout=120)

    # Wait for new frontend pod to be Ready
    print(" Waiting for new frontend pod to be Ready...")
    wait_pod(frontend_selector, namespace, timeout=300)
    dgd_wait_all_ready(dgd_name, namespace, endpoint, model_name)
def dgd_restart_frontend(
    dgd_name: str,
    namespace: str,
    endpoint: str,
    model_name: str,
) -> None:
    """Restart only the frontend component to reset metrics counters.

    Deletes the current frontend pod (if one is found), waits for it to
    disappear, then waits for the replacement pod and the whole deployment
    to become ready.
    """
    print(" Restarting frontend pod to reset metrics counters...")
    old_pod = get_pod_name(
        dgd_label_selector(dgd_name, "frontend"),
        namespace,
    )
    if old_pod:
        delete_pod(old_pod, namespace, grace_period=5)
        print(f" Waiting for old pod {old_pod} to terminate...")
        # Wait for delete.  The subprocess timeout must exceed kubectl's own
        # --timeout=90s; run_kubectl's 60s default would kill the wait early.
        try:
            run_kubectl(
                ["wait", "pod", old_pod, "--for=delete", "--timeout=90s"],
                namespace=namespace,
                check=False,
                timeout=100,
            )
        except Exception as e:
            # Best effort: the readiness waits below still gate progress.
            print(f" WARNING: waiting for pod {old_pod} deletion failed: {e}")

    print(" Waiting for new frontend pod to be Ready...")
    wait_pod(
        dgd_label_selector(dgd_name, "frontend"),
        namespace,
        timeout=300,
    )
    dgd_wait_all_ready(dgd_name, namespace, endpoint, model_name)
def dgd_restart_graph(
    dgd_name: str,
    namespace: str,
    endpoint: str,
    model_name: str,
) -> None:
    """Trigger a full DGD restart through spec.restart and wait for it.

    A fresh restart id is patched into the DGD with a Sequential strategy
    (non-frontend services first, frontend last).  The DGD status is then
    polled every 5 seconds until it reports this id as Completed.

    Raises:
        RuntimeError: If the restart ends in phase Failed or Superseded.
        TimeoutError: If the restart has not completed after 600 seconds.
    """
    stamp = time.strftime('%Y%m%d-%H%M%S')
    restart_id = f"bench-{stamp}-{random.randint(0, 9999)}"
    print(f" Restarting full DGD deployment (id={restart_id})...")

    # Discover service names from the DGD spec so the restart order is correct
    # for any backend (mocker, vllm, trtllm, etc.)
    try:
        spec = get_json("dgd", dgd_name, namespace, timeout=60)
        services = list(spec.get("spec", {}).get("services", {}).keys())
        # Stable sort on "is frontend": workers keep their order and come
        # first, frontend entries move to the end.
        restart_order = sorted(services, key=lambda svc: svc.lower() == "frontend")
    except Exception:
        restart_order = ["Frontend"]
    print(f" Restart order: {restart_order}")

    restart_spec = {
        "id": restart_id,
        "strategy": {
            "type": "Sequential",
            "order": restart_order,
        },
    }
    patch_merge("dgd", dgd_name, namespace, {"spec": {"restart": restart_spec}})

    elapsed = 0
    phase = "pending"
    while True:
        finished = False
        try:
            current = get_json("dgd", dgd_name, namespace, timeout=60)
            restart_status = current.get("status", {}).get("restart", {})
            observed = restart_status.get("observedID", "")
            phase = restart_status.get("phase", "")
            # Only trust the phase once the operator has seen our id.
            if observed == restart_id:
                if phase == "Completed":
                    finished = True
                elif phase in ("Failed", "Superseded"):
                    raise RuntimeError(
                        f"DGD restart {restart_id} ended with phase={phase}"
                    )
        except (KeyError, TypeError):
            pass
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
            # Transient kubectl timeout -- retry
            print(f" kubectl transient error, retrying... ({e.__class__.__name__})")

        if finished:
            print(f" DGD restart completed (waited {elapsed}s)")
            break

        time.sleep(5)
        elapsed += 5
        if elapsed >= 600:
            raise TimeoutError(f"Timed out waiting for DGD restart {restart_id}")
        print(f" Waiting for DGD restart ({elapsed}s / 600s)... phase={phase}")

    dgd_wait_all_ready(dgd_name, namespace, endpoint, model_name)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Safe kubectl subprocess helpers.
All k8s interactions go through this module for consistent error handling
and namespace scoping.
"""
from __future__ import annotations
import json
import subprocess
import time
from typing import Any, Dict, List, Optional
def run_kubectl(
args: List[str],
namespace: Optional[str] = None,
capture: bool = True,
check: bool = True,
timeout: int = 60,
input_data: Optional[str] = None,
) -> subprocess.CompletedProcess:
"""Run a kubectl command with namespace scoping and error handling.
Args:
args: kubectl arguments (e.g., ["get", "pods"]).
namespace: K8s namespace (prepended as -n <namespace>).
capture: Whether to capture stdout/stderr.
check: Whether to raise on non-zero exit.
timeout: Command timeout in seconds.
input_data: Optional stdin input.
Returns:
CompletedProcess result.
"""
cmd = ["kubectl"]
if namespace:
cmd.extend(["-n", namespace])
cmd.extend(args)
result = subprocess.run(
cmd,
capture_output=capture,
text=True,
check=False,
timeout=timeout,
input=input_data,
)
if check and result.returncode != 0:
stderr = result.stderr.strip() if result.stderr else ""
print(f" kubectl error (rc={result.returncode}): {' '.join(args[:4])}")
if stderr:
print(f" {stderr}")
result.check_returncode()
return result
def get_json(
    resource: str,
    name: str,
    namespace: str,
    timeout: int = 30,
) -> Dict[str, Any]:
    """Fetch one k8s resource with ``-o json`` and return the decoded object.

    kubectl failures raise from ``run_kubectl`` (check is left enabled);
    undecodable output raises ``json.JSONDecodeError``.
    """
    query = ["get", resource, name, "-o", "json"]
    completed = run_kubectl(query, namespace=namespace, timeout=timeout)
    return json.loads(completed.stdout)
def patch_json(
    resource: str,
    name: str,
    namespace: str,
    patch: List[Dict[str, Any]],
    timeout: int = 30,
) -> None:
    """Apply a JSON patch (list of operations) to a k8s resource."""
    serialized = json.dumps(patch)
    command = ["patch", resource, name, "--type=json", f"-p={serialized}"]
    run_kubectl(command, namespace=namespace, timeout=timeout)
def patch_merge(
    resource: str,
    name: str,
    namespace: str,
    patch: Dict[str, Any],
    timeout: int = 30,
) -> None:
    """Apply a merge patch (``kubectl patch --type=merge``) to a k8s resource."""
    serialized = json.dumps(patch)
    command = ["patch", resource, name, "--type=merge", f"-p={serialized}"]
    run_kubectl(command, namespace=namespace, timeout=timeout)
def wait_pod(
    label_selector: str,
    namespace: str,
    condition: str = "Ready",
    timeout: int = 300,
) -> None:
    """Run ``kubectl wait`` until pods matching the selector meet a condition.

    The subprocess is given 10 seconds more than kubectl's own ``--timeout``
    so that kubectl reports the timeout itself.  Failures raise from
    ``run_kubectl``.
    """
    wait_args = [
        "wait",
        "pod",
        "-l",
        label_selector,
        f"--for=condition={condition}",
        f"--timeout={timeout}s",
    ]
    run_kubectl(wait_args, namespace=namespace, timeout=timeout + 10)
def delete_pod(
    name: str,
    namespace: str,
    grace_period: int = 5,
) -> None:
    """Delete a pod by name; a non-zero kubectl exit is ignored."""
    delete_args = ["delete", "pod", name, f"--grace-period={grace_period}"]
    run_kubectl(delete_args, namespace=namespace, check=False)
def get_pod_name(
    label_selector: str,
    namespace: str,
) -> Optional[str]:
    """Return the name of the first pod matching a label selector.

    Returns None when kubectl prints nothing (e.g. no pod matches).
    """
    query = [
        "get",
        "pod",
        "-l",
        label_selector,
        "-o",
        "jsonpath={.items[0].metadata.name}",
    ]
    pod = run_kubectl(query, namespace=namespace, check=False).stdout.strip()
    return pod or None
def pod_exists(name: str, namespace: str) -> bool:
    """Return True when ``kubectl get pod <name>`` exits with status 0."""
    lookup = run_kubectl(["get", "pod", name], namespace=namespace, check=False)
    return lookup.returncode == 0
def apply_yaml(yaml_content: str, namespace: str) -> None:
    """Pipe ``yaml_content`` into ``kubectl apply -f -`` in the namespace."""
    apply_args = ["apply", "-f", "-"]
    run_kubectl(apply_args, namespace=namespace, input_data=yaml_content)
def apply_secret_literal(name: str, namespace: str, key: str, value: str) -> None:
    """Create or update an opaque Secret from a literal value.

    The value is JSON-encoded, which yields a double-quoted YAML scalar, so
    special characters in it cannot break the manifest.
    """
    # "name" must be nested under metadata and the key under stringData.
    secret_yaml = f"""apiVersion: v1
kind: Secret
metadata:
  name: {name}
type: Opaque
stringData:
  {key}: {json.dumps(value)}
"""
    apply_yaml(secret_yaml, namespace)
def wait_for_pod_deletion(
    name: str,
    namespace: str,
    timeout: int = 120,
) -> None:
    """Poll every 5 seconds until the pod is gone.

    Gives up with a warning (no exception) once ``timeout`` seconds of sleep
    have accumulated.
    """
    elapsed = 0
    while True:
        if not pod_exists(name, namespace):
            return
        time.sleep(5)
        elapsed += 5
        if elapsed >= timeout:
            print(f" WARNING: pod {name} still present after {timeout}s")
            return
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Prometheus metrics capture for k8s sweeps.
Captures pre/post frontend /metrics snapshots for delta analysis.
Supports both direct HTTP (when endpoint is reachable) and kubectl-exec
(when only in-cluster DNS is available).
"""
from __future__ import annotations
import shlex
import subprocess
import time
import urllib.request
from pathlib import Path
from typing import Optional
def capture_metrics(
    endpoint: str,
    dest: Path,
    namespace: Optional[str] = None,
    pod_label: Optional[str] = None,
) -> None:
    """Snapshot the frontend ``/metrics`` output into ``dest``.

    Sources are tried in order until one returns something other than None:

    1. Direct HTTP to ``endpoint``.
    2. ``kubectl exec`` into a pod matching ``pod_label`` (needs both
       ``namespace`` and ``pod_label``).
    3. A one-shot ``kubectl run`` pod (needs ``namespace``).

    When no source yields non-blank text, ``dest`` is still written, with a
    single ``# metrics capture failed ...`` comment line, and a warning is
    printed.

    Args:
        endpoint: Frontend endpoint (host:port) -- may be in-cluster DNS.
        dest: Destination file path; parent directories are created.
        namespace: K8s namespace (for the kubectl fallbacks).
        pod_label: Pod label selector (for the kubectl exec fallback).
    """
    dest.parent.mkdir(parents=True, exist_ok=True)

    # Works when port-forwarded or on the same network.
    payload = _try_http(endpoint)
    if payload is None and namespace and pod_label:
        payload = _try_kubectl_exec(endpoint, namespace, pod_label)
    if payload is None and namespace:
        payload = _try_kubectl_run(endpoint, namespace)

    if not payload or not payload.strip():
        stamp = time.strftime("%Y-%m-%dT%H:%M:%S")
        dest.write_text(f"# metrics capture failed at {stamp}\n")
        print(f" WARNING: could not capture metrics from {endpoint}")
        return

    dest.write_text(payload)
    line_count = len(payload.strip().split("\n"))
    print(f" Metrics captured -> {dest.name} ({line_count} lines)")
def _try_http(endpoint: str) -> Optional[str]:
"""Try fetching metrics via direct HTTP."""
try:
req = urllib.request.Request(f"http://{endpoint}/metrics")
with urllib.request.urlopen(req, timeout=10) as resp:
return resp.read().decode()
except Exception:
return None
def _try_kubectl_exec(
    endpoint: str,
    namespace: str,
    pod_label: str,
) -> Optional[str]:
    """Fetch metrics by running an HTTP client inside an existing pod.

    Resolves the first pod matching ``pod_label`` in ``namespace``, then runs
    a shell one-liner in it that tries ``curl``, ``wget`` and ``python3`` in
    turn against ``http://<endpoint>/metrics``.

    Returns:
        The command's stdout when it exits 0 with non-blank output; ``None``
        when no pod is found, the fetch fails, or any exception is raised.
    """
    try:
        lookup = subprocess.run(
            [
                "kubectl",
                "-n",
                namespace,
                "get",
                "pod",
                "-l",
                pod_label,
                "-o",
                "jsonpath={.items[0].metadata.name}",
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
        target_pod = lookup.stdout.strip()
        if not target_pod:
            return None

        # The endpoint is interpolated into a shell command, so quote it.
        url = f"http://{shlex.quote(endpoint)}/metrics"
        # The container image may lack curl; fall through to wget, then python3.
        fetchers = (
            f"curl -sf {url} 2>/dev/null",
            f"wget -qO- {url} 2>/dev/null",
            'python3 -c "import urllib.request,sys; '
            'print(urllib.request.urlopen(sys.argv[1]).read().decode())" '
            f"{url} 2>/dev/null",
        )
        fetched = subprocess.run(
            [
                "kubectl",
                "-n",
                namespace,
                "exec",
                target_pod,
                "--",
                "sh",
                "-c",
                " || ".join(fetchers),
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )
        if fetched.returncode != 0 or not fetched.stdout.strip():
            return None
        return fetched.stdout
    except Exception:
        # Best-effort fallback: the caller treats None as "try something else".
        return None
def _try_kubectl_run(endpoint: str, namespace: str) -> Optional[str]:
    """Fetch metrics via a one-shot ``kubectl run --rm`` curl pod.

    Args:
        endpoint: Frontend endpoint (host:port) reachable from inside the
            cluster.
        namespace: Namespace the temporary pod is created in.

    Returns:
        The pod's stdout when kubectl exits 0 with non-blank output, otherwise
        ``None`` (including on timeout or any other exception).
    """
    # A unique name per call: a pod left over from an earlier attempt must not
    # make this one fail on a name collision.
    pod_name = f"metrics-fetch-{time.time_ns():x}"
    try:
        result = subprocess.run(
            [
                "kubectl",
                "run",
                pod_name,
                "--rm",
                "-i",
                "--restart=Never",
                "-n",
                namespace,
                "--image=curlimages/curl:latest",
                "--",
                "-sf",
                f"http://{endpoint}/metrics",
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        # kubectl was killed before its --rm cleanup could run, so remove the
        # pod ourselves (best-effort, do not wait for termination).
        try:
            subprocess.run(
                [
                    "kubectl",
                    "-n",
                    namespace,
                    "delete",
                    "pod",
                    pod_name,
                    "--ignore-not-found",
                    "--wait=false",
                ],
                capture_output=True,
                text=True,
                timeout=15,
            )
        except Exception:
            pass
        return None
    except Exception:
        # Best-effort fallback (e.g. kubectl not installed).
        return None

    if result.returncode == 0 and result.stdout.strip():
        return result.stdout
    return None
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Deploy YAML template rendering and application.
Supports ${VARIABLE} placeholders using Python's string.Template.
This enables arbitrary backend deployments (mocker, vLLM, TensorRT-LLM, etc.)
without hardcoding DGD structures.
"""
from __future__ import annotations
import string
from pathlib import Path
from typing import Dict
from sweep_core.models import DeployDimension, SweepConfig
from sweep_k8s.kubectl import apply_yaml
# Tokenizer backend mapping for template substitution.
# Looked up with ``deploy.tokenizer`` in build_substitution_dict(); the value
# becomes the ${DYN_TOKENIZER_BACKEND} template variable. Names that are not
# listed here are passed through unchanged by that lookup.
TOKENIZER_TEMPLATE_MAP = {
    "hf": "default",
    "default": "default",
    "fast": "fast",
    "fastokens": "fast",
}
# Secret name exposed to templates as the ${HF_TOKEN_SECRET_NAME} variable.
DEFAULT_HF_TOKEN_SECRET_NAME = "hf-token-secret"
def _indent_block(text: str, spaces: int) -> str:
prefix = " " * spaces
return "\n".join(f"{prefix}{line}" if line else "" for line in text.splitlines())
def _build_image_pull_secrets_block(image_pull_secret: str) -> str:
    """Render an ``imagePullSecrets`` YAML fragment indented by 8 spaces.

    Returns an empty string when ``image_pull_secret`` is empty, so the
    template placeholder expands to nothing.
    """
    if image_pull_secret:
        fragment = f"imagePullSecrets:\n- name: {image_pull_secret}"
        return _indent_block(fragment, 8)
    return ""
def build_substitution_dict(
    deploy: DeployDimension,
    config: SweepConfig,
) -> Dict[str, str]:
    """Assemble the flat ``${VAR}`` mapping used to render deploy templates.

    Values come from three places: the deploy dimension (tokenizer, workers),
    the sweep config (model, speedup ratio) and ``config.k8s``. Entries from
    ``deploy.env_overrides`` are merged in last and therefore replace any
    built-in variable with the same name.

    Returns:
        Mapping from template variable name to its substitution value.
    """
    k8s = config.k8s
    # Unknown tokenizer names fall through unchanged.
    tokenizer_backend = TOKENIZER_TEMPLATE_MAP.get(deploy.tokenizer, deploy.tokenizer)
    # Frontend and worker share the same pull-secret fragment.
    pull_secrets_block = _build_image_pull_secrets_block(k8s.image_pull_secret)

    variables: Dict[str, str] = dict(
        # Deploy dimensions
        DYN_TOKENIZER_BACKEND=tokenizer_backend,
        NUM_WORKERS=str(deploy.workers),
        # Model info
        MODEL=config.model,
        MODEL_NAME=config.model_name,
        MODEL_PATH=config.model,
        # Image
        IMAGE=k8s.image,
        # K8s config
        NAMESPACE=k8s.namespace,
        DGD_NAME=k8s.dgd_name,
        FRONTEND_PORT=str(k8s.frontend_port),
        WORKER_REPLICAS=str(k8s.worker_replicas),
        FRONTEND_REPLICAS=str(k8s.frontend_replicas),
        SPEEDUP_RATIO=str(config.speedup_ratio),
        REQUEST_PLANE=k8s.request_plane,
        EVENT_PLANE=k8s.event_plane,
        ROUTER_MODE=k8s.router_mode,
        HF_TOKEN_SECRET_NAME=DEFAULT_HF_TOKEN_SECRET_NAME,
        FRONTEND_IMAGE_PULL_SECRETS_BLOCK=pull_secrets_block,
        WORKER_IMAGE_PULL_SECRETS_BLOCK=pull_secrets_block,
    )
    # Per-deploy overrides win over everything above.
    variables.update(deploy.env_overrides)
    return variables
def render_template(template_path: Path, variables: Dict[str, str]) -> str:
    """Load a deploy YAML template and fill in its ``${VAR}`` placeholders.

    Substitution goes through ``string.Template.safe_substitute``, so a
    placeholder with no entry in ``variables`` is left in the output verbatim
    instead of raising ``KeyError``. That matters because DGD templates may
    contain ``${VARIABLE}`` references that are resolved by the k8s operator
    at runtime.
    """
    template_text = template_path.read_text()
    return string.Template(template_text).safe_substitute(variables)
def apply_rendered_template(
    template_path: Path,
    deploy: DeployDimension,
    config: SweepConfig,
) -> None:
    """Render ``template_path`` for this deploy/config and apply the result.

    The rendered manifest is applied in ``config.k8s.namespace`` through
    ``apply_yaml``; the template's file name is printed just before applying.
    """
    manifest = render_template(
        template_path,
        build_substitution_dict(deploy, config),
    )
    print(f" Applying rendered template: {template_path.name}")
    apply_yaml(manifest, config.k8s.namespace)
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment