Unverified Commit 480d1b8b authored by Keyang Ru's avatar Keyang Ru Committed by GitHub
Browse files

[router] add benchmark for regular router and pd router (#10280)

parent 6c18ab46
......@@ -65,10 +65,10 @@ jobs:
# Run quick benchmarks to ensure they work using Python script
python3 scripts/run_benchmarks.py --quick
e2e-python:
pytest-rust:
if: github.repository == 'sgl-project/sglang' || github.event_name == 'pull_request'
runs-on: BM.A10.4
timeout-minutes: 35
timeout-minutes: 25
steps:
- name: Checkout code
uses: actions/checkout@v4
......@@ -109,11 +109,82 @@ jobs:
run: |
bash scripts/killall_sglang.sh "nuk_gpus"
cd sgl-router
python3 -m pip --no-cache-dir install --upgrade --ignore-installed blinker
python3 -m pip --no-cache-dir install --upgrade --break-system-packages genai-bench==0.0.2
pytest -m e2e -s -vv -o log_cli=true --log-cli-level=INFO
- name: Upload benchmark results
if: success()
uses: actions/upload-artifact@v4
with:
name: genai-bench-results-all-policies
path: sgl-router/benchmark_**/
finish:
needs: [unit-test-rust, e2e-python]
needs: [unit-test-rust, pytest-rust]
runs-on: ubuntu-latest
steps:
- name: Finish
run: echo "This is an empty step to ensure that all jobs are completed."
summarize-benchmarks:
needs: pytest-rust
runs-on: ubuntu-latest
if: success()
steps:
- name: Install jq
run: sudo apt-get update && sudo apt-get install -y jq bc
- name: Download benchmark results
uses: actions/download-artifact@v4
with:
name: genai-bench-results-all-policies
- name: List downloaded contents
run: |
echo "Contents after download:"
ls -la
find . -name "benchmark_*" -type d
echo "JSON files found:"
find . -name "*.json" | head -10
- name: Create benchmark summary
run: |
echo "=== DEBUG: Creating benchmark summary ==="
echo "Available benchmark directories:"
find . -name "benchmark_*" -type d || true
echo "=========================================="
echo "## Router E2E Genai-Bench Results Summary" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Results captured from E2E tests for two scenarios: regular router (2 workers, dp=2) and PD router (2 prefill + 2 decode)." >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "| Scenario | Status | TTFT (s) | E2E Latency (s) | Input Throughput (tok/s) | Output Throughput (tok/s) |" >> $GITHUB_STEP_SUMMARY
echo "|----------|--------|----------|-----------------|--------------------------|---------------------------|" >> $GITHUB_STEP_SUMMARY
scenarios=$'Regular (dp=2, round_robin)|benchmark_round_robin_regular\nPD (2 prefill + 2 decode, round_robin)|benchmark_round_robin_pd'
echo "$scenarios" | sed 's/^\s*//' | while IFS='|' read -r label pattern; do
[ -z "$label" ] && continue
# Find the result folder (handle different extraction layouts)
result_folder=$(find . -maxdepth 3 \( -name "$pattern" -o -path "*${pattern}*" \) -type d | head -1)
if [ -n "$result_folder" ] && [ -d "$result_folder" ]; then
json_file=$(find "$result_folder" -name "*.json" -not -name "experiment_metadata.json" | head -1)
if [ -n "$json_file" ] && [ -f "$json_file" ]; then
ttft_mean=$(jq -r '.aggregated_metrics.stats.ttft.mean' "$json_file")
e2e_latency_mean=$(jq -r '.aggregated_metrics.stats.e2e_latency.mean' "$json_file")
input_throughput_mean=$(jq -r '.aggregated_metrics.stats.input_throughput.mean' "$json_file")
output_throughput_mean=$(jq -r '.aggregated_metrics.stats.output_throughput.mean' "$json_file")
ttft_display=$(printf "%.2f" "$ttft_mean" 2>/dev/null || echo "$ttft_mean")
e2e_display=$(printf "%.2f" "$e2e_latency_mean" 2>/dev/null || echo "$e2e_latency_mean")
input_display=$(printf "%.0f" "$input_throughput_mean" 2>/dev/null || echo "$input_throughput_mean")
output_display=$(printf "%.0f" "$output_throughput_mean" 2>/dev/null || echo "$output_throughput_mean")
echo "| ${label} | ✅ Success | $ttft_display | $e2e_display | $input_display | $output_display |" >> $GITHUB_STEP_SUMMARY
fi
fi
done
import json
import logging
import os
import shutil
import signal
import socket
import subprocess
import time
from pathlib import Path
from types import SimpleNamespace
from typing import Callable, Optional
from urllib.parse import urlparse
import pytest
......@@ -13,6 +20,8 @@ from sglang.test.test_utils import (
DEFAULT_URL_FOR_TEST,
)
logger = logging.getLogger(__name__)
def _find_available_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
......@@ -89,6 +98,7 @@ def _popen_launch_worker(
*,
dp_size: int | None = None,
api_key: str | None = None,
base_gpu_id: int | None = 0,
) -> subprocess.Popen:
host, port = _parse_url(base_url)
......@@ -103,7 +113,7 @@ def _popen_launch_worker(
"--port",
port,
"--base-gpu-id",
"0",
str(base_gpu_id or 0),
]
if dp_size is not None:
cmd += ["--dp-size", str(dp_size)]
......@@ -161,6 +171,250 @@ def _terminate(proc: subprocess.Popen, timeout: float = 120) -> None:
time.sleep(1)
def _which(cmd: str) -> Optional[str]:
try:
return shutil.which(cmd)
except Exception as e:
logger.warning("shutil.which(%r) failed: %s", cmd, e)
return None
def _graceful_stop_popen(p: subprocess.Popen) -> None:
if p is None:
return
try:
if p.poll() is None:
p.terminate()
for _ in range(5):
if p.poll() is not None:
break
time.sleep(1)
if p.poll() is None:
p.kill()
except Exception as e:
logger.warning("Exception during graceful stop of popen: %s", e)
def _pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except Exception:
return False
def _graceful_stop_pid(pid: int) -> None:
try:
if _pid_alive(pid):
try:
os.kill(pid, signal.SIGTERM)
except Exception:
pass
for _ in range(5):
if not _pid_alive(pid):
break
time.sleep(1)
if _pid_alive(pid):
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
except Exception:
pass
def _graceful_stop_any(obj) -> None:
try:
if isinstance(obj, subprocess.Popen):
_graceful_stop_popen(obj)
return
if isinstance(obj, int):
_graceful_stop_pid(obj)
return
proc_obj = getattr(obj, "proc", None)
if isinstance(proc_obj, subprocess.Popen):
_graceful_stop_popen(proc_obj)
except Exception:
pass
@pytest.fixture(scope="session")
def genai_bench_runner() -> Callable[..., None]:
"""Provide a callable to run genai-bench and validate metrics.
Usage in tests:
def test(..., genai_bench_runner):
genai_bench_runner(router_url=..., model_path=..., experiment_folder=...)
"""
def _run(
*,
router_url: str,
model_path: str,
experiment_folder: str,
timeout_sec: int | None = None,
thresholds: dict | None = None,
extra_env: dict | None = None,
num_concurrency: int = 32,
traffic_scenario: str = "D(4000,100)",
max_requests_per_run: int | None = None,
clean_experiment: bool = True,
kill_procs: list | None = None,
drain_delay_sec: int = 6,
) -> None:
cli = _which("genai-bench")
if not cli:
pytest.fail(
"genai-bench CLI not found; please install it to run benchmarks"
)
# Clean previous experiment folder under current working directory
if clean_experiment:
exp_dir = Path.cwd() / experiment_folder
if exp_dir.exists():
shutil.rmtree(exp_dir, ignore_errors=True)
# Default requests per run if not provided
mrr = (
max_requests_per_run
if max_requests_per_run is not None
else num_concurrency * 3
)
cmd = [
cli,
"benchmark",
"--api-backend",
"openai",
"--api-base",
router_url,
"--api-key",
"dummy-token",
"--api-model-name",
model_path,
"--model-tokenizer",
model_path,
"--task",
"text-to-text",
"--num-concurrency",
str(num_concurrency),
"--traffic-scenario",
traffic_scenario,
"--max-requests-per-run",
str(mrr),
"--max-time-per-run",
"2",
"--experiment-folder-name",
experiment_folder,
"--experiment-base-dir",
str(Path.cwd()),
]
env = os.environ.copy()
if extra_env:
env.update(extra_env)
to = timeout_sec or int(os.environ.get("GENAI_BENCH_TEST_TIMEOUT", "120"))
proc = subprocess.Popen(
cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout = stderr = ""
rc = None
try:
try:
stdout, stderr = proc.communicate(timeout=to)
except subprocess.TimeoutExpired:
# Simple: kill the CLI process if it doesn't exit in time
try:
proc.kill()
except Exception:
pass
stdout, stderr = proc.communicate()
rc = proc.returncode
# Prefer exact path under cwd; fallback to rglob search
base = Path.cwd()
direct = base / experiment_folder
candidates = [direct] if direct.is_dir() else []
if not candidates:
for p in base.rglob(experiment_folder):
if p.is_dir() and p.name == experiment_folder:
candidates = [p]
break
if not candidates:
raise AssertionError(
"Benchmark failed: experiment folder not found: "
f"{experiment_folder}\nExit code: {rc}\nSTDOUT (tail):\n{stdout[-1000:]}\nSTDERR (tail):\n{stderr[-1000:]}"
)
actual_folder = candidates[0]
json_files = [
p
for p in actual_folder.rglob("*.json")
if "experiment_metadata" not in p.name
]
if not json_files:
raise AssertionError(
"Benchmark failed: no JSON results found\n"
f"Exit code: {rc}\nSTDOUT (tail):\n{stdout[-1000:]}\nSTDERR (tail):\n{stderr[-1000:]}"
)
th = thresholds # None means "log only", no validation
for jf in json_files:
with jf.open("r") as f:
data = json.load(f)
stats = data.get("aggregated_metrics", {}).get("stats", {})
ttft_mean = float(stats.get("ttft", {}).get("mean", float("inf")))
e2e_latency_mean = float(
stats.get("e2e_latency", {}).get("mean", float("inf"))
)
input_tp_mean = float(stats.get("input_throughput", {}).get("mean", 0.0))
output_tp_mean = float(stats.get("output_throughput", {}).get("mean", 0.0))
logger.info(
"genai-bench[%s] %s ttft_mean=%.3fs e2e_latency_mean=%.3fs input_tp_mean=%.1f tok/s output_tp_mean=%.1f tok/s",
experiment_folder,
jf.name,
ttft_mean,
e2e_latency_mean,
input_tp_mean,
output_tp_mean,
)
if th is not None:
assert (
ttft_mean <= th["ttft_mean_max"]
), f"TTFT validation failed: {ttft_mean} > {th['ttft_mean_max']} (file={jf.name})"
assert (
e2e_latency_mean <= th["e2e_latency_mean_max"]
), f"E2E latency validation failed: {e2e_latency_mean} > {th['e2e_latency_mean_max']} (file={jf.name})"
assert (
input_tp_mean >= th["input_throughput_mean_min"]
), f"Input throughput validation failed: {input_tp_mean} < {th['input_throughput_mean_min']} (file={jf.name})"
assert (
output_tp_mean >= th["output_throughput_mean_min"]
), f"Output throughput validation failed: {output_tp_mean} < {th['output_throughput_mean_min']} (file={jf.name})"
finally:
# Always attempt to stop workers to avoid resource leakage
if kill_procs:
# Give router/workers a small grace period to finish any last drains
if drain_delay_sec > 0:
try:
time.sleep(drain_delay_sec)
except Exception:
pass
for p in kill_procs:
_graceful_stop_any(p)
try:
time.sleep(2)
except Exception:
pass
return _run
def pytest_configure(config):
config.addinivalue_line("markers", "e2e: mark as end-to-end test")
......@@ -233,3 +487,26 @@ def e2e_worker_dp2_api(e2e_model: str, e2e_router_only_rr_dp_aware_api):
yield SimpleNamespace(proc=proc, url=base_url)
finally:
_terminate(proc)
@pytest.fixture(scope="session")
def e2e_two_workers_dp2(e2e_model: str):
"""Launch two workers, each with dp_size=2, mapped to GPUs [0,1] and [2,3]."""
workers = []
try:
# Worker A on GPUs 0-1
port_a = _find_available_port()
url_a = f"http://127.0.0.1:{port_a}"
proc_a = _popen_launch_worker(e2e_model, url_a, dp_size=2, base_gpu_id=0)
workers.append(SimpleNamespace(proc=proc_a, url=url_a))
# Worker B on GPUs 2-3
port_b = _find_available_port()
url_b = f"http://127.0.0.1:{port_b}"
proc_b = _popen_launch_worker(e2e_model, url_b, dp_size=2, base_gpu_id=2)
workers.append(SimpleNamespace(proc=proc_b, url=url_b))
yield workers
finally:
for w in workers:
_terminate(w.proc)
import logging
import os
import socket
import subprocess
import time
......@@ -9,6 +11,8 @@ import requests
from sglang.test.run_eval import run_eval
logger = logging.getLogger(__name__)
def _find_available_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
......@@ -132,11 +136,9 @@ def _terminate(proc: subprocess.Popen, timeout: float = 120) -> None:
time.sleep(1)
@pytest.mark.e2e
def test_pd_mmlu(e2e_model: str):
"""
Launch 4 workers, start a PD router (2 prefill + 2 decode), then run MMLU.
"""
@pytest.fixture(scope="module")
def pd_cluster(e2e_model: str):
"""Start 2 prefill + 2 decode workers and one PD router, once per module."""
# Environment capability checks: require sgl_kernel and GPU backend
try:
import sgl_kernel # noqa: F401
......@@ -153,8 +155,8 @@ def test_pd_mmlu(e2e_model: str):
if not torch.cuda.is_available(): # pragma: no cover - environment dependent
pytest.fail("PD e2e requires CUDA backend, but CUDA is not available")
# Start two prefill workers (with bootstrap ports) and two decode workers
workers: list[SimpleNamespace] = []
router_proc = None
try:
ib_device = _detect_ib_device()
......@@ -196,14 +198,12 @@ def test_pd_mmlu(e2e_model: str):
"--policy",
"round_robin",
"--pd-disaggregation",
# prefill URLs (explicitly pass 'none' for bootstrap port)
]
for url, bport in prefill:
cmd += ["--prefill", url, str(bport)]
for url in decode:
cmd += ["--decode", url]
cmd += [
# prometheus (avoid collisions across tests)
"--prometheus-port",
str(pport),
"--prometheus-host",
......@@ -211,22 +211,52 @@ def test_pd_mmlu(e2e_model: str):
]
router_proc = subprocess.Popen(cmd)
try:
_wait_health(router_url, timeout=180.0)
# Run a modest MMLU eval through the PD router
args = SimpleNamespace(
base_url=router_url,
model=e2e_model,
eval_name="mmlu",
num_examples=64,
num_threads=32,
temperature=0.1,
)
metrics = run_eval(args)
assert metrics["score"] >= 0.65
finally:
_terminate(router_proc)
_wait_health(router_url, timeout=180.0)
yield SimpleNamespace(
router_url=router_url, workers=workers, router_proc=router_proc
)
finally:
if router_proc is not None:
_terminate(router_proc)
for w in workers:
_terminate(w.proc)
@pytest.mark.e2e
def test_pd_mmlu(e2e_model: str, pd_cluster):
"""
Launch 4 workers, start a PD router (2 prefill + 2 decode), then run MMLU.
"""
args = SimpleNamespace(
base_url=pd_cluster.router_url,
model=e2e_model,
eval_name="mmlu",
num_examples=64,
num_threads=32,
temperature=0.1,
)
metrics = run_eval(args)
assert metrics["score"] >= 0.65
@pytest.mark.e2e
def test_pd_genai_bench(e2e_model: str, pd_cluster, genai_bench_runner):
"""
Launch 4 workers, start a PD router (2 prefill + 2 decode), then run a
short genai-bench benchmark and validate aggregate metrics.
"""
# Run genai-bench against the shared router
policy_label = "benchmark_round_robin_pd"
genai_bench_runner(
router_url=pd_cluster.router_url,
model_path=e2e_model,
experiment_folder=policy_label,
thresholds={
"ttft_mean_max": 12,
"e2e_latency_mean_max": 15,
"input_throughput_mean_min": 400,
"output_throughput_mean_min": 20,
},
kill_procs=pd_cluster.workers,
)
......@@ -9,13 +9,12 @@ from sglang.test.run_eval import run_eval
@pytest.mark.e2e
def test_mmlu(e2e_router_only_rr, e2e_primary_worker, e2e_model):
# Attach the primary worker to a fresh router-only instance (single model)
def test_mmlu(e2e_router_only_rr, e2e_two_workers_dp2, e2e_model):
# Attach two dp=2 workers (total 4 GPUs) to a fresh router-only instance
base = e2e_router_only_rr.url
r = requests.post(
f"{base}/add_worker", params={"url": e2e_primary_worker.url}, timeout=180
)
r.raise_for_status()
for w in e2e_two_workers_dp2:
r = requests.post(f"{base}/add_worker", params={"url": w.url}, timeout=180)
r.raise_for_status()
args = SimpleNamespace(
base_url=base,
......@@ -29,6 +28,30 @@ def test_mmlu(e2e_router_only_rr, e2e_primary_worker, e2e_model):
assert metrics["score"] >= 0.65
@pytest.mark.e2e
def test_genai_bench(
e2e_router_only_rr, e2e_two_workers_dp2, e2e_model, genai_bench_runner
):
"""Attach a worker to the regular router and run a short genai-bench."""
base = e2e_router_only_rr.url
for w in e2e_two_workers_dp2:
r = requests.post(f"{base}/add_worker", params={"url": w.url}, timeout=180)
r.raise_for_status()
genai_bench_runner(
router_url=base,
model_path=e2e_model,
experiment_folder="benchmark_round_robin_regular",
thresholds={
"ttft_mean_max": 6,
"e2e_latency_mean_max": 14,
"input_throughput_mean_min": 1000,
"output_throughput_mean_min": 12,
},
kill_procs=e2e_two_workers_dp2,
)
@pytest.mark.e2e
def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
base = e2e_router_only_rr.url
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment