From 3499b2f280823cc69667c010a442f82525423df8 Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Fri, 29 May 2026 14:43:10 +0200 Subject: [PATCH] feat(ha-diag-agent): three REST diagnostic checks + Phase 3 flag fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New checks: - SystemHealthCheck (15min interval): detects newly-failing HA integrations via /api/system_health snapshot diff; transition-based dedup (ok→error fires, sustained error silent, error→ok clears alert) - UpdatesAvailableCheck (daily cron 09:00): per-update ha_update_available events with 7-day dedup; release notes truncated at 2000 chars - UpdatesDigestCheck (Sunday cron 09:00): single digest event with all pending updates; weekly ISO-week dedup, independent of daily dedup key - AutomationFailuresCheck (30min interval): detects automations with N consecutive failures (default 3) via /api/trace/automation/; 6h cooldown per automation Phase 3 flag fixes: - Flag #1 (since field): UnavailableEntitiesCheck now uses min(state.last_changed, baseline.first_seen) as effective "since", giving accurate duration when agent was offline at entity's first fail - Flag #3 (registry cache): HAClient.get_entity_registry() caches response in-process with configurable TTL (default 300s); avoids repeated API calls across concurrent check cycles; invalidate_registry_cache() for manual invalidation Storage: system_health_snapshot table (component, last_status, last_seen_at, payload) created automatically on next Storage.open() call Config additions (all with defaults): entity_registry_cache_ttl=300, system_health_check_interval=900, automation_check_interval=1800, automation_failure_threshold=3, updates_check_hour=9, updates_check_minute=0, updates_cooldown_days=7 Tests: 95 unit tests pass (49 new), 13 integration tests pass (9 new); 3 skipped (live-HA token not set in CI) Co-Authored-By: Claude Sonnet 4.6 --- .../src/ha_diag/checks/automation_failures.py | 97 +++++++ 
.../src/ha_diag/checks/system_health.py | 110 ++++++++ .../ha_diag/checks/unavailable_entities.py | 27 +- .../src/ha_diag/checks/updates_available.py | 123 +++++++++ services/ha-diag-agent/src/ha_diag/config.py | 15 + .../ha-diag-agent/src/ha_diag/ha_client.py | 30 +- services/ha-diag-agent/src/ha_diag/main.py | 53 +++- services/ha-diag-agent/src/ha_diag/storage.py | 40 +++ services/ha-diag-agent/tests/conftest.py | 2 + .../test_automation_failures_integration.py | 167 ++++++++++++ .../test_system_health_integration.py | 151 +++++++++++ .../test_updates_available_integration.py | 169 ++++++++++++ .../tests/test_automation_failures.py | 217 +++++++++++++++ .../ha-diag-agent/tests/test_ha_client.py | 55 ++++ .../ha-diag-agent/tests/test_system_health.py | 221 +++++++++++++++ .../tests/test_unavailable_entities.py | 84 ++++++ .../tests/test_updates_available.py | 256 ++++++++++++++++++ 17 files changed, 1807 insertions(+), 10 deletions(-) create mode 100644 services/ha-diag-agent/src/ha_diag/checks/automation_failures.py create mode 100644 services/ha-diag-agent/src/ha_diag/checks/system_health.py create mode 100644 services/ha-diag-agent/src/ha_diag/checks/updates_available.py create mode 100644 services/ha-diag-agent/tests/integration/test_automation_failures_integration.py create mode 100644 services/ha-diag-agent/tests/integration/test_system_health_integration.py create mode 100644 services/ha-diag-agent/tests/integration/test_updates_available_integration.py create mode 100644 services/ha-diag-agent/tests/test_automation_failures.py create mode 100644 services/ha-diag-agent/tests/test_system_health.py create mode 100644 services/ha-diag-agent/tests/test_updates_available.py diff --git a/services/ha-diag-agent/src/ha_diag/checks/automation_failures.py b/services/ha-diag-agent/src/ha_diag/checks/automation_failures.py new file mode 100644 index 0000000..aae3f45 --- /dev/null +++ b/services/ha-diag-agent/src/ha_diag/checks/automation_failures.py @@ -0,0 +1,97 @@ 
+from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from ..ha_client import HAClient +from ..models import CheckResult, HAEventType, Severity +from ..storage import Storage +from .base import Check + +if TYPE_CHECKING: + from ..config import Settings + + +class AutomationFailuresCheck(Check): + """Detects automations with consecutive run failures. + + For each enabled automation (state="on"), fetches the last N run traces. + When all N most-recent traces indicate failure, emits ha_automation_failing + with a 6-hour dedup per automation. + """ + + name = "automation_failures" + + def __init__( + self, + ha_client: HAClient, + storage: Storage, + settings: "Settings", + ) -> None: + self._client = ha_client + self._storage = storage + self._settings = settings + + async def run(self) -> list[CheckResult]: + try: + all_states = await self._client.get_states() + except Exception: + return [] + + automations = [ + s for s in all_states + if s["entity_id"].startswith("automation.") and s["state"] == "on" + ] + + results: list[CheckResult] = [] + cooldown_s = self._settings.alert_cooldown_hours * 3600 + threshold = self._settings.automation_failure_threshold + + for auto_state in automations: + eid = auto_state["entity_id"] + try: + traces = await self._client.get_automation_traces(eid) + except Exception: + continue + + if not traces or len(traces) < threshold: + continue + + recent = traces[:threshold] + failures = [t for t in recent if _is_trace_failure(t)] + if len(failures) < threshold: + continue + + alert_key = f"automation_failing:{eid}" + if await self._storage.was_alert_sent(alert_key, cooldown_s): + continue + + attrs = auto_state.get("attributes", {}) + friendly_name = attrs.get("friendly_name", eid) + last_failures = [ + {"timestamp": t.get("timestamp"), "error": t.get("error", "")} + for t in failures + ] + + results.append(CheckResult( + healthy=False, + event_type=HAEventType.ha_automation_failing, + severity=Severity.warning, + 
message=( + f"Automation '{friendly_name}' failed " + f"{len(failures)} consecutive time(s)" + ), + payload={ + "entity_id": eid, + "friendly_name": friendly_name, + "last_failures": last_failures, + "total_recent_failures": len(failures), + }, + )) + await self._storage.mark_alert_sent(alert_key) + + return results + + +def _is_trace_failure(trace: dict[str, Any]) -> bool: + """A trace is a failure if it has a non-empty error or an explicit failed state.""" + return bool(trace.get("error")) or trace.get("state") == "failed" diff --git a/services/ha-diag-agent/src/ha_diag/checks/system_health.py b/services/ha-diag-agent/src/ha_diag/checks/system_health.py new file mode 100644 index 0000000..13b9d70 --- /dev/null +++ b/services/ha-diag-agent/src/ha_diag/checks/system_health.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +from ..ha_client import HAClient +from ..models import CheckResult, HAEventType, Severity +from ..storage import Storage +from .base import Check + +if TYPE_CHECKING: + from ..config import Settings + + +class SystemHealthCheck(Check): + """Detects newly-failing HA integrations via /api/system_health. + + Logic per run: + 1. Fetch /api/system_health and parse per-component statuses. + 2. Diff against stored snapshots in system_health_snapshot. + 3. Emit ha_system_health_degraded on ok → error transitions. + 4. Clear alerts_sent on error → ok recovery (next degradation re-alerts). + 5. Update all component snapshots. + + API errors (HA unreachable) return no results; HeartbeatCheck handles + HA reachability separately. 
+ """ + + name = "system_health" + + def __init__( + self, + ha_client: HAClient, + storage: Storage, + settings: "Settings", + ) -> None: + self._client = ha_client + self._storage = storage + self._settings = settings + + async def run(self) -> list[CheckResult]: + try: + health_data = await self._client.get_system_health() + except Exception: + return [] + + statuses = _extract_component_statuses(health_data) + results: list[CheckResult] = [] + + for component, info in statuses.items(): + status = info["status"] + details = info.get("details", {}) + prev = await self._storage.get_system_health_snapshot(component) + + if status == "error": + if prev is None or prev["last_status"] == "ok": + results.append(CheckResult( + healthy=False, + event_type=HAEventType.ha_system_health_degraded, + severity=Severity.warning, + message=f"HA component '{component}' is degraded", + payload={ + "component": component, + "previous_status": prev["last_status"] if prev else "unknown", + "current_status": "error", + "details": details, + }, + )) + elif status == "ok" and prev and prev["last_status"] == "error": + await self._storage.clear_alert(f"system_health:{component}") + + await self._storage.upsert_system_health_snapshot( + component, status, json.dumps(details, default=str) + ) + + return results + + +def _extract_component_statuses( + health_data: dict[str, Any], +) -> dict[str, dict[str, Any]]: + """Parse HA /api/system_health into {component: {status, details}}. 
+ + Handles multiple HA response shapes: + - Typed: {component: {"type": "result"|"error", "data": {...}}} + - Legacy: {component: {"error": "msg"}} or {component: {plain_data}} + - Nested: {"checks": {component: {...}}, "info": {...}} + """ + checks = health_data.get("checks", health_data) + if not isinstance(checks, dict): + return {} + + result: dict[str, dict[str, Any]] = {} + for component, value in checks.items(): + if not isinstance(value, dict): + continue + + if value.get("type") == "error" or value.get("error"): + result[component] = { + "status": "error", + "details": {"error": str(value.get("error") or value.get("type", "error"))}, + } + else: + inner = value.get("data", value) + result[component] = { + "status": "ok", + "details": inner if isinstance(inner, dict) else value, + } + + return result diff --git a/services/ha-diag-agent/src/ha_diag/checks/unavailable_entities.py b/services/ha-diag-agent/src/ha_diag/checks/unavailable_entities.py index ba0f160..f633647 100644 --- a/services/ha-diag-agent/src/ha_diag/checks/unavailable_entities.py +++ b/services/ha-diag-agent/src/ha_diag/checks/unavailable_entities.py @@ -15,6 +15,20 @@ if TYPE_CHECKING: _BAD_STATES = frozenset({"unavailable", "unknown"}) +def _parse_last_changed_ts(value: str | None) -> float | None: + """Parse HA last_changed ISO string → Unix timestamp. + + Returns None on missing or malformed input so callers can fall back + to the baseline first_seen without special-casing. + """ + if not value: + return None + try: + return datetime.fromisoformat(value).timestamp() + except (ValueError, TypeError): + return None + + class UnavailableEntitiesCheck(Check): """Detects entities stuck in unavailable/unknown state. 
@@ -90,7 +104,16 @@ class UnavailableEntitiesCheck(Check): first_at = await self._storage.get_entity_first_unavailable_at(eid) if first_at is None: continue - duration_h = (now - first_at) / 3600 + # Phase 3 Flag #1: if HA reports an earlier last_changed (entity was + # already unavailable before the agent started/reconnected), use that + # as the authoritative "since" so duration is accurate. + last_changed_ts = _parse_last_changed_ts(state_data.get("last_changed")) + effective_since = ( + min(last_changed_ts, first_at) + if last_changed_ts is not None + else first_at + ) + duration_h = (now - effective_since) / 3600 if duration_h < threshold_h: continue alert_key = f"entity_unavailable:{eid}" @@ -99,7 +122,7 @@ class UnavailableEntitiesCheck(Check): to_alert.append({ "entity_id": eid, "state": state_data["state"], - "first_at": first_at, + "first_at": effective_since, "duration_h": duration_h, "domain": eid.split(".")[0], "integration": integration_map.get(eid), diff --git a/services/ha-diag-agent/src/ha_diag/checks/updates_available.py b/services/ha-diag-agent/src/ha_diag/checks/updates_available.py new file mode 100644 index 0000000..0749746 --- /dev/null +++ b/services/ha-diag-agent/src/ha_diag/checks/updates_available.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from ..ha_client import HAClient +from ..models import CheckResult, HAEventType, Severity +from ..storage import Storage +from .base import Check + +if TYPE_CHECKING: + from ..config import Settings + +_MAX_RELEASE_NOTES = 2000 + + +class UpdatesAvailableCheck(Check): + """Detects available HA core/add-on updates via update.* entities. + + Runs daily. Emits one ha_update_available event per update entity whose + 7-day dedup window has expired. Falls back gracefully when HA is down. 
+ """ + + name = "updates_available" + + def __init__( + self, + ha_client: HAClient, + storage: Storage, + settings: "Settings", + ) -> None: + self._client = ha_client + self._storage = storage + self._settings = settings + + async def run(self) -> list[CheckResult]: + updates = await self._fetch_active_updates() + if not updates: + return [] + + results: list[CheckResult] = [] + cooldown_s = self._settings.updates_cooldown_days * 86400 + + for state in updates: + eid = state["entity_id"] + alert_key = f"update_available:{eid}" + if await self._storage.was_alert_sent(alert_key, cooldown_s): + continue + attrs = state.get("attributes", {}) + results.append(CheckResult( + healthy=False, + event_type=HAEventType.ha_update_available, + severity=Severity.info, + message=( + f"Update available: {attrs.get('title', eid)} " + f"{attrs.get('installed_version', '?')} → " + f"{attrs.get('latest_version', '?')}" + ), + payload=_build_update_payload(eid, attrs), + )) + await self._storage.mark_alert_sent(alert_key) + + return results + + async def _fetch_active_updates(self) -> list[dict[str, Any]]: + try: + all_states = await self._client.get_states() + except Exception: + return [] + return [ + s for s in all_states + if s["entity_id"].startswith("update.") and s["state"] == "on" + ] + + +class UpdatesDigestCheck(UpdatesAvailableCheck): + """Weekly Sunday digest: single event listing all pending updates. + + Deduped per ISO week (won't re-fire if triggered multiple times on the + same Sunday, e.g. manual + scheduled). 
+ """ + + name = "updates_digest" + + async def run(self) -> list[CheckResult]: + updates = await self._fetch_active_updates() + if not updates: + return [] + + week_key = datetime.now().strftime("%G-W%V") + alert_key = f"update_digest:{week_key}" + if await self._storage.was_alert_sent(alert_key, 6 * 86400): + return [] + + all_payloads = [ + _build_update_payload(s["entity_id"], s.get("attributes", {})) + for s in updates + ] + await self._storage.mark_alert_sent(alert_key) + return [CheckResult( + healthy=False, + event_type=HAEventType.ha_update_available, + severity=Severity.info, + message=f"Weekly digest: {len(all_payloads)} update(s) available", + payload={"digest": True, "updates": all_payloads, "count": len(all_payloads)}, + )] + + +def _build_update_payload(entity_id: str, attrs: dict[str, Any]) -> dict[str, Any]: + payload: dict[str, Any] = { + "entity_id": entity_id, + "title": attrs.get("title", entity_id), + "installed_version": attrs.get("installed_version"), + "latest_version": attrs.get("latest_version"), + "in_progress": attrs.get("in_progress", False), + "auto_update": attrs.get("auto_update", False), + } + if attrs.get("release_url"): + payload["release_url"] = attrs["release_url"] + summary = attrs.get("release_summary") + if summary: + payload["release_summary"] = summary[:_MAX_RELEASE_NOTES] + return payload diff --git a/services/ha-diag-agent/src/ha_diag/config.py b/services/ha-diag-agent/src/ha_diag/config.py index e967b25..50b3b16 100644 --- a/services/ha-diag-agent/src/ha_diag/config.py +++ b/services/ha-diag-agent/src/ha_diag/config.py @@ -30,6 +30,21 @@ class Settings(BaseSettings): integration_failure_min_entities: int = 3 # min count to trigger integration event alert_cooldown_hours: float = 6.0 # don't re-alert same entity within N hours + # Phase 3 Flag #3: entity registry cache TTL + entity_registry_cache_ttl: int = 300 # seconds + + # SystemHealthCheck + system_health_check_interval: int = 900 # 15 min + + # 
AutomationFailuresCheck + automation_check_interval: int = 1800 # 30 min + automation_failure_threshold: int = 3 # consecutive failures before alert + + # UpdatesAvailableCheck + updates_check_hour: int = 9 + updates_check_minute: int = 0 + updates_cooldown_days: int = 7 # don't re-alert same update within N days + # API server port: int = 8087 log_level: str = "info" diff --git a/services/ha-diag-agent/src/ha_diag/ha_client.py b/services/ha-diag-agent/src/ha_diag/ha_client.py index bb3f4f8..5579988 100644 --- a/services/ha-diag-agent/src/ha_diag/ha_client.py +++ b/services/ha-diag-agent/src/ha_diag/ha_client.py @@ -1,5 +1,6 @@ from __future__ import annotations +import time from typing import Any import aiohttp @@ -24,9 +25,17 @@ class HAClient: session-borrower: it never opens or closes the session it receives. """ - def __init__(self, base_url: str, session: aiohttp.ClientSession) -> None: + def __init__( + self, + base_url: str, + session: aiohttp.ClientSession, + entity_registry_cache_ttl: float = 300.0, + ) -> None: self._base_url = base_url.rstrip("/") self._session = session + self._registry_cache_ttl = entity_registry_cache_ttl + self._registry_cache: list[dict[str, Any]] | None = None + self._registry_fetched_at: float = 0.0 async def get_api_status(self) -> dict[str, Any]: """GET /api/ — returns {"message": "API running."} when HA is up.""" @@ -57,12 +66,29 @@ class HAClient: Each entry includes entity_id, platform (integration name), area_id, config_entry_id, and other metadata. + + Result is cached in-process for entity_registry_cache_ttl seconds to + avoid hammering HA on every check cycle (Phase 3 Flag #3). 
""" + now = time.monotonic() + if ( + self._registry_cache is not None + and (now - self._registry_fetched_at) < self._registry_cache_ttl + ): + return self._registry_cache async with self._session.get( f"{self._base_url}/api/config/entity_registry" ) as resp: resp.raise_for_status() - return await resp.json() + result = await resp.json() + self._registry_cache = result + self._registry_fetched_at = now + return result + + def invalidate_registry_cache(self) -> None: + """Force the next get_entity_registry() call to fetch fresh data.""" + self._registry_cache = None + self._registry_fetched_at = 0.0 async def get_automation_traces(self, automation_id: str) -> list[dict[str, Any]]: """GET /api/trace/automation/ — last run traces for an automation.""" diff --git a/services/ha-diag-agent/src/ha_diag/main.py b/services/ha-diag-agent/src/ha_diag/main.py index 2015daa..90881fd 100644 --- a/services/ha-diag-agent/src/ha_diag/main.py +++ b/services/ha-diag-agent/src/ha_diag/main.py @@ -11,8 +11,11 @@ import uvicorn from apscheduler.schedulers.asyncio import AsyncIOScheduler from .api import app, register_checks +from .checks.automation_failures import AutomationFailuresCheck from .checks.heartbeat import HeartbeatCheck +from .checks.system_health import SystemHealthCheck from .checks.unavailable_entities import UnavailableEntitiesCheck +from .checks.updates_available import UpdatesAvailableCheck, UpdatesDigestCheck from .config import Settings from .event_emitter import EventEmitter from .ha_client import HAClient, make_session @@ -93,37 +96,75 @@ async def run(settings: Settings) -> None: # Shared session — created once at startup, closed on shutdown session = make_session(settings.ha_token, settings.ha_timeout) - ha_client = HAClient(settings.ha_url, session) + ha_client = HAClient( + settings.ha_url, session, + entity_registry_cache_ttl=settings.entity_registry_cache_ttl, + ) heartbeat = HeartbeatCheck(ha_client) unavailable = UnavailableEntitiesCheck(ha_client, 
storage, settings) + system_health = SystemHealthCheck(ha_client, storage, settings) + automation_failures = AutomationFailuresCheck(ha_client, storage, settings) + updates_daily = UpdatesAvailableCheck(ha_client, storage, settings) + updates_digest = UpdatesDigestCheck(ha_client, storage, settings) - all_checks = [heartbeat, unavailable] + all_checks = [heartbeat, unavailable, system_health, automation_failures, + updates_daily, updates_digest] register_checks(all_checks, settings.node_name, settings.location_tag) scheduler = AsyncIOScheduler() scheduler.add_job( - _run_check_and_emit, - "interval", + _run_check_and_emit, "interval", seconds=settings.check_interval, args=[heartbeat, emitter, storage], id="check_heartbeat", next_run_time=datetime.now(), ) scheduler.add_job( - _run_check_and_emit, - "interval", + _run_check_and_emit, "interval", seconds=settings.check_interval_unavailable, args=[unavailable, emitter, storage], id="check_unavailable_entities", next_run_time=datetime.now(), ) + scheduler.add_job( + _run_check_and_emit, "interval", + seconds=settings.system_health_check_interval, + args=[system_health, emitter, storage], + id="check_system_health", + next_run_time=datetime.now(), + ) + scheduler.add_job( + _run_check_and_emit, "interval", + seconds=settings.automation_check_interval, + args=[automation_failures, emitter, storage], + id="check_automation_failures", + next_run_time=datetime.now(), + ) + scheduler.add_job( + _run_check_and_emit, "cron", + hour=settings.updates_check_hour, + minute=settings.updates_check_minute, + args=[updates_daily, emitter, storage], + id="check_updates_available", + ) + scheduler.add_job( + _run_check_and_emit, "cron", + day_of_week="sun", + hour=settings.updates_check_hour, + minute=settings.updates_check_minute, + args=[updates_digest, emitter, storage], + id="check_updates_digest", + ) scheduler.start() _log.info( "scheduler_started", checks=[c.name for c in all_checks], heartbeat_interval=settings.check_interval, 
unavailable_interval=settings.check_interval_unavailable, + system_health_interval=settings.system_health_check_interval, + automation_interval=settings.automation_check_interval, + updates_hour=settings.updates_check_hour, ) config = uvicorn.Config( diff --git a/services/ha-diag-agent/src/ha_diag/storage.py b/services/ha-diag-agent/src/ha_diag/storage.py index e628c8a..e25f7f9 100644 --- a/services/ha-diag-agent/src/ha_diag/storage.py +++ b/services/ha-diag-agent/src/ha_diag/storage.py @@ -7,6 +7,13 @@ from typing import Any import aiosqlite _SCHEMA = """ +CREATE TABLE IF NOT EXISTS system_health_snapshot ( + component TEXT PRIMARY KEY, + last_status TEXT NOT NULL, + last_seen_at REAL NOT NULL, + payload TEXT NOT NULL DEFAULT '{}' +); + CREATE TABLE IF NOT EXISTS entity_baseline ( entity_id TEXT PRIMARY KEY, -- state when entity first entered unavailable/unknown @@ -185,3 +192,36 @@ class Storage: "DELETE FROM alerts_sent WHERE alert_key = ?", (alert_key,) ) await self._conn().commit() + + # ------------------------------------------------------------------ + # system_health_snapshot — tracks last-known per-component status + # ------------------------------------------------------------------ + + async def get_system_health_snapshot( + self, component: str + ) -> dict[str, Any] | None: + """Return the stored snapshot for a component, or None if unseen.""" + async with self._conn().execute( + "SELECT * FROM system_health_snapshot WHERE component = ?", + (component,), + ) as cur: + row = await cur.fetchone() + return dict(row) if row else None + + async def upsert_system_health_snapshot( + self, component: str, last_status: str, payload: str + ) -> None: + """Insert or replace the snapshot for a component.""" + await self._conn().execute( + """ + INSERT INTO system_health_snapshot + (component, last_status, last_seen_at, payload) + VALUES (?, ?, ?, ?) 
+ ON CONFLICT(component) DO UPDATE SET + last_status = excluded.last_status, + last_seen_at = excluded.last_seen_at, + payload = excluded.payload + """, + (component, last_status, time.time(), payload), + ) + await self._conn().commit() diff --git a/services/ha-diag-agent/tests/conftest.py b/services/ha-diag-agent/tests/conftest.py index f403666..e9550b0 100644 --- a/services/ha-diag-agent/tests/conftest.py +++ b/services/ha-diag-agent/tests/conftest.py @@ -59,4 +59,6 @@ def mock_ha_client(): client.get_api_status = AsyncMock(return_value={"message": "API running."}) client.get_states = AsyncMock(return_value=[]) client.get_entity_registry = AsyncMock(return_value=[]) + client.get_system_health = AsyncMock(return_value={}) + client.get_automation_traces = AsyncMock(return_value=[]) return client diff --git a/services/ha-diag-agent/tests/integration/test_automation_failures_integration.py b/services/ha-diag-agent/tests/integration/test_automation_failures_integration.py new file mode 100644 index 0000000..9a0baec --- /dev/null +++ b/services/ha-diag-agent/tests/integration/test_automation_failures_integration.py @@ -0,0 +1,167 @@ +"""Integration tests for AutomationFailuresCheck. + +Uses real aiosqlite Storage + EventEmitter + mocked HTTP. 
+""" +from __future__ import annotations + +import json +from pathlib import Path +from typing import AsyncGenerator + +import pytest +import pytest_asyncio +from aioresponses import aioresponses + +from ha_diag.checks.automation_failures import AutomationFailuresCheck +from ha_diag.config import Settings +from ha_diag.event_emitter import EventEmitter +from ha_diag.ha_client import HAClient, make_session +from ha_diag.models import HAEventType +from ha_diag.storage import Storage + +HA_URL = "http://ha-test-ken:8123" + + +def _settings(**overrides) -> Settings: + defaults: dict = { + "ha_url": HA_URL, + "ha_token": "test-token", + "node_name": "piha", + "location_tag": "ken", + "alert_cooldown_hours": 0.0, + "automation_failure_threshold": 3, + "check_interval": 60, + "check_interval_unavailable": 3600, + } + defaults.update(overrides) + return Settings(**defaults) + + +@pytest_asyncio.fixture +async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]: + s = Storage(tmp_path / "integration_test.db") + await s.open() + yield s + await s.close() + + +@pytest.fixture +def events_dir(tmp_path: Path) -> Path: + d = tmp_path / "events" + d.mkdir() + return d + + +def _auto_states(*entity_ids: str) -> list[dict]: + return [ + { + "entity_id": eid, + "state": "on", + "attributes": {"friendly_name": eid.split(".")[-1].replace("_", " ").title()}, + } + for eid in entity_ids + ] + + +def _fail_traces(n: int = 3) -> list[dict]: + return [ + { + "run_id": f"run-{i}", + "timestamp": f"2026-05-27T{10+i:02d}:00:00+00:00", + "trigger": "state", + "state": "stopped", + "error": f"Script error #{i}", + } + for i in range(n) + ] + + +def _ok_traces(n: int = 3) -> list[dict]: + return [ + { + "run_id": f"run-{i}", + "timestamp": f"2026-05-27T{10+i:02d}:00:00+00:00", + "trigger": "state", + "state": "stopped", + "error": None, + } + for i in range(n) + ] + + +@pytest.mark.integration +async def test_failing_automation_emits_event_and_writes_file( + storage: Storage, 
events_dir: Path +): + """3 consecutive failures → event file written with correct structure.""" + states = _auto_states("automation.morning_lights") + traces = _fail_traces(3) + emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken") + + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + m.get(f"{HA_URL}/api/trace/automation/automation.morning_lights", payload=traces) + async with make_session("test-token") as session: + client = HAClient(HA_URL, session) + check = AutomationFailuresCheck(client, storage, _settings()) + results = await check.run() + + assert len(results) == 1 + r = results[0] + assert r.event_type == HAEventType.ha_automation_failing + assert r.payload["entity_id"] == "automation.morning_lights" + assert r.payload["total_recent_failures"] == 3 + + emitter.emit( + event_type=r.event_type, + severity=r.severity.value, + service="homeassistant", + message=r.message, + payload=r.payload, + ) + + files = list(events_dir.glob("*.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["type"] == "ha_automation_failing" + assert data["payload"]["location_tag"] == "ken" + assert "last_failures" in data["payload"] + + +@pytest.mark.integration +async def test_healthy_automation_no_event(storage: Storage): + """All recent runs successful → no event.""" + states = _auto_states("automation.morning_lights") + traces = _ok_traces(3) + + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + m.get(f"{HA_URL}/api/trace/automation/automation.morning_lights", payload=traces) + async with make_session("test-token") as session: + client = HAClient(HA_URL, session) + check = AutomationFailuresCheck(client, storage, _settings()) + results = await check.run() + + assert results == [] + + +@pytest.mark.integration +async def test_cooldown_suppresses_duplicate(storage: Storage): + """Second run within cooldown window → no duplicate event.""" + states = 
_auto_states("automation.morning_lights") + traces = _fail_traces(3) + settings = _settings(alert_cooldown_hours=6.0) + + for _ in range(2): + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + m.get(f"{HA_URL}/api/trace/automation/automation.morning_lights", payload=traces) + async with make_session("test-token") as session: + check = AutomationFailuresCheck( + HAClient(HA_URL, session), storage, settings + ) + results = await check.run() + if _ == 0: + assert len(results) == 1 + else: + assert results == [] diff --git a/services/ha-diag-agent/tests/integration/test_system_health_integration.py b/services/ha-diag-agent/tests/integration/test_system_health_integration.py new file mode 100644 index 0000000..80205b4 --- /dev/null +++ b/services/ha-diag-agent/tests/integration/test_system_health_integration.py @@ -0,0 +1,151 @@ +"""Integration tests for SystemHealthCheck using aioresponses. + +Uses real aiosqlite Storage + EventEmitter + mocked HTTP. +Marked 'integration' because it exercises the full stack end-to-end. 
+""" +from __future__ import annotations + +import json +from pathlib import Path +from typing import AsyncGenerator + +import pytest +import pytest_asyncio +from aioresponses import aioresponses + +from ha_diag.checks.system_health import SystemHealthCheck +from ha_diag.config import Settings +from ha_diag.event_emitter import EventEmitter +from ha_diag.ha_client import HAClient, make_session +from ha_diag.models import HAEventType +from ha_diag.storage import Storage + +HA_URL = "http://ha-test-ken:8123" + + +def _settings(**overrides) -> Settings: + defaults: dict = { + "ha_url": HA_URL, + "ha_token": "test-token", + "node_name": "piha", + "location_tag": "ken", + "alert_cooldown_hours": 0.0, + "check_interval": 60, + "check_interval_unavailable": 3600, + } + defaults.update(overrides) + return Settings(**defaults) + + +@pytest_asyncio.fixture +async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]: + s = Storage(tmp_path / "integration_test.db") + await s.open() + yield s + await s.close() + + +@pytest.fixture +def events_dir(tmp_path: Path) -> Path: + d = tmp_path / "events" + d.mkdir() + return d + + +@pytest.mark.integration +async def test_system_health_ok_components_no_event( + storage: Storage, events_dir: Path +): + """All components healthy on first run → no events emitted.""" + health = { + "homeassistant": {"type": "result", "data": {"version": "2025.5.0"}}, + "recorder": {"type": "result", "data": {"backlog": 0}}, + } + emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken") + + with aioresponses() as m: + m.get(f"{HA_URL}/api/system_health", payload=health) + async with make_session("test-token") as session: + client = HAClient(HA_URL, session) + check = SystemHealthCheck(client, storage, _settings()) + results = await check.run() + + assert results == [] + assert not list(events_dir.glob("*.json")) + + +@pytest.mark.integration +async def test_system_health_degraded_emits_event_and_writes_file( + storage: Storage, 
events_dir: Path +): + """Component degrades: event emitted + file written with correct structure.""" + # First run: all ok + health_ok = {"cloud": {"type": "result", "data": {}}} + health_err = {"cloud": {"type": "error", "error": "Cloud connection lost"}} + emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken") + + with aioresponses() as m: + m.get(f"{HA_URL}/api/system_health", payload=health_ok) + async with make_session("test-token") as session: + client = HAClient(HA_URL, session) + await SystemHealthCheck(client, storage, _settings()).run() + + # Second run: cloud errors + with aioresponses() as m: + m.get(f"{HA_URL}/api/system_health", payload=health_err) + async with make_session("test-token") as session: + client = HAClient(HA_URL, session) + check = SystemHealthCheck(client, storage, _settings()) + results = await check.run() + + assert len(results) == 1 + assert results[0].event_type == HAEventType.ha_system_health_degraded + + emitter.emit( + event_type=results[0].event_type, + severity=results[0].severity.value, + service="homeassistant", + message=results[0].message, + payload=results[0].payload, + ) + + files = list(events_dir.glob("*.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["type"] == "ha_system_health_degraded" + assert data["payload"]["component"] == "cloud" + assert data["payload"]["location_tag"] == "ken" + + +@pytest.mark.integration +async def test_system_health_recovery_and_re_degradation(storage: Storage): + """Full ok→error→ok→error cycle: events fire on degradation, not on recovery.""" + def _run(health): + with aioresponses() as m: + m.get(f"{HA_URL}/api/system_health", payload=health) + return make_session("test-token"), health + + settings = _settings() + + async def run_once(health): + with aioresponses() as m: + m.get(f"{HA_URL}/api/system_health", payload=health) + async with make_session("test-token") as session: + return await SystemHealthCheck( + HAClient(HA_URL, 
session), storage, settings + ).run() + + ok_h = {"cloud": {"type": "result", "data": {}}} + err_h = {"cloud": {"type": "error", "error": "timeout"}} + + r1 = await run_once(ok_h) # baseline ok + r2 = await run_once(err_h) # first degradation + r3 = await run_once(err_h) # sustained error (no dup) + r4 = await run_once(ok_h) # recovery + r5 = await run_once(err_h) # second degradation + + assert r1 == [] + assert len(r2) == 1 + assert r3 == [] + assert r4 == [] + assert len(r5) == 1 diff --git a/services/ha-diag-agent/tests/integration/test_updates_available_integration.py b/services/ha-diag-agent/tests/integration/test_updates_available_integration.py new file mode 100644 index 0000000..6587bd1 --- /dev/null +++ b/services/ha-diag-agent/tests/integration/test_updates_available_integration.py @@ -0,0 +1,169 @@ +"""Integration tests for UpdatesAvailableCheck and UpdatesDigestCheck. + +Uses real aiosqlite Storage + EventEmitter + mocked HTTP. +""" +from __future__ import annotations + +import json +from pathlib import Path +from typing import AsyncGenerator + +import pytest +import pytest_asyncio +from aioresponses import aioresponses + +from ha_diag.checks.updates_available import UpdatesAvailableCheck, UpdatesDigestCheck +from ha_diag.config import Settings +from ha_diag.event_emitter import EventEmitter +from ha_diag.ha_client import HAClient, make_session +from ha_diag.models import HAEventType +from ha_diag.storage import Storage + +HA_URL = "http://ha-test-ken:8123" + + +def _settings(**overrides) -> Settings: + defaults: dict = { + "ha_url": HA_URL, + "ha_token": "test-token", + "node_name": "piha", + "location_tag": "ken", + "alert_cooldown_hours": 0.0, + "updates_cooldown_days": 0, + "check_interval": 60, + "check_interval_unavailable": 3600, + } + defaults.update(overrides) + return Settings(**defaults) + + +@pytest_asyncio.fixture +async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]: + s = Storage(tmp_path / "integration_test.db") + await 
s.open() + yield s + await s.close() + + +@pytest.fixture +def events_dir(tmp_path: Path) -> Path: + d = tmp_path / "events" + d.mkdir() + return d + + +def _update_states(*entity_ids: str) -> list[dict]: + return [ + { + "entity_id": eid, + "state": "on", + "attributes": { + "title": eid.split(".")[-1].replace("_", " ").title(), + "installed_version": "1.0.0", + "latest_version": "2.0.0", + "in_progress": False, + "auto_update": False, + }, + } + for eid in entity_ids + ] + + +@pytest.mark.integration +async def test_individual_updates_written_to_disk(storage: Storage, events_dir: Path): + """2 pending updates → 2 event files with correct structure.""" + states = _update_states("update.ha_core", "update.mosquitto") + emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken") + + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + async with make_session("test-token") as session: + client = HAClient(HA_URL, session) + check = UpdatesAvailableCheck(client, storage, _settings()) + results = await check.run() + + assert len(results) == 2 + for r in results: + assert r.event_type == HAEventType.ha_update_available + emitter.emit( + event_type=r.event_type, + severity=r.severity.value, + service="homeassistant", + message=r.message, + payload=r.payload, + ) + + files = list(events_dir.glob("*.json")) + assert len(files) == 2 + for f in files: + data = json.loads(f.read_text()) + assert data["type"] == "ha_update_available" + assert data["payload"]["location_tag"] == "ken" + assert "entity_id" in data["payload"] + + +@pytest.mark.integration +async def test_digest_writes_single_event_file(storage: Storage, events_dir: Path): + """Sunday digest → single event file with digest=True payload.""" + states = _update_states("update.ha_core", "update.mosquitto", "update.esphome") + emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken") + + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + async 
with make_session("test-token") as session: + client = HAClient(HA_URL, session) + check = UpdatesDigestCheck(client, storage, _settings()) + results = await check.run() + + assert len(results) == 1 + r = results[0] + assert r.payload["digest"] is True + assert r.payload["count"] == 3 + + emitter.emit( + event_type=r.event_type, + severity=r.severity.value, + service="homeassistant", + message=r.message, + payload=r.payload, + ) + files = list(events_dir.glob("*.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["payload"]["digest"] is True + assert len(data["payload"]["updates"]) == 3 + + +@pytest.mark.integration +async def test_dedup_across_daily_and_digest_independent(storage: Storage): + """Daily dedup key doesn't suppress digest, and vice versa.""" + states = _update_states("update.ha_core") + settings = _settings(updates_cooldown_days=7) + + # Daily check + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + async with make_session("test-token") as session: + r1 = await UpdatesAvailableCheck( + HAClient(HA_URL, session), storage, settings + ).run() + assert len(r1) == 1 + + # Daily again — cooldown active + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + async with make_session("test-token") as session: + r2 = await UpdatesAvailableCheck( + HAClient(HA_URL, session), storage, settings + ).run() + assert r2 == [] + + # Digest — different key, should still fire + with aioresponses() as m: + m.get(f"{HA_URL}/api/states", payload=states) + async with make_session("test-token") as session: + r3 = await UpdatesDigestCheck( + HAClient(HA_URL, session), storage, settings + ).run() + assert len(r3) == 1 + assert r3[0].payload["digest"] is True diff --git a/services/ha-diag-agent/tests/test_automation_failures.py b/services/ha-diag-agent/tests/test_automation_failures.py new file mode 100644 index 0000000..39d3a9a --- /dev/null +++ 
b/services/ha-diag-agent/tests/test_automation_failures.py @@ -0,0 +1,217 @@ +"""Unit tests for AutomationFailuresCheck.""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ha_diag.checks.automation_failures import AutomationFailuresCheck, _is_trace_failure +from ha_diag.config import Settings +from ha_diag.models import HAEventType, Severity +from ha_diag.storage import Storage + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_settings(**overrides) -> Settings: + defaults: dict = { + "ha_url": "http://test.local:8123", + "ha_token": "test", + "node_name": "test-node", + "location_tag": "test-loc", + "alert_cooldown_hours": 0.0, + "automation_failure_threshold": 3, + "check_interval": 60, + "check_interval_unavailable": 3600, + } + defaults.update(overrides) + return Settings(**defaults) + + +def _make_client(states=None, traces_by_id=None, states_error=None): + client = MagicMock() + if states_error: + client.get_states = AsyncMock(side_effect=states_error) + else: + client.get_states = AsyncMock(return_value=states or []) + + traces_map = traces_by_id or {} + + async def _get_traces(eid): + if eid not in traces_map: + raise Exception(f"404 for {eid}") + return traces_map[eid] + + client.get_automation_traces = AsyncMock(side_effect=_get_traces) + return client + + +def _auto_state(entity_id: str, state: str = "on", friendly_name: str | None = None) -> dict: + attrs: dict = {} + if friendly_name: + attrs["friendly_name"] = friendly_name + return {"entity_id": entity_id, "state": state, "attributes": attrs} + + +def _trace(error: str | None = None, state: str = "stopped") -> dict: + return { + "run_id": "abc", + "timestamp": "2026-05-27T10:00:00+00:00", + "trigger": "state", + "state": state if error is None else "stopped", + "error": error, + 
} + + +def _fail(error: str = "Script error") -> dict: + return _trace(error=error) + + +def _ok() -> dict: + return _trace(error=None) + + +# --------------------------------------------------------------------------- +# _is_trace_failure unit tests +# --------------------------------------------------------------------------- + + +def test_trace_with_error_is_failure(): + assert _is_trace_failure({"error": "Something went wrong"}) is True + + +def test_trace_with_state_failed_is_failure(): + assert _is_trace_failure({"state": "failed", "error": None}) is True + + +def test_trace_with_null_error_is_success(): + assert _is_trace_failure({"error": None, "state": "stopped"}) is False + + +def test_trace_with_empty_string_error_is_success(): + assert _is_trace_failure({"error": "", "state": "stopped"}) is False + + +def test_trace_with_no_keys_is_success(): + assert _is_trace_failure({}) is False + + +# --------------------------------------------------------------------------- +# AutomationFailuresCheck.run() tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_no_automations_returns_empty(storage: Storage): + check = AutomationFailuresCheck(_make_client([]), storage, _make_settings()) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def test_disabled_automation_skipped(storage: Storage): + states = [_auto_state("automation.morning_lights", state="off")] + check = AutomationFailuresCheck(_make_client(states, {}), storage, _make_settings()) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def test_automation_with_no_traces_skipped(storage: Storage): + states = [_auto_state("automation.morning_lights")] + # _make_client raises exception for missing keys → graceful skip + check = AutomationFailuresCheck(_make_client(states, {}), storage, _make_settings()) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def 
test_fewer_traces_than_threshold_skipped(storage: Storage): + states = [_auto_state("automation.a")] + traces = {"automation.a": [_fail(), _fail()]} # 2 failures, threshold=3 + check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings()) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def test_all_recent_failed_emits_event(storage: Storage): + states = [_auto_state("automation.a", friendly_name="Morning Lights")] + traces = {"automation.a": [_fail("step failed"), _fail("timeout"), _fail("no device")]} + check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings()) + results = await check.run() + assert len(results) == 1 + r = results[0] + assert r.event_type == HAEventType.ha_automation_failing + assert r.severity == Severity.warning + assert r.payload["entity_id"] == "automation.a" + assert r.payload["friendly_name"] == "Morning Lights" + assert r.payload["total_recent_failures"] == 3 + assert len(r.payload["last_failures"]) == 3 + + +@pytest.mark.asyncio +async def test_partial_failures_no_event(storage: Storage): + states = [_auto_state("automation.a")] + # 2 failures, 1 success in recent 3 → not all failed + traces = {"automation.a": [_fail(), _ok(), _fail()]} + check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings()) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def test_cooldown_prevents_duplicate_event(storage: Storage): + states = [_auto_state("automation.a")] + traces = {"automation.a": [_fail(), _fail(), _fail()]} + settings = _make_settings(alert_cooldown_hours=6.0) + check = AutomationFailuresCheck(_make_client(states, traces), storage, settings) + r1 = await check.run() + r2 = await check.run() + assert len(r1) == 1 + assert r2 == [] + + +@pytest.mark.asyncio +async def test_multiple_failing_automations(storage: Storage): + states = [_auto_state("automation.a"), _auto_state("automation.b")] + traces = { + "automation.a": 
[_fail(), _fail(), _fail()], + "automation.b": [_fail(), _fail(), _fail()], + } + check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings()) + results = await check.run() + assert len(results) == 2 + eids = {r.payload["entity_id"] for r in results} + assert eids == {"automation.a", "automation.b"} + + +@pytest.mark.asyncio +async def test_states_error_returns_empty(storage: Storage): + check = AutomationFailuresCheck( + _make_client(states_error=ConnectionError("down")), storage, _make_settings() + ) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def test_custom_threshold(storage: Storage): + states = [_auto_state("automation.a")] + # threshold=2: 2 failures should trigger + traces = {"automation.a": [_fail(), _fail(), _ok()]} + settings = _make_settings(automation_failure_threshold=2) + check = AutomationFailuresCheck(_make_client(states, traces), storage, settings) + results = await check.run() + assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_failure_with_state_failed_field(storage: Storage): + states = [_auto_state("automation.a")] + traces = {"automation.a": [ + {"run_id": "x", "state": "failed", "error": None, "timestamp": "2026-05-27T10:00:00Z"}, + {"run_id": "y", "state": "failed", "error": None, "timestamp": "2026-05-27T09:00:00Z"}, + {"run_id": "z", "state": "failed", "error": None, "timestamp": "2026-05-27T08:00:00Z"}, + ]} + check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings()) + results = await check.run() + assert len(results) == 1 diff --git a/services/ha-diag-agent/tests/test_ha_client.py b/services/ha-diag-agent/tests/test_ha_client.py index 56388cb..41c991c 100644 --- a/services/ha-diag-agent/tests/test_ha_client.py +++ b/services/ha-diag-agent/tests/test_ha_client.py @@ -78,3 +78,58 @@ async def test_make_session_sets_auth_header(): await client.get_api_status() # Verify the Authorization header was sent assert 
session.headers.get("Authorization") == "Bearer my-secret-token" + + +# --------------------------------------------------------------------------- +# Entity registry TTL cache (Phase 3 Flag #3) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_entity_registry_cached_on_second_call(): + """Second call within TTL returns cache, making only one HTTP request.""" + payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}] + with aioresponses() as m: + m.get(f"{HA_URL}/api/config/entity_registry", payload=payload) + async with make_session(TOKEN) as session: + client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0) + r1 = await client.get_entity_registry() + r2 = await client.get_entity_registry() # from cache — no second HTTP call + assert r1 == r2 + # aioresponses would raise ConnectionError on the unmocked second request + # if caching weren't working; reaching here means it used the cache. 
+ + +@pytest.mark.asyncio +async def test_entity_registry_cache_bypassed_after_ttl(monkeypatch): + """After TTL expiry, next call fetches fresh data.""" + import time + payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}] + with aioresponses() as m: + m.get(f"{HA_URL}/api/config/entity_registry", payload=payload) + m.get(f"{HA_URL}/api/config/entity_registry", payload=payload) + async with make_session(TOKEN) as session: + client = HAClient(HA_URL, session, entity_registry_cache_ttl=0.0) + await client.get_entity_registry() # fetches + await client.get_entity_registry() # TTL=0 → fetches again + + +@pytest.mark.asyncio +async def test_invalidate_registry_cache_forces_refetch(): + """invalidate_registry_cache() makes the next call hit the network.""" + payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": ""}] + with aioresponses() as m: + m.get(f"{HA_URL}/api/config/entity_registry", payload=payload) + m.get(f"{HA_URL}/api/config/entity_registry", payload=payload) + async with make_session(TOKEN) as session: + client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0) + await client.get_entity_registry() + client.invalidate_registry_cache() + await client.get_entity_registry() # must hit network again + + +@pytest.mark.asyncio +async def test_entity_registry_cache_default_ttl_is_300(): + async with make_session(TOKEN) as session: + client = HAClient(HA_URL, session) + assert client._registry_cache_ttl == 300.0 diff --git a/services/ha-diag-agent/tests/test_system_health.py b/services/ha-diag-agent/tests/test_system_health.py new file mode 100644 index 0000000..43a6cae --- /dev/null +++ b/services/ha-diag-agent/tests/test_system_health.py @@ -0,0 +1,221 @@ +"""Unit tests for SystemHealthCheck.""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ha_diag.checks.system_health import SystemHealthCheck, _extract_component_statuses 
+from ha_diag.config import Settings +from ha_diag.models import HAEventType, Severity +from ha_diag.storage import Storage + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_settings(**overrides) -> Settings: + defaults: dict = { + "ha_url": "http://test.local:8123", + "ha_token": "test", + "node_name": "test-node", + "location_tag": "test-loc", + "alert_cooldown_hours": 0.0, + "check_interval": 60, + "check_interval_unavailable": 3600, + } + defaults.update(overrides) + return Settings(**defaults) + + +def _make_client(health=None, error=None): + client = MagicMock() + if error: + client.get_system_health = AsyncMock(side_effect=error) + else: + client.get_system_health = AsyncMock(return_value=health or {}) + return client + + +def _ok_response(*components: str) -> dict: + return {c: {"type": "result", "data": {"ok": True}} for c in components} + + +def _error_response(*components: str) -> dict: + return {c: {"type": "error", "error": f"{c} failed"} for c in components} + + +# --------------------------------------------------------------------------- +# _extract_component_statuses unit tests +# --------------------------------------------------------------------------- + + +def test_extract_typed_result_format(): + data = {"recorder": {"type": "result", "data": {"backlog": 0}}} + result = _extract_component_statuses(data) + assert result["recorder"]["status"] == "ok" + assert result["recorder"]["details"] == {"backlog": 0} + + +def test_extract_typed_error_format(): + data = {"cloud": {"type": "error", "error": "Connection refused"}} + result = _extract_component_statuses(data) + assert result["cloud"]["status"] == "error" + assert "Connection refused" in result["cloud"]["details"]["error"] + + +def test_extract_legacy_error_field(): + data = {"cloud": {"error": "Timeout"}} + result = _extract_component_statuses(data) + assert 
result["cloud"]["status"] == "error" + + +def test_extract_nested_checks_format(): + data = { + "info": {"version": "2024.12.0"}, + "checks": { + "homeassistant": {"type": "result", "data": {}}, + "recorder": {"type": "error", "error": "DB locked"}, + }, + } + result = _extract_component_statuses(data) + assert "homeassistant" not in result or result.get("homeassistant", {}).get("status") == "ok" + assert result["recorder"]["status"] == "error" + assert "info" not in result + + +def test_extract_plain_dict_treated_as_ok(): + data = {"homeassistant": {"version": "2024.12.0", "docker": True}} + result = _extract_component_statuses(data) + assert result["homeassistant"]["status"] == "ok" + + +def test_extract_non_dict_value_skipped(): + data = {"scalar_component": "just-a-string"} + result = _extract_component_statuses(data) + assert "scalar_component" not in result + + +# --------------------------------------------------------------------------- +# SystemHealthCheck run() tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_first_run_no_snapshot_no_event_for_ok(storage: Storage): + """All components ok on first run — record snapshots, emit nothing.""" + check = SystemHealthCheck(_make_client(_ok_response("homeassistant", "recorder")), + storage, _make_settings()) + results = await check.run() + assert results == [] + snap = await storage.get_system_health_snapshot("homeassistant") + assert snap is not None + assert snap["last_status"] == "ok" + + +@pytest.mark.asyncio +async def test_first_run_error_component_emits_event(storage: Storage): + """Component in error on first run (no prior snapshot) → ha_system_health_degraded.""" + check = SystemHealthCheck(_make_client(_error_response("cloud")), storage, _make_settings()) + results = await check.run() + assert len(results) == 1 + r = results[0] + assert r.event_type == HAEventType.ha_system_health_degraded + assert r.payload["component"] == 
"cloud" + assert r.payload["previous_status"] == "unknown" + assert r.payload["current_status"] == "error" + assert r.severity == Severity.warning + + +@pytest.mark.asyncio +async def test_ok_to_error_transition_emits_event(storage: Storage): + """Component transitions ok → error → event fired.""" + client_ok = _make_client(_ok_response("cloud")) + client_err = _make_client(_error_response("cloud")) + settings = _make_settings() + + await SystemHealthCheck(client_ok, storage, settings).run() + results = await SystemHealthCheck(client_err, storage, settings).run() + + assert len(results) == 1 + assert results[0].payload["previous_status"] == "ok" + assert results[0].payload["current_status"] == "error" + + +@pytest.mark.asyncio +async def test_sustained_error_no_duplicate_event(storage: Storage): + """Component stays in error across multiple runs — only first run emits.""" + client_ok = _make_client(_ok_response("cloud")) + client_err = _make_client(_error_response("cloud")) + settings = _make_settings() + + await SystemHealthCheck(client_ok, storage, settings).run() + results1 = await SystemHealthCheck(client_err, storage, settings).run() + results2 = await SystemHealthCheck(client_err, storage, settings).run() + results3 = await SystemHealthCheck(client_err, storage, settings).run() + + assert len(results1) == 1 # transition fires + assert results2 == [] + assert results3 == [] + + +@pytest.mark.asyncio +async def test_recovery_clears_alert_and_next_degradation_re_fires(storage: Storage): + """error → ok → error: second degradation fires a new event.""" + settings = _make_settings() + + # First degradation + await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run() + r1 = await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run() + assert len(r1) == 1 + + # Recovery + r2 = await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run() + assert r2 == [] + + # Second degradation + r3 = 
await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run() + assert len(r3) == 1 + assert r3[0].payload["previous_status"] == "ok" + + +@pytest.mark.asyncio +async def test_multiple_degraded_components_multiple_events(storage: Storage): + health = {**_error_response("cloud", "recorder"), **_ok_response("homeassistant")} + check = SystemHealthCheck(_make_client(health), storage, _make_settings()) + results = await check.run() + components = {r.payload["component"] for r in results} + assert components == {"cloud", "recorder"} + assert all(r.event_type == HAEventType.ha_system_health_degraded for r in results) + + +@pytest.mark.asyncio +async def test_api_error_returns_empty(storage: Storage): + """If /api/system_health is unreachable, return no results (not an error event).""" + check = SystemHealthCheck( + _make_client(error=Exception("timeout")), storage, _make_settings() + ) + results = await check.run() + assert results == [] + + +@pytest.mark.asyncio +async def test_payload_contains_details(storage: Storage): + health = {"recorder": {"type": "error", "error": "DB write lag 5000ms"}} + check = SystemHealthCheck(_make_client(health), storage, _make_settings()) + results = await check.run() + assert len(results) == 1 + assert "DB write lag" in results[0].payload["details"]["error"] + + +@pytest.mark.asyncio +async def test_snapshot_updated_after_recovery(storage: Storage): + """After a recovery cycle, snapshot shows last_status='ok'.""" + settings = _make_settings() + await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run() + await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run() + snap = await storage.get_system_health_snapshot("cloud") + assert snap["last_status"] == "ok" diff --git a/services/ha-diag-agent/tests/test_unavailable_entities.py b/services/ha-diag-agent/tests/test_unavailable_entities.py index c596582..e179bfb 100644 --- 
a/services/ha-diag-agent/tests/test_unavailable_entities.py +++ b/services/ha-diag-agent/tests/test_unavailable_entities.py @@ -407,3 +407,87 @@ async def test_area_omitted_when_unknown(storage: Storage): ) results = await check.run() assert "area" not in results[0].payload + + +# --------------------------------------------------------------------------- +# Phase 3 Flag #1: since = min(last_changed, first_seen) +# --------------------------------------------------------------------------- + + +def _make_state_with_last_changed( + entity_id: str, state: str, last_changed_iso: str +) -> dict: + return { + "entity_id": entity_id, + "state": state, + "attributes": {}, + "last_changed": last_changed_iso, + } + + +@pytest.mark.asyncio +async def test_since_uses_last_changed_when_earlier_than_baseline(storage: Storage): + """Entity's last_changed predates our baseline → duration computed from last_changed.""" + import datetime as dt + + now = time.time() + # Baseline recorded 1h ago (agent just started) + await storage.set_entity_unavailable_since("light.k", "unavailable", now - 3600) + + # HA says entity changed to unavailable 48h ago + lc_iso = ( + dt.datetime.fromtimestamp(now - 48 * 3600, tz=dt.timezone.utc) + .isoformat() + .replace("+00:00", "Z") + ) + states = [_make_state_with_last_changed("light.k", "unavailable", lc_iso)] + check = UnavailableEntitiesCheck( + _make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0) + ) + results = await check.run() + + assert len(results) == 1 + # Duration should be ~48h, not ~1h + assert results[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1) + + +@pytest.mark.asyncio +async def test_since_ignores_last_changed_when_later_than_baseline(storage: Storage): + """Baseline predates last_changed → use baseline (entity was unavailable before + last_changed, e.g. 
if HA reports last_changed as now for some reason).""" + import datetime as dt + + now = time.time() + # Baseline recorded 48h ago + await storage.set_entity_unavailable_since("light.k", "unavailable", now - 48 * 3600) + + # HA says last_changed is only 2h ago (shouldn't override the older baseline) + lc_iso = ( + dt.datetime.fromtimestamp(now - 2 * 3600, tz=dt.timezone.utc) + .isoformat() + .replace("+00:00", "Z") + ) + states = [_make_state_with_last_changed("light.k", "unavailable", lc_iso)] + check = UnavailableEntitiesCheck( + _make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0) + ) + results = await check.run() + + assert len(results) == 1 + # Duration should be ~48h (from baseline), not ~2h + assert results[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1) + + +@pytest.mark.asyncio +async def test_since_falls_back_gracefully_when_last_changed_missing(storage: Storage): + """No last_changed in state → uses baseline first_seen without error.""" + await storage.set_entity_unavailable_since( + "light.k", "unavailable", time.time() - 25 * 3600 + ) + states = [_make_state("light.k", "unavailable")] # no last_changed key + check = UnavailableEntitiesCheck( + _make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0) + ) + results = await check.run() + assert len(results) == 1 + assert results[0].event_type == HAEventType.ha_entity_unavailable_long diff --git a/services/ha-diag-agent/tests/test_updates_available.py b/services/ha-diag-agent/tests/test_updates_available.py new file mode 100644 index 0000000..6b54670 --- /dev/null +++ b/services/ha-diag-agent/tests/test_updates_available.py @@ -0,0 +1,256 @@ +"""Unit tests for UpdatesAvailableCheck and UpdatesDigestCheck.""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ha_diag.checks.updates_available import ( + UpdatesAvailableCheck, + UpdatesDigestCheck, + 
_build_update_payload, +) +from ha_diag.config import Settings +from ha_diag.models import HAEventType +from ha_diag.storage import Storage + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_settings(**overrides) -> Settings: + defaults: dict = { + "ha_url": "http://test.local:8123", + "ha_token": "test", + "node_name": "test-node", + "location_tag": "test-loc", + "alert_cooldown_hours": 0.0, + "updates_cooldown_days": 0, # no dedup in most tests + "check_interval": 60, + "check_interval_unavailable": 3600, + } + defaults.update(overrides) + return Settings(**defaults) + + +def _make_client(states=None, error=None): + client = MagicMock() + if error: + client.get_states = AsyncMock(side_effect=error) + else: + client.get_states = AsyncMock(return_value=states or []) + return client + + +def _update_state( + entity_id: str = "update.homeassistant_core", + state: str = "on", + title: str = "Home Assistant Core", + installed: str = "2025.5.0", + latest: str = "2025.6.0", + release_summary: str | None = None, + release_url: str | None = None, +) -> dict: + attrs: dict = { + "title": title, + "installed_version": installed, + "latest_version": latest, + "in_progress": False, + "auto_update": False, + } + if release_summary: + attrs["release_summary"] = release_summary + if release_url: + attrs["release_url"] = release_url + return {"entity_id": entity_id, "state": state, "attributes": attrs} + + +# --------------------------------------------------------------------------- +# _build_update_payload helper +# --------------------------------------------------------------------------- + + +def test_build_update_payload_basic(): + attrs = {"title": "HA Core", "installed_version": "1.0", "latest_version": "2.0"} + p = _build_update_payload("update.ha_core", attrs) + assert p["entity_id"] == "update.ha_core" + assert p["title"] == "HA Core" 
+ assert p["installed_version"] == "1.0" + assert p["latest_version"] == "2.0" + + +def test_build_update_payload_release_summary_truncated(): + long_notes = "x" * 3000 + attrs = {"release_summary": long_notes} + p = _build_update_payload("update.ha_core", attrs) + assert len(p["release_summary"]) == 2000 + + +def test_build_update_payload_release_url_omitted_when_absent(): + p = _build_update_payload("update.ha_core", {}) + assert "release_url" not in p + + +def test_build_update_payload_release_url_included_when_present(): + attrs = {"release_url": "https://github.com/..."} + p = _build_update_payload("update.x", attrs) + assert p["release_url"] == "https://github.com/..." + + +# --------------------------------------------------------------------------- +# UpdatesAvailableCheck (daily individual events) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_no_updates_returns_empty(storage: Storage): + states = [{"entity_id": "light.living_room", "state": "on", "attributes": {}}] + check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings()) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def test_update_off_state_not_emitted(storage: Storage): + states = [_update_state(state="off")] + check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings()) + assert await check.run() == [] + + +@pytest.mark.asyncio +async def test_single_update_emits_event(storage: Storage): + states = [_update_state()] + check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings()) + results = await check.run() + assert len(results) == 1 + assert results[0].event_type == HAEventType.ha_update_available + assert "2025.5.0" in results[0].message + assert "2025.6.0" in results[0].message + + +@pytest.mark.asyncio +async def test_multiple_updates_emit_multiple_events(storage: Storage): + states = [ + _update_state("update.ha_core"), + 
_update_state("update.mosquitto", title="Mosquitto"), + ] + check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings()) + results = await check.run() + assert len(results) == 2 + assert all(r.event_type == HAEventType.ha_update_available for r in results) + + +@pytest.mark.asyncio +async def test_cooldown_prevents_same_update_next_day(storage: Storage): + states = [_update_state()] + settings = _make_settings(updates_cooldown_days=7) + check = UpdatesAvailableCheck(_make_client(states), storage, settings) + r1 = await check.run() + r2 = await check.run() + assert len(r1) == 1 + assert r2 == [] + + +@pytest.mark.asyncio +async def test_no_cooldown_allows_repeat(storage: Storage): + states = [_update_state()] + check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings(updates_cooldown_days=0)) + r1 = await check.run() + r2 = await check.run() + assert len(r1) == 1 + assert len(r2) == 1 + + +@pytest.mark.asyncio +async def test_payload_contains_version_fields(storage: Storage): + states = [_update_state(installed="2025.5.0", latest="2025.6.0")] + check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings()) + results = await check.run() + p = results[0].payload + assert p["installed_version"] == "2025.5.0" + assert p["latest_version"] == "2025.6.0" + assert p["in_progress"] is False + + +@pytest.mark.asyncio +async def test_ha_error_returns_empty(storage: Storage): + check = UpdatesAvailableCheck( + _make_client(error=ConnectionError("HA down")), storage, _make_settings() + ) + assert await check.run() == [] + + +# --------------------------------------------------------------------------- +# UpdatesDigestCheck (Sunday digest) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_digest_no_updates_returns_empty(storage: Storage): + check = UpdatesDigestCheck(_make_client([]), storage, _make_settings()) + assert await check.run() == [] + + 
@pytest.mark.asyncio
async def test_digest_emits_single_event_for_all_updates(storage: Storage):
    """Three pending updates are reported in a single digest event."""
    ha_states = [
        _update_state("update.ha_core"),
        _update_state("update.mosquitto", title="Mosquitto"),
        _update_state("update.esphome", title="ESPHome"),
    ]
    digest_check = UpdatesDigestCheck(_make_client(ha_states), storage, _make_settings())
    emitted = await digest_check.run()
    assert len(emitted) == 1
    payload = emitted[0].payload
    assert payload["digest"] is True
    assert payload["count"] == 3
    assert len(payload["updates"]) == 3


@pytest.mark.asyncio
async def test_digest_payload_has_digest_true(storage: Storage):
    """The digest payload sets ``digest`` to ``True``."""
    digest_check = UpdatesDigestCheck(
        _make_client([_update_state()]), storage, _make_settings()
    )
    emitted = await digest_check.run()
    assert emitted[0].payload["digest"] is True


@pytest.mark.asyncio
async def test_digest_weekly_dedup_prevents_same_week_refiring(storage: Storage):
    """An immediate second digest run returns nothing."""
    digest_check = UpdatesDigestCheck(
        _make_client([_update_state()]), storage, _make_settings()
    )
    first_run = await digest_check.run()
    second_run = await digest_check.run()
    assert len(first_run) == 1
    assert second_run == []


@pytest.mark.asyncio
async def test_digest_fires_independently_of_daily_dedup(storage: Storage):
    """Daily cooldown on entity X doesn't suppress Sunday digest."""
    ha_states = [_update_state()]
    cooldown_settings = _make_settings(updates_cooldown_days=7)

    # Run the daily check first against the same storage. It is expected to
    # record alert_key="update_available:update.homeassistant_core".
    daily_check = UpdatesAvailableCheck(
        _make_client(ha_states), storage, cooldown_settings
    )
    await daily_check.run()

    # The digest is expected to use a separate "update_digest:{week}" key, so
    # the daily run above must not suppress it.
    digest_check = UpdatesDigestCheck(
        _make_client(ha_states), storage, cooldown_settings
    )
    emitted = await digest_check.run()
    assert len(emitted) == 1
    assert emitted[0].payload["digest"] is True


@pytest.mark.asyncio
async def test_digest_name_is_updates_digest(storage: Storage):
    """``UpdatesDigestCheck.name`` is ``"updates_digest"``."""
    digest_check = UpdatesDigestCheck(_make_client([]), storage, _make_settings())
    assert digest_check.name == "updates_digest"


@pytest.mark.asyncio
async def test_daily_check_name_is_updates_available(storage: Storage):
    """``UpdatesAvailableCheck.name`` is ``"updates_available"``."""
    daily_check = UpdatesAvailableCheck(_make_client([]), storage, _make_settings())
    assert daily_check.name == "updates_available"