Unverified commit bcab0304, authored by Keiven C, committed by GitHub
Browse files

feat: use CUDA_VISIBLE_DEVICES instead of --gpus, multi-line GPU status output (#7825)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent 1f10ba55
......@@ -69,14 +69,6 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"With -n N: runs N tests concurrently as subprocesses with VRAM-aware scheduling. "
"With -n auto: calculates max concurrent slots from GPU VRAM / max_vram_gib.",
)
parser.addoption(
"--gpus",
"--gpu",
type=str,
default="all",
help="Comma-separated GPU indices or 'all' (default: all). "
"Controls which GPUs the parallel test runner distributes tests across.",
)
parser.addoption(
"--dry-run",
action="store_true",
......@@ -110,17 +102,18 @@ def pytest_configure(config: pytest.Config) -> None:
return
# Delayed: vram_utils requires pynvml, otherwise conftest fails to load
# on CPU-only CI runners (e.g. ARM deploy tests) that lack nvidia-ml-py.
from tests.utils.pytest_parallel_gpu import _parse_gpu_indices
from tests.utils.pytest_parallel_gpu import _parse_cuda_visible
from tests.utils.vram_utils import auto_worker_count, detect_gpus
gpus = detect_gpus()
if gpus:
config.stash[_gpu_parallel_gpus_key] = gpus
# Parse --gpus into a list of indices (or None for all)
gpus_raw = config.getoption("gpus", default="all")
if gpus_raw and gpus_raw.strip().lower() != "all":
config.stash[_gpu_indices_key] = _parse_gpu_indices(gpus_raw, gpus)
# Honour CUDA_VISIBLE_DEVICES to restrict which GPUs the scheduler uses.
# NVML always sees all physical GPUs, so we filter here.
cvd = os.environ.get("CUDA_VISIBLE_DEVICES")
if cvd is not None:
config.stash[_gpu_indices_key] = _parse_cuda_visible(cvd, gpus)
selected_gpus = [
g for g in gpus if g["index"] in config.stash[_gpu_indices_key]
]
......@@ -548,8 +541,14 @@ def pytest_collection_modifyitems(config, items):
print(f" {name}{vram_str} -- {'; '.join(reasons)}")
gpus = config.stash.get(_gpu_parallel_gpus_key, None)
gpu_indices = config.stash.get(_gpu_indices_key, None)
if gpus and vram_limit is not None:
print_gpu_plan(gpus, vram_limit, would_run)
visible = (
[g for g in gpus if g["index"] in gpu_indices]
if gpu_indices is not None
else gpus
)
print_gpu_plan(visible, vram_limit, would_run)
print()
items.clear()
return
......
......@@ -251,24 +251,37 @@ def _capture_output(pipe, captured: list[str], prefix: str | None = None) -> Non
pipe.close()
def _parse_gpu_indices(raw: str, available: list[dict]) -> list[int]:
"""Parse --gpus value into a list of GPU indices.
def _parse_cuda_visible(raw: str | None, available: list[dict]) -> list[int]:
"""Parse CUDA_VISIBLE_DEVICES value into a list of physical GPU indices.
Accepts 'all' or comma-separated indices (e.g. '0,1').
Semantics match CUDA:
None (unset) → all GPUs visible
"" (empty) → no GPUs visible
"0,1" → those specific GPUs
Raises ValueError on UUID/MIG tokens (not supported by the scheduler).
"""
avail_indices = [g["index"] for g in available]
if raw.strip().lower() == "all":
if raw is None:
return avail_indices
if raw.strip() == "":
return []
indices = []
for part in raw.split(","):
part = part.strip()
if not part:
continue
try:
idx = int(part)
except ValueError:
raise ValueError(
f"Unsupported CUDA_VISIBLE_DEVICES token {part!r}; "
"only integer GPU indices are supported by the scheduler"
)
if idx not in avail_indices:
raise ValueError(f"GPU {idx} not found (available: {avail_indices})")
indices.append(idx)
return indices or avail_indices
return indices
def run_parallel(
......@@ -453,10 +466,10 @@ def run_parallel(
_VLLM_LAUNCH_STAGGER_S = 5.0
last_vllm_launch: dict[int, float] = {} # gpu_index -> monotonic timestamp
def _build_status(now: float) -> str:
"""Build multi-GPU status string for periodic output."""
def _build_status_lines(now: float) -> list[str]:
"""Build per-GPU status lines for periodic output."""
elapsed = int(now - t0)
gpu_parts = []
lines = []
for gi in sorted(gpu_states):
gs = gpu_states[gi]
actual = _get_gpu_used_gib(gi)
......@@ -469,8 +482,8 @@ def run_parallel(
part = f"GPU{gi}: {actual:.1f}/{gs.total_gib:.0f} GiB"
if wstr:
part += f" [{wstr}]"
gpu_parts.append(part)
return f"[elapsed {elapsed}s] {', '.join(gpu_parts)}"
lines.append(f"[elapsed {elapsed}s] {part}")
return lines
def _launch_test(test: _TestEntry, env_base: dict) -> _RunningTest:
"""Build env, spawn subprocess, start output streamer thread."""
......@@ -617,11 +630,12 @@ def run_parallel(
del running[w_id]
# Print status immediately after completion
parts = [_build_status(now)]
lines = _build_status_lines(now)
if pending:
queued_str = ", ".join(f"w{t.w_id}" for t in pending)
parts.append(f"[queued: {queued_str}]")
_print(" ".join(parts))
lines[-1] += f" [queued: {queued_str}]"
for ln in lines:
_print(ln)
next_status = now + 10
# --- Launch pending tests ---
......@@ -710,23 +724,25 @@ def run_parallel(
now = time.monotonic()
if now >= next_status and (running or pending):
parts = [_build_status(now)]
lines = _build_status_lines(now)
if pending:
queued_str = ", ".join(f"w{t.w_id}" for t in pending)
parts.append(f"[queued: {queued_str}]")
_print(" ".join(parts))
lines[-1] += f" [queued: {queued_str}]"
for ln in lines:
_print(ln)
next_status = now + 10
# Periodic status (print even when waiting for VRAM to free up)
if now >= next_status and (running or pending):
parts = [_build_status(now)]
lines = _build_status_lines(now)
if pending:
queued_str = ", ".join(f"w{t.w_id}" for t in pending)
if not running:
next_needed = pending[0].profiled_gib
parts.append(f"[waiting for {next_needed:.1f} GiB free]")
parts.append(f"[queued: {queued_str}]")
_print(" ".join(parts))
lines[-1] += f" [waiting for {next_needed:.1f} GiB free]"
lines[-1] += f" [queued: {queued_str}]"
for ln in lines:
_print(ln)
next_status = now + 10
if running or pending:
......@@ -807,7 +823,7 @@ def run_parallel(
def main() -> int:
parser = argparse.ArgumentParser(
description="Run GPU tests in parallel with VRAM-aware scheduling.",
usage="%(prog)s --max-vram-gib=N [-n SLOTS] [--gpu=0,1] [pytest-args...]",
usage="%(prog)s --max-vram-gib=N [-n SLOTS] [pytest-args...]",
)
parser.add_argument(
"--max-vram-gib",
......@@ -821,13 +837,6 @@ def main() -> int:
default="auto",
help="Number of concurrent slots. 'auto' = gpu_usable / max_vram_gib.",
)
parser.add_argument(
"--gpu",
"--gpus",
type=str,
default="all",
help="Comma-separated GPU indices or 'all' (default: all).",
)
raw = sys.argv[1:]
if "--" in raw:
......@@ -847,7 +856,11 @@ def main() -> int:
_print("ERROR: No GPUs detected")
return 1
gpu_indices = _parse_gpu_indices(args.gpus, gpus)
cvd = os.environ.get("CUDA_VISIBLE_DEVICES")
gpu_indices = _parse_cuda_visible(cvd, gpus)
if not gpu_indices:
_print("ERROR: CUDA_VISIBLE_DEVICES hides all GPUs")
return 1
_print(f"Collecting tests with --max-vram-gib={args.max_vram_gib}...")
test_ids = _collect_tests(pytest_args, args.max_vram_gib)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment