New checks:
- SystemHealthCheck (15min interval): detects newly-failing HA integrations via /api/system_health snapshot diff; transition-based dedup (ok→error fires, sustained error silent, error→ok clears alert)
- UpdatesAvailableCheck (daily cron 09:00): per-update ha_update_available events with 7-day dedup; release notes truncated at 2000 chars
- UpdatesDigestCheck (Sunday cron 09:00): single digest event with all pending updates; weekly ISO-week dedup, independent of daily dedup key
- AutomationFailuresCheck (30min interval): detects automations with N consecutive failures (default 3) via /api/trace/automation/<id>; 6h cooldown per automation

Phase 3 flag fixes:
- Flag #1 (since field): UnavailableEntitiesCheck now uses min(state.last_changed, baseline.first_seen) as effective "since", giving accurate duration when agent was offline at entity's first failure
- Flag #3 (registry cache): HAClient.get_entity_registry() caches response in-process with configurable TTL (default 300s); avoids repeated API calls across concurrent check cycles; invalidate_registry_cache() for manual invalidation

Storage: system_health_snapshot table (component, last_status, last_seen_at, payload) created automatically on next Storage.open() call

Config additions (all with defaults): entity_registry_cache_ttl=300, system_health_check_interval=900, automation_check_interval=1800, automation_failure_threshold=3, updates_check_hour=9, updates_check_minute=0, updates_cooldown_days=7

Tests: 95 unit tests pass (49 new), 13 integration tests pass (9 new); 3 skipped (live-HA token not set in CI)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
222 lines
8.1 KiB
Python
222 lines
8.1 KiB
Python
"""Unit tests for SystemHealthCheck."""
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from ha_diag.checks.system_health import SystemHealthCheck, _extract_component_statuses
|
|
from ha_diag.config import Settings
|
|
from ha_diag.models import HAEventType, Severity
|
|
from ha_diag.storage import Storage
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_settings(**overrides) -> Settings:
    """Build a ``Settings`` for tests; keyword overrides replace the base values."""
    base: dict = dict(
        ha_url="http://test.local:8123",
        ha_token="test",
        node_name="test-node",
        location_tag="test-loc",
        alert_cooldown_hours=0.0,
        check_interval=60,
        check_interval_unavailable=3600,
    )
    # Later mapping wins, so anything passed by the caller shadows the base entry.
    return Settings(**{**base, **overrides})
|
|
|
|
|
|
def _make_client(health=None, error=None):
|
|
client = MagicMock()
|
|
if error:
|
|
client.get_system_health = AsyncMock(side_effect=error)
|
|
else:
|
|
client.get_system_health = AsyncMock(return_value=health or {})
|
|
return client
|
|
|
|
|
|
def _ok_response(*components: str) -> dict:
|
|
return {c: {"type": "result", "data": {"ok": True}} for c in components}
|
|
|
|
|
|
def _error_response(*components: str) -> dict:
|
|
return {c: {"type": "error", "error": f"{c} failed"} for c in components}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _extract_component_statuses unit tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_extract_typed_result_format():
    """A typed ``result`` entry is reported as ok, with its ``data`` exposed as details."""
    payload = {"recorder": {"type": "result", "data": {"backlog": 0}}}
    recorder = _extract_component_statuses(payload)["recorder"]
    assert recorder["status"] == "ok"
    assert recorder["details"] == {"backlog": 0}
|
|
|
|
|
|
def test_extract_typed_error_format():
    """A typed ``error`` entry is reported as error and keeps its message in details."""
    payload = {"cloud": {"type": "error", "error": "Connection refused"}}
    cloud = _extract_component_statuses(payload)["cloud"]
    assert cloud["status"] == "error"
    assert "Connection refused" in cloud["details"]["error"]
|
|
|
|
|
|
def test_extract_legacy_error_field():
    """An entry carrying only an ``error`` key (no ``type``) is still reported as error."""
    statuses = _extract_component_statuses({"cloud": {"error": "Timeout"}})
    assert statuses["cloud"]["status"] == "error"
|
|
|
|
|
|
def test_extract_nested_checks_format():
    """Components nested under ``checks`` are extracted; the sibling ``info`` key is not."""
    nested = {
        "info": {"version": "2024.12.0"},
        "checks": {
            "homeassistant": {"type": "result", "data": {}},
            "recorder": {"type": "error", "error": "DB locked"},
        },
    }
    statuses = _extract_component_statuses(nested)

    # The healthy component is allowed to be absent; when present it must be ok.
    if "homeassistant" in statuses:
        assert statuses["homeassistant"].get("status") == "ok"
    assert statuses["recorder"]["status"] == "error"
    assert "info" not in statuses
|
|
|
|
|
|
def test_extract_plain_dict_treated_as_ok():
    """A component dict with neither ``type`` nor ``error`` keys counts as ok."""
    statuses = _extract_component_statuses(
        {"homeassistant": {"version": "2024.12.0", "docker": True}}
    )
    assert statuses["homeassistant"]["status"] == "ok"
|
|
|
|
|
|
def test_extract_non_dict_value_skipped():
    """A component whose value is not a dict is left out of the result."""
    statuses = _extract_component_statuses({"scalar_component": "just-a-string"})
    assert "scalar_component" not in statuses
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SystemHealthCheck run() tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_first_run_no_snapshot_no_event_for_ok(storage: Storage):
    """All components ok on first run — record snapshots, emit nothing."""
    components = ("homeassistant", "recorder")
    check = SystemHealthCheck(_make_client(_ok_response(*components)), storage, _make_settings())

    results = await check.run()
    assert results == []

    # Verify a snapshot for every reported component, not only the first one,
    # so a component that is silently skipped is caught as well.
    for component in components:
        snap = await storage.get_system_health_snapshot(component)
        assert snap is not None, component
        assert snap["last_status"] == "ok", component
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_first_run_error_component_emits_event(storage: Storage):
    """With no prior snapshot, a component already in error yields one degraded event."""
    failing_client = _make_client(_error_response("cloud"))
    events = await SystemHealthCheck(failing_client, storage, _make_settings()).run()

    assert len(events) == 1
    event = events[0]
    assert event.event_type == HAEventType.ha_system_health_degraded
    # No snapshot existed before this run, hence the "unknown" previous status.
    assert event.payload["component"] == "cloud"
    assert event.payload["previous_status"] == "unknown"
    assert event.payload["current_status"] == "error"
    assert event.severity == Severity.warning
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ok_to_error_transition_emits_event(storage: Storage):
    """A component seen ok and then in error produces exactly one transition event."""
    healthy = _make_client(_ok_response("cloud"))
    failing = _make_client(_error_response("cloud"))
    settings = _make_settings()

    # First run seeds the "ok" snapshot; the second run observes the failure.
    await SystemHealthCheck(healthy, storage, settings).run()
    events = await SystemHealthCheck(failing, storage, settings).run()

    assert len(events) == 1
    transition = events[0]
    assert transition.payload["previous_status"] == "ok"
    assert transition.payload["current_status"] == "error"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_sustained_error_no_duplicate_event(storage: Storage):
    """Three consecutive error runs after an ok run: only the first of them emits."""
    healthy = _make_client(_ok_response("cloud"))
    failing = _make_client(_error_response("cloud"))
    settings = _make_settings()

    await SystemHealthCheck(healthy, storage, settings).run()

    # Same failing client for every run, a new check instance each time.
    error_runs = []
    for _ in range(3):
        error_runs.append(await SystemHealthCheck(failing, storage, settings).run())

    first, second, third = error_runs
    assert len(first) == 1  # transition fires
    assert second == []
    assert third == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_recovery_clears_alert_and_next_degradation_re_fires(storage: Storage):
    """ok → error → ok → error: the second degradation emits a fresh event."""
    settings = _make_settings()

    async def run_with(response: dict):
        # Each step gets its own client and check instance; storage is shared.
        return await SystemHealthCheck(_make_client(response), storage, settings).run()

    # First degradation
    await run_with(_ok_response("cloud"))
    first_degradation = await run_with(_error_response("cloud"))
    assert len(first_degradation) == 1

    # Recovery
    recovery = await run_with(_ok_response("cloud"))
    assert recovery == []

    # Second degradation
    second_degradation = await run_with(_error_response("cloud"))
    assert len(second_degradation) == 1
    assert second_degradation[0].payload["previous_status"] == "ok"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_multiple_degraded_components_multiple_events(storage: Storage):
    """Two components in error and one ok: one degraded event per failing component."""
    health = {**_error_response("cloud", "recorder"), **_ok_response("homeassistant")}
    check = SystemHealthCheck(_make_client(health), storage, _make_settings())
    results = await check.run()

    # The set comparison below collapses duplicates, so pin the count first:
    # a component reported twice must fail the test.
    assert len(results) == 2
    components = {r.payload["component"] for r in results}
    assert components == {"cloud", "recorder"}
    assert all(r.event_type == HAEventType.ha_system_health_degraded for r in results)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_api_error_returns_empty(storage: Storage):
    """When ``get_system_health`` raises, ``run()`` returns no results (no error event)."""
    unreachable = _make_client(error=Exception("timeout"))
    check = SystemHealthCheck(unreachable, storage, _make_settings())
    assert await check.run() == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_payload_contains_details(storage: Storage):
    """The component's error message is carried into the event payload's details."""
    client = _make_client({"recorder": {"type": "error", "error": "DB write lag 5000ms"}})
    events = await SystemHealthCheck(client, storage, _make_settings()).run()

    assert len(events) == 1
    details = events[0].payload["details"]
    assert "DB write lag" in details["error"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_snapshot_updated_after_recovery(storage: Storage):
    """After a recovery cycle, snapshot shows last_status='ok'."""
    settings = _make_settings()
    await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run()
    await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run()

    snap = await storage.get_system_health_snapshot("cloud")
    # Guard first so a missing snapshot fails as an assertion rather than as a
    # TypeError raised by subscripting None.
    assert snap is not None
    assert snap["last_status"] == "ok"
|