"""Tests for hytop.gpu.service — business logic (collect_from_host mocked).""" from __future__ import annotations import json import time from unittest.mock import patch from hytop.core.history import SlidingHistory from hytop.core.ssh import CollectResult from hytop.gpu.models import MonitorState, NodeResult, Sample from hytop.gpu.service import ( apply_node_results, availability_ready, collect_node, drain_pending_nodes, init_monitor_state, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- HY_SMI_FULL_JSON = json.dumps( { "card0": { "Average Graphics Package Power (W)": "157.0", "Temperature (Sensor core) (C)": "30.0", "HCU use (%)": "0.0", "HCU memory use (%)": "89", "sclk clock speed": "1500Mhz", }, "card7": { "Average Graphics Package Power (W)": "141.0", "Temperature (Sensor core) (C)": "25.0", "HCU use (%)": "100.0", "HCU memory use (%)": "89", "sclk clock speed": "1500Mhz", }, } ) def _state(hosts=("localhost",), device_filter=None, max_window=10.0) -> MonitorState: return init_monitor_state(hosts=list(hosts), device_filter=device_filter, max_window=max_window) def _sample(ts: float, hcu_pct: float = 0.0, vram_pct: float = 0.0) -> Sample: return Sample(ts=ts, hcu_pct=hcu_pct, vram_pct=vram_pct) # --------------------------------------------------------------------------- # collect_node # --------------------------------------------------------------------------- class TestCollectNode: @patch("hytop.gpu.service.collect_from_host") def test_success_returns_samples(self, mock_collect): mock_collect.return_value = CollectResult( host="localhost", stdout=HY_SMI_FULL_JSON, stderr="" ) result = collect_node("localhost", ssh_timeout=5, cmd_timeout=10, hy_smi_args=["--json"]) assert result.error is None assert set(result.samples.keys()) == {0, 7} @patch("hytop.gpu.service.collect_from_host") def test_host_error_propagated(self, mock_collect): 
mock_collect.return_value = CollectResult( host="node01", stdout="", stderr="", error="timeout after 10.0s" ) result = collect_node("node01", ssh_timeout=5, cmd_timeout=10, hy_smi_args=["--json"]) assert result.error is not None assert "timeout" in result.error @patch("hytop.gpu.service.collect_from_host") def test_empty_output_yields_error(self, mock_collect): mock_collect.return_value = CollectResult(host="localhost", stdout="", stderr="") result = collect_node("localhost", ssh_timeout=5, cmd_timeout=10, hy_smi_args=["--json"]) assert result.error is not None # --------------------------------------------------------------------------- # apply_node_results # --------------------------------------------------------------------------- class TestApplyNodeResults: def test_successful_node_adds_to_history(self): state = _state(hosts=["localhost"]) sample = _sample(ts=1.0, hcu_pct=50.0) node = NodeResult(host="localhost", samples={0: sample}) apply_node_results([node], device_filter=None, state=state) assert ("localhost", 0) in state.histories def test_error_node_sets_error(self): state = _state(hosts=["localhost"]) node = NodeResult(host="localhost", samples={}, error="connection refused") apply_node_results([node], device_filter=None, state=state) assert state.errors["localhost"] == "connection refused" def test_device_filter_excludes_other_gpus(self): state = _state(hosts=["localhost"], device_filter={0}) s0 = _sample(ts=1.0) s1 = _sample(ts=1.0) node = NodeResult(host="localhost", samples={0: s0, 1: s1}) apply_node_results([node], device_filter={0}, state=state) assert ("localhost", 0) in state.histories assert ("localhost", 1) not in state.histories def test_success_clears_previous_error(self): state = _state(hosts=["localhost"]) state.errors["localhost"] = "old error" node = NodeResult(host="localhost", samples={0: _sample(ts=1.0)}) apply_node_results([node], device_filter=None, state=state) assert "localhost" not in state.errors def 
test_duplicate_sample_not_added(self): state = _state(hosts=["localhost"]) sample = _sample(ts=5.0) node = NodeResult(host="localhost", samples={0: sample}) apply_node_results([node], device_filter=None, state=state) apply_node_results([node], device_filter=None, state=state) # same ts assert len(state.histories[("localhost", 0)].samples) == 1 # --------------------------------------------------------------------------- # availability_ready # --------------------------------------------------------------------------- class TestAvailabilityReady: def _make_history(self, hcu_pct: float, vram_pct: float) -> SlidingHistory: """Build a SlidingHistory with one fresh sample using real monotonic time.""" h = SlidingHistory(max_window_s=30) h.add(_sample(ts=time.monotonic(), hcu_pct=hcu_pct, vram_pct=vram_pct)) return h def test_idle_gpu_returns_true(self): key = ("localhost", 0) histories = {key: self._make_history(hcu_pct=0.0, vram_pct=0.0)} assert availability_ready( window=5.0, histories=histories, monitored_keys={key}, hosts=["localhost"], errors={}, ) def test_busy_gpu_returns_false(self): key = ("localhost", 0) histories = {key: self._make_history(hcu_pct=100.0, vram_pct=89.0)} assert not availability_ready( window=5.0, histories=histories, monitored_keys={key}, hosts=["localhost"], errors={}, ) def test_host_error_returns_false(self): key = ("localhost", 0) histories = {key: self._make_history(hcu_pct=0.0, vram_pct=0.0)} assert not availability_ready( window=5.0, histories=histories, monitored_keys={key}, hosts=["localhost"], errors={"localhost": "connection refused"}, ) def test_empty_monitored_keys_returns_false(self): assert not availability_ready( window=5.0, histories={}, monitored_keys=set(), hosts=["localhost"], errors={}, ) def test_missing_history_returns_false(self): key = ("localhost", 0) assert not availability_ready( window=5.0, histories={}, # no history for this key monitored_keys={key}, hosts=["localhost"], errors={}, ) # 
# ---------------------------------------------------------------------------
# drain_pending_nodes
# ---------------------------------------------------------------------------


class TestDrainPendingNodes:
    """drain_pending_nodes: hand over only results newer than processed_seq."""

    def test_drains_new_result(self):
        state = _state(hosts=["localhost"])
        payload = NodeResult(host="localhost", samples={})
        with state.state_lock:
            entry = state.host_state["localhost"]
            entry.seq = 1
            entry.result = payload
        drained = drain_pending_nodes(["localhost"], state)
        assert len(drained) == 1
        assert drained[0] is payload

    def test_does_not_drain_already_processed(self):
        state = _state(hosts=["localhost"])
        state.processed_seq["localhost"] = 1
        with state.state_lock:
            entry = state.host_state["localhost"]
            entry.seq = 1
            entry.result = NodeResult(host="localhost", samples={})
        # Same seq already marked processed — nothing should come back.
        assert drain_pending_nodes(["localhost"], state) == []