Unverified commit 7b141f81 authored by Keyang Ru, committed by GitHub

[router][ci] Add GPU utilization analysis with NVML (#10345)

parent 7bc5fb0d
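
For context, the monitor introduced in this commit is built on pynvml's device utilization query. A minimal standalone sketch of that sampling pattern (illustration only, assuming pynvml is installed and at least one GPU is visible; the committed code below adds process supervision, percentile summaries, and JSON output):

import time

import pynvml  # NVIDIA Management Library (NVML) Python bindings

pynvml.nvmlInit()
handles = [
    pynvml.nvmlDeviceGetHandleByIndex(i)
    for i in range(pynvml.nvmlDeviceGetCount())
]
samples = []
for _ in range(5):  # five samples, one second apart
    per_gpu = [pynvml.nvmlDeviceGetUtilizationRates(h).gpu for h in handles]
    samples.append(sum(per_gpu) / len(per_gpu))  # mean across all GPUs
    time.sleep(1.0)
pynvml.nvmlShutdown()
print(f"mean GPU utilization: {sum(samples) / len(samples):.2f}%")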
@@ -185,6 +185,34 @@ jobs:
output_display=$(printf "%.0f" "$output_throughput_mean" 2>/dev/null || echo "$output_throughput_mean")
echo "| ${label} | ✅ Success | $ttft_display | $e2e_display | $input_display | $output_display |" >> $GITHUB_STEP_SUMMARY
# Optional GPU utilization table if monitor output exists
gpu_json="$result_folder/gpu_utilization.json"
if [ -f "$gpu_json" ]; then
overall_mean=$(jq -r '.overall.mean // 0' "$gpu_json")
printf "\n#### GPU Utilization — %s\n\n" "$label" >> $GITHUB_STEP_SUMMARY
printf "Overall mean: %.2f%%\n\n" "$overall_mean" >> $GITHUB_STEP_SUMMARY
echo "| GPU | Mean (%) | p5 | p10 | p25 | p50 | p75 | p90 | p95 |" >> $GITHUB_STEP_SUMMARY
echo "|-----|----------|----|-----|-----|-----|-----|-----|-----|" >> $GITHUB_STEP_SUMMARY
jq -r '
.per_gpu
| to_entries[]
| [ .key,
(.value.mean // 0),
(.value.p5 // 0),
(.value.p10 // 0),
(.value.p25 // 0),
(.value.p50 // 0),
(.value.p75 // 0),
(.value.p90 // 0),
(.value.p95 // 0)
]
| @tsv' "$gpu_json" \
| while IFS=$'\t' read -r gpu m p5 p10 p25 p50 p75 p90 p95; do
printf "| %s | %.2f | %.2f | %.2f | %.2f | %.2f | %.2f | %.2f | %.2f |\n" "$gpu" "$m" "$p5" "$p10" "$p25" "$p50" "$p75" "$p90" "$p95" >> $GITHUB_STEP_SUMMARY
done
echo "" >> $GITHUB_STEP_SUMMARY
fi
fi
fi
done
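
The jq queries above assume gpu_utilization.json has the shape written by _gpu_monitor_proc_entry in the test fixture below. A minimal synthetic payload (hypothetical values) for exercising the summary-table rendering locally:

import json

payload = {
    "count": 2,  # number of collected samples
    "overall": {"mean": 97.5, "p50": 98.0},
    "per_gpu": {
        # Keyed by GPU index as a string; all values in percent.
        "0": {"mean": 97.5, "p5": 95.0, "p10": 95.0, "p25": 96.0,
              "p50": 98.0, "p75": 99.0, "p90": 100.0, "p95": 100.0},
    },
    "raw": {"overall": [95.0, 100.0], "per_gpu": {"0": [95.0, 100.0]}},
}
with open("gpu_utilization.json", "w") as f:
    json.dump(payload, f)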
@@ -238,6 +238,231 @@ def _graceful_stop_any(obj) -> None:
pass
def _gpu_monitor_should_run(thresholds: Optional[dict]) -> bool:
"""Decide whether to enable the GPU monitor.
Runs if thresholds request GPU checks or if GPU_UTIL_LOG is truthy.
"""
want = False
try:
mean_th = None if thresholds is None else thresholds.get("gpu_util_mean_min")
p50_th = None if thresholds is None else thresholds.get("gpu_util_p50_min")
want = bool(mean_th is not None or p50_th is not None)
except Exception:
want = False
if not want:
env_flag = os.environ.get("GPU_UTIL_LOG", "").lower() in ("1", "true", "yes")
want = want or env_flag
return want
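
A quick illustration of the decision logic (hypothetical threshold dicts, assuming the helper above is importable):

import os

# Either GPU threshold key requests monitoring...
assert _gpu_monitor_should_run({"gpu_util_p50_min": 99}) is True
assert _gpu_monitor_should_run({"gpu_util_mean_min": 95}) is True
# ...latency/throughput thresholds alone do not (with GPU_UTIL_LOG unset).
os.environ.pop("GPU_UTIL_LOG", None)
assert _gpu_monitor_should_run({"ttft_mean_max": 5}) is False
# The GPU_UTIL_LOG escape hatch enables monitoring without any thresholds.
os.environ["GPU_UTIL_LOG"] = "true"
assert _gpu_monitor_should_run(None) is True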
def _gpu_monitor_path(experiment_folder: str) -> str:
"""Return the JSON path for storing GPU monitor results."""
base = Path.cwd() / experiment_folder
return str(base / "gpu_utilization.json")
def _launch_gpu_monitor(bench_pid: int, experiment_folder: str, interval: float):
"""Start the GPU monitor process. Returns (proc, path) or (None, None)."""
try:
from multiprocessing import Process
out_path = _gpu_monitor_path(experiment_folder)
proc = Process(
target=_gpu_monitor_proc_entry,
args=(bench_pid, out_path, interval),
daemon=True,
)
proc.start()
return proc, out_path
except Exception as e:
logger.warning("Failed to launch GPU monitor: %s", e)
return None, None
def _read_gpu_monitor_result(path: Optional[str]) -> Optional[dict]:
try:
if path and os.path.exists(path):
with open(path, "r") as f:
return json.load(f)
except Exception as e:
logger.warning("Failed to read GPU monitor result from %r: %s", path, e)
return None
def _log_and_assert_gpu_thresholds(
result: Optional[dict], thresholds: Optional[dict]
) -> None:
if not result or not isinstance(result, dict) or result.get("count", 0) <= 0:
logger.warning("GPU utilization monitor produced no samples.")
return
overall = result.get("overall", {}) if isinstance(result, dict) else {}
count = int(result.get("count", 0))
mean_th = None if thresholds is None else thresholds.get("gpu_util_mean_min")
p50_th = None if thresholds is None else thresholds.get("gpu_util_p50_min")
mean_v = float(overall.get("mean", 0.0))
p50_v = overall.get("p50")
logger.info(
"GPU utilization overall: mean=%.2f%% p50=%s (samples=%d)",
mean_v,
(f"{float(p50_v):.2f}%" if p50_v is not None else "n/a"),
count,
)
if mean_th is not None:
assert mean_v >= float(
mean_th
), f"GPU utilization mean below threshold: {mean_v:.2f}% < {mean_th}%"
if p50_th is not None and p50_v is not None:
p50_f = float(p50_v)
assert p50_f >= float(
p50_th
), f"GPU utilization p50 below threshold: {p50_f:.2f}% < {p50_th}%"
def _gpu_monitor_proc_entry(bench_pid: int, out_file: str, interval: float) -> None:
"""Low-impact GPU utilization monitor using NVML in a separate process.
Writes JSON to out_file that includes overall and per-GPU raw samples and summary stats.
"""
try:
try:
os.nice(10)
except Exception:
pass
total = 0.0
n = 0
try:
import pynvml # type: ignore
pynvml.nvmlInit()
except Exception:
os.makedirs(os.path.dirname(out_file), exist_ok=True)
with open(out_file, "w") as f:
json.dump(
{
"count": 0,
"overall": {"mean": 0.0},
"per_gpu": {},
"raw": {},
},
f,
)
return
try:
import pynvml # type: ignore
count = pynvml.nvmlDeviceGetCount()
handles = [pynvml.nvmlDeviceGetHandleByIndex(i) for i in range(count)]
except Exception:
os.makedirs(os.path.dirname(out_file), exist_ok=True)
with open(out_file, "w") as f:
json.dump(
{
"count": 0,
"overall": {"mean": 0.0},
"per_gpu": {},
"raw": {},
},
f,
)
return
# Prepare per-GPU and overall raw collectors
per_gpu_samples: dict[str, list[float]] = {}
overall_samples: list[float] = []
while True:
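# Sample until the benchmark process exits (Linux-only liveness check via /proc).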
if not os.path.exists(f"/proc/{bench_pid}"):
break
try:
vals = []
import pynvml # type: ignore
for idx, h in enumerate(handles):
try:
util = pynvml.nvmlDeviceGetUtilizationRates(h).gpu
vals.append(float(util))
key = str(idx)
per_gpu_samples.setdefault(key, []).append(float(util))
except Exception:
continue
if vals:
avg = sum(vals) / len(vals)
overall_samples.append(avg)
total += avg
n += 1
except Exception:
pass
time.sleep(interval)
finally:
try:
os.makedirs(os.path.dirname(out_file), exist_ok=True)
with open(out_file, "w") as f:
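# Nearest-rank percentile: map p in [0, 100] to index round(p/100 * (n-1))
# into the sorted samples, clamped to valid bounds; no interpolation.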
def pct_from(samples: list[float], p: float) -> float:
if not samples:
return 0.0
srt = sorted(samples)
i = max(
0, min(len(srt) - 1, int(round((p / 100.0) * (len(srt) - 1))))
)
return float(srt[i])
overall_mean = (total / n) if n > 0 else 0.0
per_gpu_summary: dict[str, dict] = {}
for key, arr in per_gpu_samples.items():
per_gpu_summary[key] = {
"mean": float(sum(arr) / len(arr)) if arr else 0.0,
"p5": pct_from(arr, 5),
"p10": pct_from(arr, 10),
"p25": pct_from(arr, 25),
"p50": pct_from(arr, 50),
"p75": pct_from(arr, 75),
"p90": pct_from(arr, 90),
"p95": pct_from(arr, 95),
"min": float(min(arr)) if arr else 0.0,
"max": float(max(arr)) if arr else 0.0,
"count": len(arr),
}
out_payload = {
"bench_pid": bench_pid,
"interval_sec": interval,
"count": n,
"overall": {
"mean": float(overall_mean),
"p5": pct_from(overall_samples, 5),
"p10": pct_from(overall_samples, 10),
"p25": pct_from(overall_samples, 25),
"p50": pct_from(overall_samples, 50),
"p75": pct_from(overall_samples, 75),
"p90": pct_from(overall_samples, 90),
"p95": pct_from(overall_samples, 95),
"min": float(min(overall_samples)) if overall_samples else 0.0,
"max": float(max(overall_samples)) if overall_samples else 0.0,
},
"per_gpu": per_gpu_summary,
"raw": {
"overall": overall_samples,
"per_gpu": per_gpu_samples,
},
}
json.dump(out_payload, f)
except Exception:
pass
try:
import pynvml # type: ignore
pynvml.nvmlShutdown()
except Exception:
pass
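
The summary percentiles above use nearest-rank selection rather than linear interpolation; a standalone sketch of the same rule:

def nearest_rank(samples: list[float], p: float) -> float:
    # Mirrors pct_from above: i = round(p/100 * (n-1)), clamped to bounds.
    if not samples:
        return 0.0
    srt = sorted(samples)
    i = max(0, min(len(srt) - 1, int(round((p / 100.0) * (len(srt) - 1)))))
    return float(srt[i])

# With four samples, p50 selects index round(0.5 * 3) = 2 -> 30.0
# (Python rounds half to even), and p95 selects round(0.95 * 3) = 3 -> 40.0.
assert nearest_rank([10.0, 20.0, 30.0, 40.0], 50) == 30.0
assert nearest_rank([10.0, 20.0, 30.0, 40.0], 95) == 40.0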
@pytest.fixture(scope="session")
def genai_bench_runner() -> Callable[..., None]:
"""Provide a callable to run genai-bench and validate metrics.
@@ -278,7 +503,7 @@ def genai_bench_runner() -> Callable[..., None]:
mrr = (
max_requests_per_run
if max_requests_per_run is not None
else num_concurrency * 5
)
cmd = [
@@ -303,7 +528,7 @@ def genai_bench_runner() -> Callable[..., None]:
"--max-requests-per-run",
str(mrr),
"--max-time-per-run",
"2",
"3",
"--experiment-folder-name",
experiment_folder,
"--experiment-base-dir",
@@ -318,6 +543,19 @@ def genai_bench_runner() -> Callable[..., None]:
proc = subprocess.Popen(
cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# Optional GPU utilization monitor in a low-priority child process (pynvml only).
# Enabled when thresholds request gpu_util_mean_min or gpu_util_p50_min, or when GPU_UTIL_LOG is set.
monitor_path = None
monitor_proc = None
gpu_util_result: dict | None = None
want_gpu_monitor = _gpu_monitor_should_run(thresholds)
if want_gpu_monitor:
interval = float(os.environ.get("GPU_UTIL_SAMPLE_INTERVAL", "2.0"))
monitor_proc, monitor_path = _launch_gpu_monitor(
bench_pid=proc.pid,
experiment_folder=experiment_folder,
interval=interval,
)
stdout = stderr = ""
rc = None
try:
@@ -348,11 +586,16 @@ def genai_bench_runner() -> Callable[..., None]:
)
actual_folder = candidates[0]
json_files = []
for _ in range(10):
json_files = [
p
for p in actual_folder.rglob("*.json")
if "experiment_metadata" not in p.name
]
if json_files:
break
time.sleep(1)
if not json_files:
raise AssertionError(
"Benchmark failed: no JSON results found\n"
@@ -365,36 +608,50 @@ def genai_bench_runner() -> Callable[..., None]:
with jf.open("r") as f:
data = json.load(f)
stats = data.get("aggregated_metrics", {}).get("stats", {})
ttft_mean = float(stats.get("ttft", {}).get("mean", float("inf")))
e2e_latency_mean = float(
stats.get("e2e_latency", {}).get("mean", float("inf"))
)
input_tp_mean = float(
stats.get("input_throughput", {}).get("mean", 0.0)
)
output_tp_mean = float(
stats.get("output_throughput", {}).get("mean", 0.0)
)
logger.info(
"genai-bench[%s] %s ttft_mean=%.3fs e2e_latency_mean=%.3fs input_tp_mean=%.1f tok/s output_tp_mean=%.1f tok/s",
experiment_folder,
jf.name,
ttft_mean,
e2e_latency_mean,
input_tp_mean,
output_tp_mean,
)
if th is not None:
assert (
ttft_mean <= th["ttft_mean_max"]
), f"TTFT validation failed: {ttft_mean} > {th['ttft_mean_max']} (file={jf.name})"
assert (
e2e_latency_mean <= th["e2e_latency_mean_max"]
), f"E2E latency validation failed: {e2e_latency_mean} > {th['e2e_latency_mean_max']} (file={jf.name})"
assert (
input_tp_mean >= th["input_throughput_mean_min"]
), f"Input throughput validation failed: {input_tp_mean} < {th['input_throughput_mean_min']} (file={jf.name})"
assert (
output_tp_mean >= th["output_throughput_mean_min"]
), f"Output throughput validation failed: {output_tp_mean} < {th['output_throughput_mean_min']} (file={jf.name})"
# Validate optional GPU utilization threshold if provided
if want_gpu_monitor:
try:
if monitor_proc is not None:
monitor_proc.join(timeout=5)
except Exception:
pass
gpu_util_result = _read_gpu_monitor_result(monitor_path)
_log_and_assert_gpu_thresholds(gpu_util_result, thresholds)
finally:
# Always attempt to stop workers to avoid resource leakage
@@ -411,6 +668,12 @@ def genai_bench_runner() -> Callable[..., None]:
time.sleep(2)
except Exception:
pass
# Ensure GPU monitor process is cleaned up
if monitor_proc is not None and monitor_proc.is_alive():
try:
monitor_proc.terminate()
except Exception:
pass
return _run
@@ -257,6 +257,7 @@ def test_pd_genai_bench(e2e_model: str, pd_cluster, genai_bench_runner):
"e2e_latency_mean_max": 15,
"input_throughput_mean_min": 400,
"output_throughput_mean_min": 20,
"gpu_util_p50_min": 99,
},
kill_procs=pd_cluster.workers,
)
@@ -47,6 +47,8 @@ def test_genai_bench(
"e2e_latency_mean_max": 14,
"input_throughput_mean_min": 1000,
"output_throughput_mean_min": 12,
# Enforce GPU utilization p50 >= 99% during the run.
"gpu_util_p50_min": 99,
},
kill_procs=e2e_two_workers_dp2,
)