Commit 3fbc8b8f authored by one's avatar one
Browse files

[hytop] Improve rate calculation and binary prefixes

- Add IEC binary prefixes
- Improve remote collect script
parent fb3ef5fe
from __future__ import annotations
def fmt_window(window_s: float) -> str:
"""Format a window duration for display.
Args:
window_s: Window duration in seconds.
Returns:
Human-readable duration string.
"""
return f"{int(window_s)}s" if window_s.is_integer() else f"{window_s:.1f}s"
def fmt_elapsed(elapsed_s: float) -> str:
"""Format elapsed seconds as HH:MM:SS.
Args:
elapsed_s: Elapsed seconds.
Returns:
Formatted time string.
"""
total_seconds = max(0, int(elapsed_s))
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
...@@ -7,40 +7,11 @@ from rich import box ...@@ -7,40 +7,11 @@ from rich import box
from rich.console import Group from rich.console import Group
from rich.table import Table from rich.table import Table
from hytop.core.format import fmt_elapsed, fmt_window
from hytop.core.history import SlidingHistory from hytop.core.history import SlidingHistory
from hytop.gpu.metrics import render_columns_for_show_flags from hytop.gpu.metrics import render_columns_for_show_flags
def fmt_window(window_s: float) -> str:
"""Format a window duration for display.
Args:
window_s: Window duration in seconds.
Returns:
Human-readable duration string.
"""
return f"{int(window_s)}s" if window_s.is_integer() else f"{window_s:.1f}s"
def fmt_elapsed(elapsed_s: float) -> str:
"""Format elapsed seconds as HH:MM:SS.
Args:
elapsed_s: Elapsed seconds.
Returns:
Formatted time string.
"""
total_seconds = max(0, int(elapsed_s))
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def build_renderable( def build_renderable(
window: float, window: float,
hosts: list[str], hosts: list[str],
......
# This script is shipped verbatim to remote hosts via SSH (python3 -c).
# It must be fully self-contained: no imports from hytop.* are allowed.
#
# IMPORTANT: The caller prepends the following line before running this script:
# INPUT_JSON='{"kind_filter": [...], "include": [...]}'
#
# Keep the collection logic in sync with collect_local_counters() in collector.py.
import json
import re
import subprocess
from pathlib import Path
vprefix = ("veth", "docker", "cni", "flannel", "virbr", "br-", "tunl")
def read_int(path):
try:
return int(path.read_text().strip())
except (OSError, ValueError):
return None
def read_text(path):
try:
return path.read_text().strip()
except OSError:
return None
def normalize_link_layer(value):
if not value:
return "ib"
text = value.strip().lower()
return "eth" if text.startswith("ethernet") else "ib"
def parse_ibdev2netdev_output(stdout):
mapping = {}
pattern = re.compile(
r"^(?P<dev>\S+)\s+port\s+(?P<port>\d+)\s+==>\s+(?P<netdev>\S+)\s+\((?P<state>[^)]+)\)$"
)
for line in stdout.splitlines():
m = pattern.match(line.strip())
if not m:
continue
dev = m.group("dev")
port = m.group("port")
netdev = m.group("netdev")
state = m.group("state").strip().lower()
mapping[(dev, port)] = {"netdev": netdev, "state": state}
return mapping
def get_ibdev2netdev_mapping():
try:
proc = subprocess.run(
["ibdev2netdev"],
check=False,
capture_output=True,
text=True,
timeout=2.0,
)
except (OSError, subprocess.TimeoutExpired):
return {}
if proc.returncode != 0:
return {}
return parse_ibdev2netdev_output(proc.stdout)
def is_ib_netdev(iface_path):
type_text = read_text(iface_path / "type")
return type_text == "32"
def want(name, include):
if include and name not in include:
return False
return not (name == "lo" or name.startswith(vprefix))
def collect(kind_filter, include):
out = []
ib_map = get_ibdev2netdev_mapping()
ib_netdevs = {item["netdev"] for item in ib_map.values()}
if "eth" in kind_filter:
for p in Path("/sys/class/net").iterdir():
if not p.is_dir():
continue
name = p.name
if name in ib_netdevs:
continue
if is_ib_netdev(p):
continue
if not want(name, include):
continue
rx = read_int(p / "statistics" / "rx_bytes")
tx = read_int(p / "statistics" / "tx_bytes")
if rx is None or tx is None:
continue
out.append(
{
"kind": "eth",
"name": name,
"rx_bytes": rx,
"tx_bytes": tx,
"link_state": (read_text(p / "operstate") or "unknown").lower(),
}
)
# Always scan /sys/class/infiniband even for eth-only filters:
# IB ports operating in RoCE mode report link_layer=Ethernet and are
# classified as "eth" by normalize_link_layer(). The mode check below
# filters them out if they don't match kind_filter.
if "ib" in kind_filter or "eth" in kind_filter:
base = Path("/sys/class/infiniband")
if base.exists():
for dev in base.iterdir():
ports = dev / "ports"
if not ports.is_dir():
continue
for port in ports.iterdir():
pname = f"{dev.name}/p{port.name}"
mapped = ib_map.get((dev.name, port.name))
mapped_netdev = mapped["netdev"] if mapped is not None else None
mode = normalize_link_layer(read_text(port / "link_layer"))
if mode not in kind_filter:
continue
candidate_name = mapped_netdev or pname
if not want(candidate_name, include):
continue
if include and pname in include:
pass
elif include and candidate_name not in include:
continue
rxw = read_int(port / "counters" / "port_rcv_data")
txw = read_int(port / "counters" / "port_xmit_data")
if rxw is None or txw is None:
continue
out.append(
{
"kind": mode,
"name": candidate_name,
"rx_bytes": rxw * 4,
"tx_bytes": txw * 4,
"link_state": (
mapped["state"]
if mapped is not None
else (read_text(port / "state") or "unknown")
),
"device_name": pname,
}
)
return out
payload = json.loads(INPUT_JSON) # noqa: F821 # injected by caller
res = {
"counters": collect(
kind_filter=set(payload.get("kind_filter", ["eth", "ib"])),
include=set(payload.get("include", [])),
)
}
print(json.dumps(res, separators=(",", ":")))
...@@ -25,6 +25,11 @@ def net( ...@@ -25,6 +25,11 @@ def net(
"--ifaces", "--ifaces",
help="Comma-separated interface names to include.", help="Comma-separated interface names to include.",
), ),
iec: bool = typer.Option(
False,
"--iec",
help="Use IEC binary prefixes KiB/MiB/GiB instead of SI prefixes kB/MB/GB.",
),
) -> None: ) -> None:
"""Network monitoring commands.""" """Network monitoring commands."""
if ctx.obj is None: if ctx.obj is None:
...@@ -46,5 +51,6 @@ def net( ...@@ -46,5 +51,6 @@ def net(
window=window_value, window=window_value,
interval=interval, interval=interval,
timeout=timeout_value, timeout=timeout_value,
iec=iec,
) )
raise typer.Exit(code=code) raise typer.Exit(code=code)
...@@ -18,155 +18,10 @@ VIRTUAL_PREFIXES: Final[tuple[str, ...]] = ( ...@@ -18,155 +18,10 @@ VIRTUAL_PREFIXES: Final[tuple[str, ...]] = (
"tunl", "tunl",
) )
REMOTE_COLLECTOR_PY: Final[str] = r""" # Remote collector script shipped verbatim to remote hosts via SSH.
import json # Stored as a standalone file so it gets syntax highlighting and linting.
import re # Keep _remote_collect.py in sync with collect_local_counters() below.
import subprocess REMOTE_COLLECTOR_PY: Final[str] = (Path(__file__).parent / "_remote_collect.py").read_text()
from pathlib import Path
vprefix = ("veth", "docker", "cni", "flannel", "virbr", "br-", "tunl")
def read_int(path):
try:
return int(path.read_text().strip())
except (OSError, ValueError):
return None
def read_text(path):
try:
return path.read_text().strip()
except OSError:
return None
def normalize_link_layer(value):
if not value:
return "ib"
text = value.strip().lower()
return "eth" if text.startswith("ethernet") else "ib"
def parse_ibdev2netdev_output(stdout):
mapping = {}
pattern = re.compile(
r"^(?P<dev>\S+)\s+port\s+(?P<port>\d+)\s+==>\s+(?P<netdev>\S+)\s+\((?P<state>[^)]+)\)$"
)
for line in stdout.splitlines():
m = pattern.match(line.strip())
if not m:
continue
dev = m.group("dev")
port = m.group("port")
netdev = m.group("netdev")
state = m.group("state").strip().lower()
mapping[(dev, port)] = {"netdev": netdev, "state": state}
return mapping
def get_ibdev2netdev_mapping():
try:
proc = subprocess.run(
["ibdev2netdev"],
check=False,
capture_output=True,
text=True,
timeout=2.0,
)
except (OSError, subprocess.TimeoutExpired):
return {}
if proc.returncode != 0:
return {}
return parse_ibdev2netdev_output(proc.stdout)
def is_ib_netdev(iface_path):
type_text = read_text(iface_path / "type")
return type_text == "32"
def want(name, include):
if include and name not in include:
return False
if name == "lo" or name.startswith(vprefix):
return False
return True
def collect(kind_filter, include):
out = []
ib_map = get_ibdev2netdev_mapping()
ib_netdevs = {item["netdev"] for item in ib_map.values()}
if "eth" in kind_filter:
for p in Path("/sys/class/net").iterdir():
if not p.is_dir():
continue
name = p.name
if name in ib_netdevs:
continue
if is_ib_netdev(p):
continue
if not want(name, include):
continue
rx = read_int(p / "statistics" / "rx_bytes")
tx = read_int(p / "statistics" / "tx_bytes")
if rx is None or tx is None:
continue
out.append(
{
"kind": "eth",
"name": name,
"rx_bytes": rx,
"tx_bytes": tx,
"link_state": (read_text(p / "operstate") or "unknown").lower(),
}
)
if ("ib" in kind_filter) or ("eth" in kind_filter):
base = Path("/sys/class/infiniband")
if base.exists():
for dev in base.iterdir():
ports = dev / "ports"
if not ports.is_dir():
continue
for port in ports.iterdir():
pname = f"{dev.name}/p{port.name}"
mapped = ib_map.get((dev.name, port.name))
mapped_netdev = mapped["netdev"] if mapped is not None else None
mode = normalize_link_layer(read_text(port / "link_layer"))
if mode not in kind_filter:
continue
candidate_name = mapped_netdev or pname
if not want(candidate_name, include):
continue
if include and pname in include:
pass
elif include and candidate_name not in include:
continue
rxw = read_int(port / "counters" / "port_rcv_data")
txw = read_int(port / "counters" / "port_xmit_data")
if rxw is None or txw is None:
continue
out.append(
{
"kind": mode,
"name": candidate_name,
"rx_bytes": rxw * 4,
"tx_bytes": txw * 4,
"link_state": (
mapped["state"]
if mapped is not None
else (read_text(port / "state") or "unknown")
),
"device_name": pname,
}
)
return out
payload = json.loads(INPUT_JSON)
res = {
"counters": collect(
kind_filter=set(payload.get("kind_filter", ["eth", "ib"])),
include=set(payload.get("include", [])),
)
}
print(json.dumps(res, separators=(",", ":")))
""".strip()
def parse_kind_filter(kind: str) -> set[NetKind]: def parse_kind_filter(kind: str) -> set[NetKind]:
...@@ -307,7 +162,11 @@ def collect_local_counters( ...@@ -307,7 +162,11 @@ def collect_local_counters(
) )
counters[counter.key] = counter counters[counter.key] = counter
if ("ib" in kind_filter) or ("eth" in kind_filter): # Always scan /sys/class/infiniband even for eth-only filters:
# IB ports operating in RoCE mode report link_layer=Ethernet and are
# classified as "eth" by _normalize_link_layer(). The mode check below
# filters them out if they don't match kind_filter.
if "ib" in kind_filter or "eth" in kind_filter:
ib_root = Path("/sys/class/infiniband") ib_root = Path("/sys/class/infiniband")
if ib_root.exists(): if ib_root.exists():
for hca in ib_root.iterdir(): for hca in ib_root.iterdir():
......
...@@ -57,6 +57,7 @@ class MonitorState: ...@@ -57,6 +57,7 @@ class MonitorState:
monitored_keys: set[tuple[str, str]] monitored_keys: set[tuple[str, str]]
latest_counter_by_key: dict[tuple[str, str], NetCounter] latest_counter_by_key: dict[tuple[str, str], NetCounter]
previous_counter_by_key: dict[tuple[str, str], NetCounter] previous_counter_by_key: dict[tuple[str, str], NetCounter]
previous_sample_ts_by_key: dict[tuple[str, str], float]
errors: dict[str, str] errors: dict[str, str]
host_state: dict[str, HostSnapshot] host_state: dict[str, HostSnapshot]
processed_seq: dict[str, int] processed_seq: dict[str, int]
......
...@@ -6,17 +6,22 @@ from rich import box ...@@ -6,17 +6,22 @@ from rich import box
from rich.console import Group from rich.console import Group
from rich.table import Table from rich.table import Table
from hytop.core.format import fmt_elapsed, fmt_window
from hytop.core.history import SlidingHistory from hytop.core.history import SlidingHistory
from hytop.gpu.render import fmt_elapsed, fmt_window
from hytop.net.models import NetCounter, RateSample from hytop.net.models import NetCounter, RateSample
def format_rate(value: float) -> str: def format_rate(value: float, iec: bool = False) -> str:
units = ["B/s", "KB/s", "MB/s", "GB/s", "TB/s"] if iec:
units = ["B/s", "KiB/s", "MiB/s", "GiB/s", "TiB/s"]
base = 1024.0
else:
units = ["B/s", "kB/s", "MB/s", "GB/s", "TB/s"]
base = 1000.0
output = float(value) output = float(value)
idx = 0 idx = 0
while output >= 1000.0 and idx < len(units) - 1: while output >= base and idx < len(units) - 1:
output /= 1000.0 output /= base
idx += 1 idx += 1
return f"{output:7.2f} {units[idx]}" return f"{output:7.2f} {units[idx]}"
...@@ -47,6 +52,7 @@ def build_renderable( ...@@ -47,6 +52,7 @@ def build_renderable(
errors: dict[str, str], errors: dict[str, str],
poll_interval: float, poll_interval: float,
elapsed_since_start: float, elapsed_since_start: float,
iec: bool = False,
) -> Group: ) -> Group:
now = time.monotonic() now = time.monotonic()
host_rank = {host: idx for idx, host in enumerate(hosts)} host_rank = {host: idx for idx, host in enumerate(hosts)}
...@@ -91,10 +97,10 @@ def build_renderable( ...@@ -91,10 +97,10 @@ def build_renderable(
mode, mode,
device_text, device_text,
nic_text, nic_text,
format_rate(latest_rate.rx_bps), format_rate(latest_rate.rx_bps, iec),
format_rate(latest_rate.tx_bps), format_rate(latest_rate.tx_bps, iec),
format_rate(rx_avg), format_rate(rx_avg, iec),
format_rate(tx_avg), format_rate(tx_avg, iec),
) )
if table.row_count == 0: if table.row_count == 0:
......
...@@ -112,6 +112,7 @@ def init_monitor_state(hosts: list[str], max_window: float) -> MonitorState: ...@@ -112,6 +112,7 @@ def init_monitor_state(hosts: list[str], max_window: float) -> MonitorState:
monitored_keys=set(), monitored_keys=set(),
latest_counter_by_key={}, latest_counter_by_key={},
previous_counter_by_key={}, previous_counter_by_key={},
previous_sample_ts_by_key={},
errors={}, errors={},
host_state={host: HostSnapshot() for host in hosts}, host_state={host: HostSnapshot() for host in hosts},
processed_seq={host: 0 for host in hosts}, processed_seq={host: 0 for host in hosts},
...@@ -181,13 +182,20 @@ def apply_node_results( ...@@ -181,13 +182,20 @@ def apply_node_results(
state.latest_counter_by_key[key] = counter state.latest_counter_by_key[key] = counter
prev = state.previous_counter_by_key.get(key) prev = state.previous_counter_by_key.get(key)
prev_ts = state.previous_sample_ts_by_key.get(key)
state.previous_counter_by_key[key] = counter state.previous_counter_by_key[key] = counter
if prev is None: state.previous_sample_ts_by_key[key] = node.sample_ts
if prev is None or prev_ts is None:
continue continue
delta_rx = counter.rx_bytes - prev.rx_bytes delta_rx = counter.rx_bytes - prev.rx_bytes
delta_tx = counter.tx_bytes - prev.tx_bytes delta_tx = counter.tx_bytes - prev.tx_bytes
if delta_rx < 0 or delta_tx < 0: if delta_rx < 0 or delta_tx < 0:
continue continue
# Use actual elapsed time between samples rather than the configured
# interval, so SSH latency spikes don't inflate the rate.
actual_delta = node.sample_ts - prev_ts
if actual_delta <= 0:
continue
history = state.histories.get(key) history = state.histories.get(key)
if history is None: if history is None:
...@@ -196,8 +204,8 @@ def apply_node_results( ...@@ -196,8 +204,8 @@ def apply_node_results(
history.add( history.add(
RateSample( RateSample(
ts=node.sample_ts, ts=node.sample_ts,
rx_bps=delta_rx / interval, rx_bps=delta_rx / actual_delta,
tx_bps=delta_tx / interval, tx_bps=delta_tx / actual_delta,
) )
) )
...@@ -209,6 +217,7 @@ def run_monitor( ...@@ -209,6 +217,7 @@ def run_monitor(
window: float, window: float,
interval: float, interval: float,
timeout: float | None, timeout: float | None,
iec: bool = False,
ssh_options: SSHOptions | None = None, ssh_options: SSHOptions | None = None,
) -> int: ) -> int:
if interval <= 0: if interval <= 0:
...@@ -257,6 +266,7 @@ def run_monitor( ...@@ -257,6 +266,7 @@ def run_monitor(
errors=state.errors, errors=state.errors,
poll_interval=interval, poll_interval=interval,
elapsed_since_start=time.monotonic() - started, elapsed_since_start=time.monotonic() - started,
iec=iec,
), ),
refresh=True, refresh=True,
) )
...@@ -270,3 +280,4 @@ def run_monitor( ...@@ -270,3 +280,4 @@ def run_monitor(
except KeyboardInterrupt: except KeyboardInterrupt:
err_console.print("status: interrupted by user", style="yellow") err_console.print("status: interrupted by user", style="yellow")
return 130 return 130
return 0
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
from __future__ import annotations from __future__ import annotations
import pytest
from hytop.net.collector import ( from hytop.net.collector import (
_normalize_ib_port_state, _normalize_ib_port_state,
_parse_ibdev2netdev_output, _parse_ibdev2netdev_output,
...@@ -56,12 +58,8 @@ class TestParseCounterPayload: ...@@ -56,12 +58,8 @@ class TestParseCounterPayload:
assert result["ib:mlx5_0/p1"].link_state == "active" assert result["ib:mlx5_0/p1"].link_state == "active"
def test_invalid_json_raises(self): def test_invalid_json_raises(self):
try: with pytest.raises(ValueError, match="invalid collector payload"):
parse_counter_payload("{") parse_counter_payload("{")
except ValueError as exc:
assert "invalid collector payload" in str(exc)
else:
raise AssertionError("expected ValueError")
class TestNormalizeIbState: class TestNormalizeIbState:
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from hytop.net.render import format_iface_name from hytop.net.render import format_iface_name, format_rate, split_iface_key
class TestFormatIfaceName: class TestFormatIfaceName:
...@@ -14,3 +14,43 @@ class TestFormatIfaceName: ...@@ -14,3 +14,43 @@ class TestFormatIfaceName:
def test_mark_init_state(self): def test_mark_init_state(self):
assert format_iface_name("mlx5_0/p1", "init") == "mlx5_0/p1 (down)" assert format_iface_name("mlx5_0/p1", "init") == "mlx5_0/p1 (down)"
class TestFormatRate:
def test_zero(self):
assert format_rate(0.0) == " 0.00 B/s"
def test_bytes(self):
assert format_rate(512.0) == " 512.00 B/s"
def test_kilobytes(self):
result = format_rate(1500.0)
assert "kB/s" in result
def test_megabytes(self):
result = format_rate(2_000_000.0)
assert "MB/s" in result
def test_gigabytes(self):
result = format_rate(3_000_000_000.0)
assert "GB/s" in result
def test_iec_kibibytes(self):
result = format_rate(2048.0, iec=True)
assert "KiB/s" in result
def test_iec_mebibytes(self):
result = format_rate(2 * 1024 * 1024.0, iec=True)
assert "MiB/s" in result
def test_iec_gibibytes(self):
result = format_rate(2 * 1024**3, iec=True)
assert "GiB/s" in result
class TestSplitIfaceKey:
def test_eth(self):
assert split_iface_key("eth:eth0") == ("eth", "eth0")
def test_ib(self):
assert split_iface_key("ib:mlx5_0/p1") == ("ib", "mlx5_0/p1")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment