Commit 24bf8df9 authored by one's avatar one
Browse files

Add hytop-gpu v0.1.0

parent e90c927f
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
\ No newline at end of file
# hytop - monitoring tools
## Quick start
```bash
uv pip install -e .
hytop gpu --help
```
## Prerequisites
- Python >= 3.10
- Python packages: `rich`, `typer`
- Passwordless SSH for remote monitoring
## `hytop gpu`
A lightweight script for live `hy-smi` polling with rolling averages across multiple hosts. It features a modern terminal UI and can be used as a blocking scheduler for GPU jobs.
### Usage
Simple examples:
```bash
# Local node, all GPUs, 5-second rolling window
hytop gpu -n 1 --window 5
# Two nodes, monitor only GPU 0 and 1
hytop gpu -H node01,node02 --devices 0,1 -n 1
# Exit with code 0 when all monitored GPUs are available
hytop gpu --devices 0,1 --wait-idle
# Wait at most 300s for availability (exit 124 on timeout)
hytop gpu --devices 0,1 --wait-idle --timeout 300
```
Queue jobs in shared environments:
```bash
if hytop gpu -H node01,node02 --wait-idle --timeout 300; then
echo "GPUs available, starting workload..."
# YOUR COMMAND HERE (e.g., python train.py)
else
echo "Error: GPUs not available in time, aborting pipeline."
exit 1
fi
```
### Exit Codes
Designed to be script-friendly:
* `0`: Availability condition met (GPUs are idle).
* `124`: Timeout reached before the availability condition was met.
* `130`: Interrupted by the user (Ctrl+C).
* `2`: Argument or input error.
## Development
### Version bump
Version is sourced from `src/hytop/__init__.py` (`__version__`).
```bash
# patch: 0.1.0 -> 0.1.1
python scripts/bump_version.py patch
# minor: 0.1.1 -> 0.2.0
python scripts/bump_version.py minor
# major: 0.2.0 -> 1.0.0
python scripts/bump_version.py major
# set an explicit version
python scripts/bump_version.py set 1.2.3
```
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "hytop"
dynamic = ["version"]
description = "hytop toolkit"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"rich>=13",
"typer>=0.12",
]
[project.scripts]
hytop = "hytop.main:main"
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.dynamic]
version = {attr = "hytop.__version__"}
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
VERSION_PATTERN = re.compile(r'^(\s*__version__\s*=\s*")(\d+)\.(\d+)\.(\d+)(".*)$')
def parse_args() -> argparse.Namespace:
    """Parse command line arguments for the version-bump script.

    Returns:
        Namespace with ``action``, optional ``version``, and ``dry_run``.
    """
    cli = argparse.ArgumentParser(
        description="Bump hytop version in src/hytop/__init__.py"
    )
    # Positional bump strategy; "set" additionally consumes the version operand.
    cli.add_argument(
        "action",
        choices=["patch", "minor", "major", "set"],
        help="Bump strategy or set an explicit version",
    )
    cli.add_argument(
        "version",
        nargs="?",
        help="Target version when action is 'set' (format: X.Y.Z)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Print the next version without writing files",
    )
    return cli.parse_args()
def parse_semver(version_text: str) -> tuple[int, int, int]:
    """Split an ``X.Y.Z`` version string into an integer triple.

    Args:
        version_text: Candidate version string.

    Returns:
        (major, minor, patch) integers.

    Raises:
        ValueError: If the text is not exactly three dot-separated integers.
    """
    parts = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)", version_text)
    if parts is None:
        raise ValueError(f"Invalid version: {version_text!r}. Expected format: X.Y.Z")
    major, minor, patch = (int(p) for p in parts.groups())
    return major, minor, patch
def bump_version(current: tuple[int, int, int], action: str) -> tuple[int, int, int]:
major, minor, patch = current
if action == "patch":
return major, minor, patch + 1
if action == "minor":
return major, minor + 1, 0
if action == "major":
return major + 1, 0, 0
raise ValueError(f"Unsupported bump action: {action}")
def main() -> int:
    """Read, bump, and rewrite ``__version__`` in src/hytop/__init__.py.

    Returns:
        Process exit code: 0 on success or no-op, 1 on any error.
    """
    args = parse_args()
    # Repo root is one directory above this script's location (scripts/).
    root = Path(__file__).resolve().parents[1]
    init_file = root / "src" / "hytop" / "__init__.py"
    if not init_file.exists():
        print(f"Cannot find version file: {init_file}", file=sys.stderr)
        return 1
    # keepends=True so the original line endings can be preserved on rewrite.
    lines = init_file.read_text(encoding="utf-8").splitlines(keepends=True)
    matched_index = -1
    matched_groups: tuple[str, str, str, str, str] | None = None
    for i, line in enumerate(lines):
        match = VERSION_PATTERN.match(line.rstrip("\r\n"))
        if match:
            # Refuse ambiguous files that declare __version__ more than once.
            if matched_index != -1:
                print("Found multiple __version__ lines; aborting.", file=sys.stderr)
                return 1
            matched_index = i
            matched_groups = match.groups()
    if matched_index == -1 or matched_groups is None:
        print("Cannot find __version__ declaration in __init__.py", file=sys.stderr)
        return 1
    prefix, major_text, minor_text, patch_text, suffix = matched_groups
    current_tuple = (int(major_text), int(minor_text), int(patch_text))
    current_version = ".".join((major_text, minor_text, patch_text))
    if args.action == "set":
        if not args.version:
            print("Action 'set' requires a version argument (X.Y.Z).", file=sys.stderr)
            return 1
        try:
            next_tuple = parse_semver(args.version)
        except ValueError as exc:
            print(str(exc), file=sys.stderr)
            return 1
    else:
        # Bump actions take no positional version operand.
        if args.version:
            print("Version argument is only allowed with action 'set'.", file=sys.stderr)
            return 1
        next_tuple = bump_version(current_tuple, args.action)
    next_version = ".".join(str(part) for part in next_tuple)
    if next_version == current_version:
        print(f"Version unchanged: {current_version}")
        return 0
    new_line = f"{prefix}{next_version}{suffix}"
    # Preserve the matched line's original ending (CRLF vs LF).
    line_ending = "\n"
    if lines[matched_index].endswith("\r\n"):
        line_ending = "\r\n"
    lines[matched_index] = f"{new_line}{line_ending}"
    # Dry-run reports the change but leaves the file untouched.
    if args.dry_run:
        print(f"{current_version} -> {next_version} (dry-run)")
        return 0
    init_file.write_text("".join(lines), encoding="utf-8")
    print(f"{current_version} -> {next_version}")
    return 0
if __name__ == "__main__":
raise SystemExit(main())
__all__ = ["__version__"]
__version__ = "0.1.0"
"""Core shared utilities for hytop commands."""
from __future__ import annotations
from collections import deque
from typing import Deque, Optional, Protocol
class MetricSample(Protocol):
    """Structural type for history entries: any object carrying a ``ts`` timestamp."""

    # Extra metric attributes are read dynamically via getattr in SlidingHistory.avg.
    ts: float
class SlidingHistory:
    """Time-based sliding history for one monitored key."""

    def __init__(self, max_window_s: float) -> None:
        """Create an empty history bounded by *max_window_s* seconds."""
        self.max_window_s = max_window_s
        self.samples: Deque[MetricSample] = deque()

    def add(self, sample: MetricSample) -> None:
        """Append *sample*, then discard entries outside the maximum window.

        Args:
            sample: New sample to append.
        """
        self.samples.append(sample)
        self._prune(sample.ts)

    def _prune(self, now: float) -> None:
        """Drop entries whose timestamp falls before ``now - max_window_s``.

        Args:
            now: Reference monotonic timestamp.
        """
        oldest_allowed = now - self.max_window_s
        while self.samples:
            if self.samples[0].ts >= oldest_allowed:
                break
            self.samples.popleft()

    def latest(self) -> Optional[MetricSample]:
        """Return the most recent sample, or None when the history is empty."""
        return self.samples[-1] if self.samples else None

    def avg(self, metric: str, window_s: float, now: float) -> float:
        """Average one metric over samples with ``ts >= now - window_s``.

        Args:
            metric: Sample attribute name to average.
            window_s: Time window in seconds.
            now: Reference monotonic timestamp.

        Returns:
            Window average, or 0.0 when no sample falls inside the window.
        """
        oldest_allowed = now - window_s
        total = 0.0
        count = 0
        for entry in self.samples:
            if entry.ts >= oldest_allowed:
                total += getattr(entry, metric)
                count += 1
        if count == 0:
            return 0.0
        return total / count
from __future__ import annotations
import subprocess
from dataclasses import dataclass
@dataclass
class CollectResult:
    """Raw collection output for one host.

    Attributes:
        host: Host name used for collection.
        stdout: Command standard output.
        stderr: Command standard error.
        error: Normalized error message when collection failed; otherwise None.
    """

    host: str
    stdout: str
    stderr: str
    # None signals success; collect_from_host sets this for timeouts,
    # OS errors, and non-zero exit codes.
    error: str | None = None
def collect_from_host(host: str, ssh_timeout: float, cmd_timeout: float) -> CollectResult:
    """Execute ``hy-smi`` directly or over SSH and capture its output.

    Args:
        host: Hostname, or a localhost alias for direct execution.
        ssh_timeout: SSH connect timeout in seconds.
        cmd_timeout: Overall command timeout in seconds.

    Returns:
        Raw command output with normalized error information.
    """
    if host in {"localhost", "127.0.0.1", "::1"}:
        # Local aliases run the tool directly, with no SSH hop.
        cmd = ["hy-smi"]
    else:
        # BatchMode prevents interactive password prompts (key auth only).
        cmd = [
            "ssh",
            "-o",
            "BatchMode=yes",
            "-o",
            f"ConnectTimeout={max(1, int(round(ssh_timeout)))}",
            host,
            "hy-smi",
        ]
    try:
        proc = subprocess.run(
            cmd,
            check=False,
            capture_output=True,
            text=True,
            timeout=cmd_timeout,
        )
    except subprocess.TimeoutExpired:
        return CollectResult(
            host=host,
            stdout="",
            stderr="",
            error=f"timeout after {cmd_timeout:.1f}s",
        )
    except OSError as exc:
        # e.g. hy-smi/ssh binary missing.
        return CollectResult(host=host, stdout="", stderr="", error=str(exc))
    if proc.returncode == 0:
        return CollectResult(host=host, stdout=proc.stdout, stderr=proc.stderr, error=None)
    detail = proc.stderr.strip() or "unknown error"
    return CollectResult(
        host=host,
        stdout=proc.stdout,
        stderr=proc.stderr,
        error=f"exit {proc.returncode}: {detail}",
    )
from __future__ import annotations
import typer
# Sub-app mounted as "cpu" by hytop.main; -h is aliased to --help.
app = typer.Typer(
    add_completion=False,
    context_settings={"help_option_names": ["-h", "--help"]},
    help="CPU monitoring commands.",
)


@app.command("watch")
def watch() -> None:
    """CPU watcher placeholder command."""
    # Placeholder: prints a notice instead of monitoring anything yet.
    typer.echo("cpu watch is not implemented yet")
from __future__ import annotations
from typing import Optional, Set
import typer
from hytop import __version__
from hytop.gpu.service import run_monitor
from hytop.gpu.validators import parse_csv_ints, parse_csv_strings, parse_positive_float
app = typer.Typer(
add_completion=False,
context_settings={"help_option_names": ["-h", "--help"]},
)
def version_callback(value: bool) -> None:
    """Print the package version and stop when the eager --version flag is set.

    Args:
        value: Whether the version flag was provided.

    Raises:
        typer.Exit: Raised to terminate the command after printing the version.
    """
    if not value:
        return
    typer.echo(__version__)
    raise typer.Exit()
# invoke_without_command: `hytop gpu` alone runs the monitor directly.
@app.callback(invoke_without_command=True)
def gpu(
    hosts: str = typer.Option(
        "localhost",
        "--hosts",
        "-H",
        help="Comma-separated hosts, e.g. node01,node02. Default: localhost",
    ),
    device_filter: str = typer.Option(
        "",
        "--devices",
        "-d",
        help="Comma-separated GPU IDs, e.g. 0,1. Default: all visible GPUs",
    ),
    interval: float = typer.Option(
        1.0,
        "--interval",
        "-n",
        help="Polling interval in seconds. Default: 1.0",
    ),
    window: float = typer.Option(
        5.0,
        "--window",
        help="Single rolling window in seconds. Default: 5.0",
    ),
    wait_idle: bool = typer.Option(
        False,
        "--wait-idle",
        help="Exit 0 when all monitored GPUs have zero VRAM/HCU avg in the configured window.",
    ),
    timeout: Optional[float] = typer.Option(
        None,
        "--timeout",
        help="Max runtime in seconds. Effective only with --wait-idle.",
    ),
    version: bool = typer.Option(
        False,
        "--version",
        "-v",
        callback=version_callback,
        is_eager=True,
        help="Show version and exit.",
    ),
) -> None:
    """Run GPU monitor."""
    # Validate raw option strings via the shared validators so that any
    # malformed input maps to exit code 2 (the documented script contract).
    try:
        host_list = parse_csv_strings(hosts, "--hosts")
        parsed_device_filter: Optional[Set[int]] = None
        if device_filter:
            parsed_device_filter = set(parse_csv_ints(device_filter, "--devices"))
        window_value = parse_positive_float(str(window), "--window")
        timeout_value = (
            parse_positive_float(str(timeout), "--timeout")
            if timeout is not None
            else None
        )
    except ValueError as exc:
        typer.echo(f"argument error: {exc}", err=True)
        raise typer.Exit(code=2) from exc
    code = run_monitor(
        hosts=host_list,
        device_filter=parsed_device_filter,
        window=window_value,
        interval=interval,
        wait_idle=wait_idle,
        timeout=timeout_value,
    )
    # Propagate the monitor's process-style exit code (0/124/130) to the shell.
    raise typer.Exit(code=code)
def main() -> None:
    """Module entrypoint for direct execution."""
    # Delegates to the Typer app; exit codes come from typer.Exit raised in gpu().
    app()
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Dict, Optional, Set, Tuple
from hytop.core.history import SlidingHistory
@dataclass
class Sample:
    """One parsed hy-smi sample for a single GPU.

    Attributes:
        ts: Monotonic timestamp when the sample was captured.
        temp_c: GPU temperature in Celsius.
        avg_pwr_w: Average power draw in Watts.
        vram_pct: VRAM usage percentage.
        hcu_pct: HCU usage percentage.
    """

    # Field values are produced by gpu.parser.parse_hy_smi_output from
    # hy-smi table columns.
    ts: float
    temp_c: float
    avg_pwr_w: float
    vram_pct: float
    hcu_pct: float
@dataclass
class NodeResult:
    """Collection result for one host.

    Attributes:
        host: Host name used for collection.
        samples: Parsed samples keyed by GPU id.
        error: Error message when collection failed; otherwise None.
    """

    host: str
    # samples is empty whenever error is set (see service.collect_node).
    samples: Dict[int, Sample]
    error: Optional[str] = None
@dataclass
class HostSnapshot:
    """Latest published collector output for a host.

    Attributes:
        seq: Monotonic sequence number incremented per publish.
        updated_ts: Monotonic timestamp when snapshot was updated.
        result: Latest node result from collector; None before first publish.
    """

    # seq strictly increases on every publish; the render loop compares it to
    # MonitorState.processed_seq to detect unseen snapshots.
    seq: int = 0
    updated_ts: float = 0.0
    result: Optional[NodeResult] = None
@dataclass
class MonitorState:
    """In-memory monitor state shared by collectors and render loop.

    Attributes:
        max_window: Sliding window length in seconds.
        histories: Per host+gpu sliding histories.
        discovered_keys: Dynamically discovered host+gpu keys.
        last_applied_sample_ts: Last sample timestamp applied per host+gpu key.
        monitored_keys: Effective monitored keys after filtering/discovery.
        errors: Latest host-level collection errors.
        host_state: Latest collector snapshots per host.
        processed_seq: Last consumed snapshot sequence per host.
        state_lock: Lock protecting shared snapshot state.
        stop_event: Event signaling collector shutdown.
    """

    # Tuple[str, int] keys throughout are (host, gpu_id) pairs.
    max_window: float
    histories: Dict[Tuple[str, int], SlidingHistory]
    discovered_keys: Set[Tuple[str, int]]
    last_applied_sample_ts: Dict[Tuple[str, int], float]
    monitored_keys: Set[Tuple[str, int]]
    errors: Dict[str, str]
    # host_state and processed_seq are guarded by state_lock; the remaining
    # fields are touched only by the render loop (see service.py).
    host_state: Dict[str, HostSnapshot]
    processed_seq: Dict[str, int]
    state_lock: threading.Lock
    stop_event: threading.Event
from __future__ import annotations
import re
from typing import Dict
from hytop.gpu.models import Sample
# Matches CSI-style ANSI escape sequences (ESC [ ... final byte).
ANSI_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")


def strip_ansi(text: str) -> str:
    """Return *text* with all ANSI escape sequences removed.

    Args:
        text: Input text possibly containing ANSI escapes.
    """
    return ANSI_RE.sub("", text)
def parse_number(text: str) -> float:
    """Return the first numeric token found in *text* as a float.

    Args:
        text: Input text that contains at least one number.

    Raises:
        ValueError: If no number can be found.
    """
    found = re.search(r"-?\d+(?:\.\d+)?", text)
    if found is None:
        raise ValueError(f"cannot parse number from {text!r}")
    return float(found.group(0))
def parse_hy_smi_output(raw: str, sample_ts: float) -> Dict[int, Sample]:
    """Turn hy-smi stdout into a mapping of GPU id to Sample.

    Args:
        raw: Raw hy-smi stdout text.
        sample_ts: Monotonic timestamp assigned to parsed rows.

    Returns:
        Mapping from GPU id to parsed sample.
    """
    samples: Dict[int, Sample] = {}
    for row in strip_ansi(raw).splitlines():
        fields = row.split()
        # Data rows start with a numeric GPU id and carry at least 7 columns.
        if len(fields) < 7 or not fields[0].isdigit():
            continue
        gpu_key = int(fields[0])
        try:
            samples[gpu_key] = Sample(
                ts=sample_ts,
                temp_c=parse_number(fields[1]),
                avg_pwr_w=parse_number(fields[2]),
                vram_pct=parse_number(fields[5]),
                hcu_pct=parse_number(fields[6]),
            )
        except (IndexError, ValueError):
            # Malformed row: skip it rather than abort the whole frame.
            continue
    return samples
from __future__ import annotations
import time
from typing import Dict, Iterable, List, Tuple
from rich import box
from rich.console import Group
from rich.table import Table
from hytop.core.history import SlidingHistory
def fmt_window(window_s: float) -> str:
    """Render a window duration like ``5s`` or ``2.5s``.

    Args:
        window_s: Window duration in seconds.

    Returns:
        Human-readable duration string.
    """
    if window_s.is_integer():
        return f"{int(window_s)}s"
    return f"{window_s:.1f}s"
def fmt_elapsed(elapsed_s: float) -> str:
    """Format elapsed seconds as ``HH:MM:SS`` (clamped at zero).

    Args:
        elapsed_s: Elapsed seconds.

    Returns:
        Formatted time string.
    """
    remaining = max(0, int(elapsed_s))
    hours, remaining = divmod(remaining, 3600)
    minutes, seconds = divmod(remaining, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def build_renderable(
    window: float,
    hosts: List[str],
    histories: Dict[Tuple[str, int], SlidingHistory],
    monitored_keys: Iterable[Tuple[str, int]],
    errors: Dict[str, str],
    poll_interval: float,
    elapsed_since_start: float,
) -> Group:
    """Build the Rich renderable for the current monitor frame.

    Args:
        window: Rolling window length in seconds.
        hosts: Host order used for row ordering and error table.
        histories: Sliding histories by host+gpu key.
        monitored_keys: Effective host+gpu keys to show.
        errors: Latest host-level errors.
        poll_interval: Configured sampling interval in seconds.
        elapsed_since_start: Total runtime in seconds.

    Returns:
        A Group containing main data table and optional error table.
    """
    now = time.monotonic()
    # Order rows by caller-supplied host position, then by GPU id.
    key_list = sorted(monitored_keys, key=lambda x: (hosts.index(x[0]), x[1]))
    table = Table(
        title=f"hy-smi monitor | interval={poll_interval:.2f}s | elapsed={fmt_elapsed(elapsed_since_start)}",
        box=box.MINIMAL_HEAVY_HEAD,
        expand=True,
    )
    table.add_column("Host", justify="left", no_wrap=True)
    table.add_column("GPU", justify="right")
    table.add_column("Temp", justify="right")
    table.add_column(f"Temp@{fmt_window(window)}", justify="right")
    table.add_column("AvgPwr", justify="right")
    table.add_column(f"AvgPwr@{fmt_window(window)}", justify="right")
    table.add_column("VRAM%", justify="right")
    table.add_column(f"VRAM%@{fmt_window(window)}", justify="right")
    table.add_column("HCU%", justify="right")
    table.add_column(f"HCU%@{fmt_window(window)}", justify="right")
    for key in key_list:
        history = histories.get(key)
        if history is None:
            continue
        latest = history.latest()
        if latest is None:
            continue
        host, gpu = key
        # A sample older than the window is stale: render dashes rather than
        # misleading old numbers.
        stale = (now - latest.ts) > window
        if stale:
            table.add_row(host, str(gpu), "-", "-", "-", "-", "-", "-", "-", "-")
            continue
        table.add_row(
            host,
            str(gpu),
            f"{latest.temp_c:7.1f}C",
            f"{history.avg('temp_c', window, now):7.1f}C",
            f"{latest.avg_pwr_w:8.1f}W",
            f"{history.avg('avg_pwr_w', window, now):8.1f}W",
            f"{latest.vram_pct:7.2f}%",
            f"{history.avg('vram_pct', window, now):7.2f}%",
            f"{latest.hcu_pct:7.2f}%",
            f"{history.avg('hcu_pct', window, now):7.2f}%",
        )
    if table.row_count == 0:
        # Placeholder row keeps the table visible before the first samples land.
        table.add_row("No data yet.", *[""] * (len(table.columns) - 1))
    if not errors:
        return Group(table)
    # Append a second table listing per-host collection errors.
    err_table = Table(title="Host errors", box=box.MINIMAL_HEAVY_HEAD, expand=True)
    err_table.add_column("Host", justify="left", no_wrap=True)
    err_table.add_column("Error", justify="left")
    for host in hosts:
        err = errors.get(host)
        if err:
            err_table.add_row(host, err)
    return Group(table, err_table)
from __future__ import annotations
import sys
import threading
import time
from typing import List, Optional, Set
from rich.console import Console
from rich.live import Live
from hytop.core.history import SlidingHistory
from hytop.core.ssh import collect_from_host
from hytop.gpu.models import HostSnapshot, MonitorState, NodeResult
from hytop.gpu.parser import parse_hy_smi_output
from hytop.gpu.render import build_renderable
def collect_node(host: str, ssh_timeout: float, cmd_timeout: float) -> NodeResult:
    """Collect one snapshot for *host* and parse it into per-GPU samples.

    Args:
        host: Hostname or localhost alias.
        ssh_timeout: SSH connect timeout in seconds.
        cmd_timeout: Command timeout in seconds.

    Returns:
        Normalized collection result for the host.
    """
    collected = collect_from_host(host=host, ssh_timeout=ssh_timeout, cmd_timeout=cmd_timeout)
    if collected.error:
        return NodeResult(host=host, samples={}, error=collected.error)
    # Timestamp is taken after collection so the window math reflects arrival.
    parsed = parse_hy_smi_output(collected.stdout, sample_ts=time.monotonic())
    if parsed:
        return NodeResult(host=host, samples=parsed)
    return NodeResult(host=host, samples={}, error="no gpu rows parsed")
def host_collector_loop(
    host: str,
    ssh_timeout: float,
    cmd_timeout: float,
    interval: float,
    state: dict[str, HostSnapshot],
    state_lock: threading.Lock,
    stop_event: threading.Event,
) -> None:
    """Poll one host forever, publishing each result under the shared lock.

    Args:
        host: Hostname to collect.
        ssh_timeout: SSH connect timeout in seconds.
        cmd_timeout: Command timeout in seconds.
        interval: Desired collection interval in seconds.
        state: Shared per-host snapshot map.
        state_lock: Lock guarding shared state writes.
        stop_event: Stop signal for graceful shutdown.
    """
    while not stop_event.is_set():
        cycle_start = time.monotonic()
        result = collect_node(host, ssh_timeout, cmd_timeout)
        with state_lock:
            snapshot = state[host]
            snapshot.seq += 1
            snapshot.updated_ts = time.monotonic()
            snapshot.result = result
        # Sleep only for the remainder of the interval; wake early on stop.
        remaining = interval - (time.monotonic() - cycle_start)
        if stop_event.wait(max(0.0, remaining)):
            return
def availability_ready(
    window: float,
    histories: dict[tuple[str, int], SlidingHistory],
    monitored_keys: set[tuple[str, int]],
    hosts: list[str],
    errors: dict[str, str],
) -> bool:
    """Check whether all monitored GPUs satisfy idle availability criteria.

    Args:
        window: Rolling window length in seconds.
        histories: Sliding histories by host+gpu key.
        monitored_keys: Effective host+gpu keys to evaluate.
        hosts: Host list used for host-level error checks.
        errors: Latest host-level errors.

    Returns:
        True when every monitored GPU has a fresh sample and zero VRAM/HCU
        window averages, and no host reports a collection error.
    """
    if not monitored_keys:
        return False
    now = time.monotonic()
    # Any host-level error disqualifies the whole run.
    for host in hosts:
        if errors.get(host):
            return False
    for key in monitored_keys:
        history = histories.get(key)
        if history is None:
            return False
        newest = history.latest()
        if newest is None:
            return False
        if now - newest.ts > window:
            return False  # stale data: cannot declare idle
        idle = (
            history.avg("vram_pct", window, now) == 0.0
            and history.avg("hcu_pct", window, now) == 0.0
        )
        if not idle:
            return False
    return True
def init_monitor_state(
    hosts: List[str],
    device_filter: Optional[Set[int]],
    max_window: float,
) -> MonitorState:
    """Build the initial shared state for one monitor run.

    Args:
        hosts: Host list.
        device_filter: Optional set of GPU ids to monitor.
        max_window: Sliding window length in seconds.

    Returns:
        Initialized monitor state object.
    """
    if device_filter:
        # Explicit filter: the monitored key set is fixed up front.
        initial_keys = {(host, dev) for host in hosts for dev in device_filter}
    else:
        # No filter: keys are discovered dynamically as samples arrive.
        initial_keys = set()
    return MonitorState(
        max_window=max_window,
        histories={},
        discovered_keys=set(),
        last_applied_sample_ts={},
        monitored_keys=initial_keys,
        errors={},
        host_state={host: HostSnapshot() for host in hosts},
        processed_seq={host: 0 for host in hosts},
        state_lock=threading.Lock(),
        stop_event=threading.Event(),
    )
def start_collectors(
    hosts: List[str],
    ssh_timeout: float,
    cmd_timeout: float,
    interval: float,
    state: MonitorState,
) -> List[threading.Thread]:
    """Spawn one daemon collector thread per host and return them.

    Args:
        hosts: Host list.
        ssh_timeout: SSH connect timeout in seconds.
        cmd_timeout: Command timeout in seconds.
        interval: Desired collection interval in seconds.
        state: Shared monitor state.

    Returns:
        Started collector thread list.
    """
    threads: List[threading.Thread] = []
    for host in hosts:
        thread = threading.Thread(
            target=host_collector_loop,
            args=(
                host,
                ssh_timeout,
                cmd_timeout,
                interval,
                state.host_state,
                state.state_lock,
                state.stop_event,
            ),
            daemon=True,  # never block interpreter shutdown
            name=f"collector-{host}",
        )
        thread.start()
        threads.append(thread)
    return threads
def drain_pending_nodes(hosts: List[str], state: MonitorState) -> List[NodeResult]:
    """Collect snapshots published since the previous render tick.

    Args:
        hosts: Host list used to preserve deterministic ordering.
        state: Shared monitor state.

    Returns:
        Newly published node results, in host order.
    """
    fresh: List[NodeResult] = []
    with state.state_lock:
        for host in hosts:
            snapshot = state.host_state[host]
            if snapshot.seq <= state.processed_seq[host]:
                continue  # nothing new from this host since last tick
            state.processed_seq[host] = snapshot.seq
            if snapshot.result is not None:
                fresh.append(snapshot.result)
    return fresh
def apply_node_results(
    nodes: List[NodeResult],
    device_filter: Optional[Set[int]],
    state: MonitorState,
) -> None:
    """Apply collected node results into histories and error state.

    NOTE(review): appears to be called only from the render loop thread
    (run_monitor); histories/errors are not lock-protected — confirm before
    adding other callers.

    Args:
        nodes: Newly collected node results.
        device_filter: Optional GPU id filter.
        state: Shared monitor state.
    """
    for node in nodes:
        if node.error:
            state.errors[node.host] = node.error
            continue
        # Successful collection clears any previous error for this host.
        state.errors.pop(node.host, None)
        for gpu_id, sample in node.samples.items():
            key = (node.host, gpu_id)
            # Discovery is recorded before filtering so run_monitor can derive
            # monitored_keys from discovered_keys when no filter is set.
            state.discovered_keys.add(key)
            if device_filter is not None and gpu_id not in device_filter:
                continue
            history = state.histories.get(key)
            if history is None:
                history = SlidingHistory(max_window_s=state.max_window)
                state.histories[key] = history
            # Dedup guard: never re-apply a sample at or before the last
            # applied timestamp for this key.
            last_ts = state.last_applied_sample_ts.get(key)
            if last_ts is not None and sample.ts <= last_ts:
                continue
            history.add(sample)
            state.last_applied_sample_ts[key] = sample.ts
def run_monitor(
    hosts: List[str],
    device_filter: Optional[Set[int]],
    window: float,
    interval: float,
    wait_idle: bool,
    timeout: Optional[float],
) -> int:
    """Run the asynchronous collector + periodic renderer monitor loop.

    Args:
        hosts: Host list to monitor.
        device_filter: Optional GPU id filter.
        window: Rolling window length in seconds.
        interval: Sampling interval in seconds.
        wait_idle: Whether to exit when all monitored GPUs become idle.
        timeout: Optional timeout for wait-idle mode.

    Returns:
        Process-style exit code:
        0 for success,
        2 for invalid arguments,
        124 for timeout in wait-idle mode,
        130 when interrupted by user.
    """
    console = Console()
    err_console = Console(stderr=True)
    # Validate numeric arguments here too, so direct callers (not only the CLI
    # layer) get the documented exit code 2 on bad input.
    if interval <= 0:
        print("argument error: --interval must be > 0", file=sys.stderr)
        return 2
    if interval > window:
        print("argument error: --interval must be <= --window value", file=sys.stderr)
        return 2
    state = init_monitor_state(
        hosts=hosts, device_filter=device_filter, max_window=window
    )
    # Derived timeouts, clamped: SSH connect to [2s, 5s], command to [5s, 10s].
    ssh_timeout = min(max(5 * interval, 2.0), 5.0)
    cmd_timeout = min(max(10 * interval, 5.0), 10.0)
    # Redraw at least every 0.5s even when the poll interval is long.
    render_interval = min(interval, 0.5)
    started = time.monotonic()
    try:
        with Live(console=console, auto_refresh=False, screen=True) as live:
            workers = start_collectors(
                hosts=hosts,
                ssh_timeout=ssh_timeout,
                cmd_timeout=cmd_timeout,
                interval=interval,
                state=state,
            )
            try:
                while True:
                    loop_started = time.monotonic()
                    # Pull snapshots published by collector threads since the
                    # last tick and fold them into the sliding histories.
                    apply_node_results(
                        nodes=drain_pending_nodes(hosts=hosts, state=state),
                        device_filter=device_filter,
                        state=state,
                    )
                    if device_filter is None:
                        # No explicit filter: monitor everything seen so far.
                        state.monitored_keys = state.discovered_keys.copy()
                    live.update(
                        build_renderable(
                            window=window,
                            hosts=hosts,
                            histories=state.histories,
                            monitored_keys=state.monitored_keys,
                            errors=state.errors,
                            poll_interval=interval,
                            elapsed_since_start=time.monotonic() - started,
                        ),
                        refresh=True,
                    )
                    elapsed_since_start = time.monotonic() - started
                    # Only evaluate idleness after one full window has elapsed.
                    warmup_done = elapsed_since_start >= state.max_window
                    if (
                        wait_idle
                        and warmup_done
                        and availability_ready(
                            window=window,
                            histories=state.histories,
                            monitored_keys=state.monitored_keys,
                            hosts=hosts,
                            errors=state.errors,
                        )
                    ):
                        console.print(
                            "status: success (all monitored GPUs are available)"
                        )
                        return 0
                    if (
                        wait_idle
                        and timeout is not None
                        and elapsed_since_start >= timeout
                    ):
                        err_console.print(
                            "status: timeout (availability condition not met)",
                            style="yellow",
                        )
                        return 124
                    # Sleep for the remainder of the render tick.
                    time.sleep(
                        max(0.0, render_interval - (time.monotonic() - loop_started))
                    )
            finally:
                # Signal collectors to stop; bounded join so shutdown never
                # waits on a collector blocked in ssh/subprocess.
                state.stop_event.set()
                for worker in workers:
                    worker.join(timeout=min(0.2, interval))
    except KeyboardInterrupt:
        err_console.print("status: interrupted by user", style="yellow")
        return 130
from __future__ import annotations
from typing import List
def parse_csv_ints(value: str, flag: str) -> List[int]:
    """Parse a non-empty comma-separated list of non-negative integers.

    Args:
        value: Raw option value, e.g. ``"0, 1,2"`` (blank tokens are skipped).
        flag: Option name used in error messages.

    Returns:
        Parsed integer list.

    Raises:
        ValueError: If the list is empty or contains a non-integer token.
    """
    out: List[int] = []
    for token in value.split(","):
        item = token.strip()
        if not item:
            continue
        # isdecimal() (not isdigit()) so characters like "²" — which isdigit()
        # accepts but int() rejects — raise the flag-specific message here
        # instead of int()'s raw "invalid literal" ValueError.
        if not item.isdecimal():
            raise ValueError(f"{flag} contains non-integer token: {item}")
        out.append(int(item))
    if not out:
        raise ValueError(f"{flag} cannot be empty")
    return out
def parse_csv_strings(value: str, flag: str) -> List[str]:
    """Split a comma-separated option into trimmed, non-empty strings.

    Args:
        value: Raw option value.
        flag: Option name used in error messages.

    Returns:
        Parsed string list.

    Raises:
        ValueError: If nothing remains after trimming.
    """
    items: List[str] = []
    for chunk in value.split(","):
        trimmed = chunk.strip()
        if trimmed:
            items.append(trimmed)
    if not items:
        raise ValueError(f"{flag} cannot be empty")
    return items
def parse_positive_float(value: str, flag: str) -> float:
    """Parse a strictly positive float option value.

    Args:
        value: Raw option value.
        flag: Option name used in error messages.

    Returns:
        Parsed positive float.

    Raises:
        ValueError: If value is non-numeric, not positive, or NaN.
    """
    try:
        number = float(value)
    except ValueError as exc:
        raise ValueError(f"{flag} contains non-numeric value: {value}") from exc
    # "not number > 0" (rather than "number <= 0") also rejects NaN, which
    # compares false to everything and would otherwise slip through.
    if not number > 0:
        raise ValueError(f"{flag} requires a positive value: {value}")
    return number
import typer
from hytop.cpu.cli import app as cpu_app
from hytop.gpu.cli import app as gpu_app
# Root CLI: mounts the cpu/gpu sub-apps; the "hytop" console script (see
# pyproject [project.scripts]) points at main() below.
app = typer.Typer(help="hytop toolkit command line")
app.add_typer(cpu_app, name="cpu")
app.add_typer(gpu_app, name="gpu")


def main() -> None:
    """Entry point for the ``hytop`` console script and direct execution."""
    app()


if __name__ == "__main__":
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment