"""Unit tests for SystemHealthCheck.""" from __future__ import annotations from pathlib import Path from unittest.mock import AsyncMock, MagicMock import pytest from ha_diag.checks.system_health import SystemHealthCheck, _extract_component_statuses from ha_diag.config import Settings from ha_diag.models import HAEventType, Severity from ha_diag.storage import Storage # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_settings(**overrides) -> Settings: defaults: dict = { "ha_url": "http://test.local:8123", "ha_token": "test", "node_name": "test-node", "location_tag": "test-loc", "alert_cooldown_hours": 0.0, "check_interval": 60, "check_interval_unavailable": 3600, } defaults.update(overrides) return Settings(**defaults) def _make_client(health=None, error=None): client = MagicMock() if error: client.get_system_health = AsyncMock(side_effect=error) else: client.get_system_health = AsyncMock(return_value=health or {}) return client def _ok_response(*components: str) -> dict: return {c: {"type": "result", "data": {"ok": True}} for c in components} def _error_response(*components: str) -> dict: return {c: {"type": "error", "error": f"{c} failed"} for c in components} # --------------------------------------------------------------------------- # _extract_component_statuses unit tests # --------------------------------------------------------------------------- def test_extract_typed_result_format(): data = {"recorder": {"type": "result", "data": {"backlog": 0}}} result = _extract_component_statuses(data) assert result["recorder"]["status"] == "ok" assert result["recorder"]["details"] == {"backlog": 0} def test_extract_typed_error_format(): data = {"cloud": {"type": "error", "error": "Connection refused"}} result = _extract_component_statuses(data) assert result["cloud"]["status"] == "error" assert "Connection refused" in result["cloud"]["details"]["error"] def test_extract_legacy_error_field(): data = {"cloud": {"error": "Timeout"}} result = _extract_component_statuses(data) assert result["cloud"]["status"] == "error" def test_extract_nested_checks_format(): data = { "info": {"version": "2024.12.0"}, "checks": { "homeassistant": {"type": "result", "data": {}}, "recorder": {"type": "error", "error": "DB locked"}, }, } result = _extract_component_statuses(data) assert "homeassistant" not in result or result.get("homeassistant", {}).get("status") == "ok" assert result["recorder"]["status"] == "error" assert "info" not in result def test_extract_plain_dict_treated_as_ok(): data = {"homeassistant": {"version": "2024.12.0", "docker": True}} result = _extract_component_statuses(data) assert result["homeassistant"]["status"] == "ok" def test_extract_non_dict_value_skipped(): data = {"scalar_component": "just-a-string"} result = _extract_component_statuses(data) assert "scalar_component" not in result # --------------------------------------------------------------------------- # SystemHealthCheck run() tests # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_first_run_no_snapshot_no_event_for_ok(storage: Storage): """All components ok on first run — record snapshots, emit nothing.""" check = SystemHealthCheck(_make_client(_ok_response("homeassistant", "recorder")), storage, _make_settings()) results = await check.run() assert results == [] snap = await storage.get_system_health_snapshot("homeassistant") assert snap is not None assert snap["last_status"] == "ok" @pytest.mark.asyncio async def test_first_run_error_component_emits_event(storage: Storage): """Component in error on first run (no prior snapshot) → ha_system_health_degraded.""" check = SystemHealthCheck(_make_client(_error_response("cloud")), storage, _make_settings()) results = await check.run() assert len(results) == 1 r = results[0] assert r.event_type == HAEventType.ha_system_health_degraded assert r.payload["component"] == "cloud" assert r.payload["previous_status"] == "unknown" assert r.payload["current_status"] == "error" assert r.severity == Severity.warning @pytest.mark.asyncio async def test_ok_to_error_transition_emits_event(storage: Storage): """Component transitions ok → error → event fired.""" client_ok = _make_client(_ok_response("cloud")) client_err = _make_client(_error_response("cloud")) settings = _make_settings() await SystemHealthCheck(client_ok, storage, settings).run() results = await SystemHealthCheck(client_err, storage, settings).run() assert len(results) == 1 assert results[0].payload["previous_status"] == "ok" assert results[0].payload["current_status"] == "error" @pytest.mark.asyncio async def test_sustained_error_no_duplicate_event(storage: Storage): """Component stays in error across multiple runs — only first run emits.""" client_ok = _make_client(_ok_response("cloud")) client_err = _make_client(_error_response("cloud")) settings = _make_settings() await SystemHealthCheck(client_ok, storage, settings).run() results1 = await SystemHealthCheck(client_err, storage, settings).run() results2 = await SystemHealthCheck(client_err, storage, settings).run() results3 = await SystemHealthCheck(client_err, storage, settings).run() assert len(results1) == 1 # transition fires assert results2 == [] assert results3 == [] @pytest.mark.asyncio async def test_recovery_clears_alert_and_next_degradation_re_fires(storage: Storage): """error → ok → error: second degradation fires a new event.""" settings = _make_settings() # First degradation await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run() r1 = await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run() assert len(r1) == 1 # Recovery r2 = await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run() assert r2 == [] # Second degradation r3 = await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run() assert len(r3) == 1 assert r3[0].payload["previous_status"] == "ok" @pytest.mark.asyncio async def test_multiple_degraded_components_multiple_events(storage: Storage): health = {**_error_response("cloud", "recorder"), **_ok_response("homeassistant")} check = SystemHealthCheck(_make_client(health), storage, _make_settings()) results = await check.run() components = {r.payload["component"] for r in results} assert components == {"cloud", "recorder"} assert all(r.event_type == HAEventType.ha_system_health_degraded for r in results) @pytest.mark.asyncio async def test_api_error_returns_empty(storage: Storage): """If /api/system_health is unreachable, return no results (not an error event).""" check = SystemHealthCheck( _make_client(error=Exception("timeout")), storage, _make_settings() ) results = await check.run() assert results == [] @pytest.mark.asyncio async def test_payload_contains_details(storage: Storage): health = {"recorder": {"type": "error", "error": "DB write lag 5000ms"}} check = SystemHealthCheck(_make_client(health), storage, _make_settings()) results = await check.run() assert len(results) == 1 assert "DB write lag" in results[0].payload["details"]["error"] @pytest.mark.asyncio async def test_snapshot_updated_after_recovery(storage: Storage): """After a recovery cycle, snapshot shows last_status='ok'.""" settings = _make_settings() await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run() await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run() snap = await storage.get_system_health_snapshot("cloud") assert snap["last_status"] == "ok"