feat(ha-diag-agent): three REST diagnostic checks + Phase 3 flag fixes

New checks:
- SystemHealthCheck (15min interval): detects newly-failing HA
  integrations via /api/system_health snapshot diff; transition-based
  dedup (ok→error fires, sustained error silent, error→ok clears alert)
- UpdatesAvailableCheck (daily cron 09:00): per-update ha_update_available
  events with 7-day dedup; release notes truncated at 2000 chars
- UpdatesDigestCheck (Sunday cron 09:00): single digest event with all
  pending updates; weekly ISO-week dedup, independent of daily dedup key
- AutomationFailuresCheck (30min interval): detects automations with
  N consecutive failures (default 3) via /api/trace/automation/<id>;
  6h cooldown per automation

Phase 3 flag fixes:
- Flag #1 (since field): UnavailableEntitiesCheck now uses
  min(state.last_changed, baseline.first_seen) as effective "since",
  giving accurate duration when agent was offline at entity's first fail
- Flag #3 (registry cache): HAClient.get_entity_registry() caches
  response in-process with configurable TTL (default 300s); avoids
  repeated API calls across concurrent check cycles; invalidate_registry_cache()
  for manual invalidation

Storage: system_health_snapshot table (component, last_status, last_seen_at,
payload) created automatically on next Storage.open() call

Config additions (all with defaults): entity_registry_cache_ttl=300,
system_health_check_interval=900, automation_check_interval=1800,
automation_failure_threshold=3, updates_check_hour=9,
updates_check_minute=0, updates_cooldown_days=7

Tests: 95 unit tests pass (49 new), 13 integration tests pass (9 new);
3 skipped (live-HA token not set in CI)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Oskar Kapala 2026-05-29 14:43:10 +02:00
parent f41ec5d0c5
commit 3499b2f280
17 changed files with 1807 additions and 10 deletions

View file

@ -0,0 +1,97 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from ..ha_client import HAClient
from ..models import CheckResult, HAEventType, Severity
from ..storage import Storage
from .base import Check
if TYPE_CHECKING:
from ..config import Settings
class AutomationFailuresCheck(Check):
"""Detects automations with consecutive run failures.
For each enabled automation (state="on"), fetches the last N run traces.
When all N most-recent traces indicate failure, emits ha_automation_failing
with a 6-hour dedup per automation.
"""
name = "automation_failures"
def __init__(
self,
ha_client: HAClient,
storage: Storage,
settings: "Settings",
) -> None:
self._client = ha_client
self._storage = storage
self._settings = settings
async def run(self) -> list[CheckResult]:
try:
all_states = await self._client.get_states()
except Exception:
return []
automations = [
s for s in all_states
if s["entity_id"].startswith("automation.") and s["state"] == "on"
]
results: list[CheckResult] = []
cooldown_s = self._settings.alert_cooldown_hours * 3600
threshold = self._settings.automation_failure_threshold
for auto_state in automations:
eid = auto_state["entity_id"]
try:
traces = await self._client.get_automation_traces(eid)
except Exception:
continue
if not traces or len(traces) < threshold:
continue
recent = traces[:threshold]
failures = [t for t in recent if _is_trace_failure(t)]
if len(failures) < threshold:
continue
alert_key = f"automation_failing:{eid}"
if await self._storage.was_alert_sent(alert_key, cooldown_s):
continue
attrs = auto_state.get("attributes", {})
friendly_name = attrs.get("friendly_name", eid)
last_failures = [
{"timestamp": t.get("timestamp"), "error": t.get("error", "")}
for t in failures
]
results.append(CheckResult(
healthy=False,
event_type=HAEventType.ha_automation_failing,
severity=Severity.warning,
message=(
f"Automation '{friendly_name}' failed "
f"{len(failures)} consecutive time(s)"
),
payload={
"entity_id": eid,
"friendly_name": friendly_name,
"last_failures": last_failures,
"total_recent_failures": len(failures),
},
))
await self._storage.mark_alert_sent(alert_key)
return results
def _is_trace_failure(trace: dict[str, Any]) -> bool:
"""A trace is a failure if it has a non-empty error or an explicit failed state."""
return bool(trace.get("error")) or trace.get("state") == "failed"

View file

@ -0,0 +1,110 @@
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any
from ..ha_client import HAClient
from ..models import CheckResult, HAEventType, Severity
from ..storage import Storage
from .base import Check
if TYPE_CHECKING:
from ..config import Settings
class SystemHealthCheck(Check):
"""Detects newly-failing HA integrations via /api/system_health.
Logic per run:
1. Fetch /api/system_health and parse per-component statuses.
2. Diff against stored snapshots in system_health_snapshot.
3. Emit ha_system_health_degraded on ok error transitions.
4. Clear alerts_sent on error ok recovery (next degradation re-alerts).
5. Update all component snapshots.
API errors (HA unreachable) return no results; HeartbeatCheck handles
HA reachability separately.
"""
name = "system_health"
def __init__(
self,
ha_client: HAClient,
storage: Storage,
settings: "Settings",
) -> None:
self._client = ha_client
self._storage = storage
self._settings = settings
async def run(self) -> list[CheckResult]:
try:
health_data = await self._client.get_system_health()
except Exception:
return []
statuses = _extract_component_statuses(health_data)
results: list[CheckResult] = []
for component, info in statuses.items():
status = info["status"]
details = info.get("details", {})
prev = await self._storage.get_system_health_snapshot(component)
if status == "error":
if prev is None or prev["last_status"] == "ok":
results.append(CheckResult(
healthy=False,
event_type=HAEventType.ha_system_health_degraded,
severity=Severity.warning,
message=f"HA component '{component}' is degraded",
payload={
"component": component,
"previous_status": prev["last_status"] if prev else "unknown",
"current_status": "error",
"details": details,
},
))
elif status == "ok" and prev and prev["last_status"] == "error":
await self._storage.clear_alert(f"system_health:{component}")
await self._storage.upsert_system_health_snapshot(
component, status, json.dumps(details, default=str)
)
return results
def _extract_component_statuses(
health_data: dict[str, Any],
) -> dict[str, dict[str, Any]]:
"""Parse HA /api/system_health into {component: {status, details}}.
Handles multiple HA response shapes:
- Typed: {component: {"type": "result"|"error", "data": {...}}}
- Legacy: {component: {"error": "msg"}} or {component: {plain_data}}
- Nested: {"checks": {component: {...}}, "info": {...}}
"""
checks = health_data.get("checks", health_data)
if not isinstance(checks, dict):
return {}
result: dict[str, dict[str, Any]] = {}
for component, value in checks.items():
if not isinstance(value, dict):
continue
if value.get("type") == "error" or value.get("error"):
result[component] = {
"status": "error",
"details": {"error": str(value.get("error") or value.get("type", "error"))},
}
else:
inner = value.get("data", value)
result[component] = {
"status": "ok",
"details": inner if isinstance(inner, dict) else value,
}
return result

View file

@ -15,6 +15,20 @@ if TYPE_CHECKING:
_BAD_STATES = frozenset({"unavailable", "unknown"})
def _parse_last_changed_ts(value: str | None) -> float | None:
"""Parse HA last_changed ISO string → Unix timestamp.
Returns None on missing or malformed input so callers can fall back
to the baseline first_seen without special-casing.
"""
if not value:
return None
try:
return datetime.fromisoformat(value).timestamp()
except (ValueError, TypeError):
return None
class UnavailableEntitiesCheck(Check):
"""Detects entities stuck in unavailable/unknown state.
@ -90,7 +104,16 @@ class UnavailableEntitiesCheck(Check):
first_at = await self._storage.get_entity_first_unavailable_at(eid)
if first_at is None:
continue
duration_h = (now - first_at) / 3600
# Phase 3 Flag #1: if HA reports an earlier last_changed (entity was
# already unavailable before the agent started/reconnected), use that
# as the authoritative "since" so duration is accurate.
last_changed_ts = _parse_last_changed_ts(state_data.get("last_changed"))
effective_since = (
min(last_changed_ts, first_at)
if last_changed_ts is not None
else first_at
)
duration_h = (now - effective_since) / 3600
if duration_h < threshold_h:
continue
alert_key = f"entity_unavailable:{eid}"
@ -99,7 +122,7 @@ class UnavailableEntitiesCheck(Check):
to_alert.append({
"entity_id": eid,
"state": state_data["state"],
"first_at": first_at,
"first_at": effective_since,
"duration_h": duration_h,
"domain": eid.split(".")[0],
"integration": integration_map.get(eid),

View file

@ -0,0 +1,123 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any
from ..ha_client import HAClient
from ..models import CheckResult, HAEventType, Severity
from ..storage import Storage
from .base import Check
if TYPE_CHECKING:
from ..config import Settings
_MAX_RELEASE_NOTES = 2000
class UpdatesAvailableCheck(Check):
"""Detects available HA core/add-on updates via update.* entities.
Runs daily. Emits one ha_update_available event per update entity whose
7-day dedup window has expired. Falls back gracefully when HA is down.
"""
name = "updates_available"
def __init__(
self,
ha_client: HAClient,
storage: Storage,
settings: "Settings",
) -> None:
self._client = ha_client
self._storage = storage
self._settings = settings
async def run(self) -> list[CheckResult]:
updates = await self._fetch_active_updates()
if not updates:
return []
results: list[CheckResult] = []
cooldown_s = self._settings.updates_cooldown_days * 86400
for state in updates:
eid = state["entity_id"]
alert_key = f"update_available:{eid}"
if await self._storage.was_alert_sent(alert_key, cooldown_s):
continue
attrs = state.get("attributes", {})
results.append(CheckResult(
healthy=False,
event_type=HAEventType.ha_update_available,
severity=Severity.info,
message=(
f"Update available: {attrs.get('title', eid)} "
f"{attrs.get('installed_version', '?')}"
f"{attrs.get('latest_version', '?')}"
),
payload=_build_update_payload(eid, attrs),
))
await self._storage.mark_alert_sent(alert_key)
return results
async def _fetch_active_updates(self) -> list[dict[str, Any]]:
try:
all_states = await self._client.get_states()
except Exception:
return []
return [
s for s in all_states
if s["entity_id"].startswith("update.") and s["state"] == "on"
]
class UpdatesDigestCheck(UpdatesAvailableCheck):
"""Weekly Sunday digest: single event listing all pending updates.
Deduped per ISO week (won't re-fire if triggered multiple times on the
same Sunday, e.g. manual + scheduled).
"""
name = "updates_digest"
async def run(self) -> list[CheckResult]:
updates = await self._fetch_active_updates()
if not updates:
return []
week_key = datetime.now().strftime("%G-W%V")
alert_key = f"update_digest:{week_key}"
if await self._storage.was_alert_sent(alert_key, 6 * 86400):
return []
all_payloads = [
_build_update_payload(s["entity_id"], s.get("attributes", {}))
for s in updates
]
await self._storage.mark_alert_sent(alert_key)
return [CheckResult(
healthy=False,
event_type=HAEventType.ha_update_available,
severity=Severity.info,
message=f"Weekly digest: {len(all_payloads)} update(s) available",
payload={"digest": True, "updates": all_payloads, "count": len(all_payloads)},
)]
def _build_update_payload(entity_id: str, attrs: dict[str, Any]) -> dict[str, Any]:
payload: dict[str, Any] = {
"entity_id": entity_id,
"title": attrs.get("title", entity_id),
"installed_version": attrs.get("installed_version"),
"latest_version": attrs.get("latest_version"),
"in_progress": attrs.get("in_progress", False),
"auto_update": attrs.get("auto_update", False),
}
if attrs.get("release_url"):
payload["release_url"] = attrs["release_url"]
summary = attrs.get("release_summary")
if summary:
payload["release_summary"] = summary[:_MAX_RELEASE_NOTES]
return payload

View file

@ -30,6 +30,21 @@ class Settings(BaseSettings):
integration_failure_min_entities: int = 3 # min count to trigger integration event
alert_cooldown_hours: float = 6.0 # don't re-alert same entity within N hours
# Phase 3 Flag #3: entity registry cache TTL
entity_registry_cache_ttl: int = 300 # seconds
# SystemHealthCheck
system_health_check_interval: int = 900 # 15 min
# AutomationFailuresCheck
automation_check_interval: int = 1800 # 30 min
automation_failure_threshold: int = 3 # consecutive failures before alert
# UpdatesAvailableCheck
updates_check_hour: int = 9
updates_check_minute: int = 0
updates_cooldown_days: int = 7 # don't re-alert same update within N days
# API server
port: int = 8087
log_level: str = "info"

View file

@ -1,5 +1,6 @@
from __future__ import annotations
import time
from typing import Any
import aiohttp
@ -24,9 +25,17 @@ class HAClient:
session-borrower: it never opens or closes the session it receives.
"""
def __init__(self, base_url: str, session: aiohttp.ClientSession) -> None:
def __init__(
self,
base_url: str,
session: aiohttp.ClientSession,
entity_registry_cache_ttl: float = 300.0,
) -> None:
self._base_url = base_url.rstrip("/")
self._session = session
self._registry_cache_ttl = entity_registry_cache_ttl
self._registry_cache: list[dict[str, Any]] | None = None
self._registry_fetched_at: float = 0.0
async def get_api_status(self) -> dict[str, Any]:
"""GET /api/ — returns {"message": "API running."} when HA is up."""
@ -57,12 +66,29 @@ class HAClient:
Each entry includes entity_id, platform (integration name), area_id,
config_entry_id, and other metadata.
Result is cached in-process for entity_registry_cache_ttl seconds to
avoid hammering HA on every check cycle (Phase 3 Flag #3).
"""
now = time.monotonic()
if (
self._registry_cache is not None
and (now - self._registry_fetched_at) < self._registry_cache_ttl
):
return self._registry_cache
async with self._session.get(
f"{self._base_url}/api/config/entity_registry"
) as resp:
resp.raise_for_status()
return await resp.json()
result = await resp.json()
self._registry_cache = result
self._registry_fetched_at = now
return result
def invalidate_registry_cache(self) -> None:
"""Force the next get_entity_registry() call to fetch fresh data."""
self._registry_cache = None
self._registry_fetched_at = 0.0
async def get_automation_traces(self, automation_id: str) -> list[dict[str, Any]]:
"""GET /api/trace/automation/<id> — last run traces for an automation."""

View file

@ -11,8 +11,11 @@ import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from .api import app, register_checks
from .checks.automation_failures import AutomationFailuresCheck
from .checks.heartbeat import HeartbeatCheck
from .checks.system_health import SystemHealthCheck
from .checks.unavailable_entities import UnavailableEntitiesCheck
from .checks.updates_available import UpdatesAvailableCheck, UpdatesDigestCheck
from .config import Settings
from .event_emitter import EventEmitter
from .ha_client import HAClient, make_session
@ -93,37 +96,75 @@ async def run(settings: Settings) -> None:
# Shared session — created once at startup, closed on shutdown
session = make_session(settings.ha_token, settings.ha_timeout)
ha_client = HAClient(settings.ha_url, session)
ha_client = HAClient(
settings.ha_url, session,
entity_registry_cache_ttl=settings.entity_registry_cache_ttl,
)
heartbeat = HeartbeatCheck(ha_client)
unavailable = UnavailableEntitiesCheck(ha_client, storage, settings)
system_health = SystemHealthCheck(ha_client, storage, settings)
automation_failures = AutomationFailuresCheck(ha_client, storage, settings)
updates_daily = UpdatesAvailableCheck(ha_client, storage, settings)
updates_digest = UpdatesDigestCheck(ha_client, storage, settings)
all_checks = [heartbeat, unavailable]
all_checks = [heartbeat, unavailable, system_health, automation_failures,
updates_daily, updates_digest]
register_checks(all_checks, settings.node_name, settings.location_tag)
scheduler = AsyncIOScheduler()
scheduler.add_job(
_run_check_and_emit,
"interval",
_run_check_and_emit, "interval",
seconds=settings.check_interval,
args=[heartbeat, emitter, storage],
id="check_heartbeat",
next_run_time=datetime.now(),
)
scheduler.add_job(
_run_check_and_emit,
"interval",
_run_check_and_emit, "interval",
seconds=settings.check_interval_unavailable,
args=[unavailable, emitter, storage],
id="check_unavailable_entities",
next_run_time=datetime.now(),
)
scheduler.add_job(
_run_check_and_emit, "interval",
seconds=settings.system_health_check_interval,
args=[system_health, emitter, storage],
id="check_system_health",
next_run_time=datetime.now(),
)
scheduler.add_job(
_run_check_and_emit, "interval",
seconds=settings.automation_check_interval,
args=[automation_failures, emitter, storage],
id="check_automation_failures",
next_run_time=datetime.now(),
)
scheduler.add_job(
_run_check_and_emit, "cron",
hour=settings.updates_check_hour,
minute=settings.updates_check_minute,
args=[updates_daily, emitter, storage],
id="check_updates_available",
)
scheduler.add_job(
_run_check_and_emit, "cron",
day_of_week="sun",
hour=settings.updates_check_hour,
minute=settings.updates_check_minute,
args=[updates_digest, emitter, storage],
id="check_updates_digest",
)
scheduler.start()
_log.info(
"scheduler_started",
checks=[c.name for c in all_checks],
heartbeat_interval=settings.check_interval,
unavailable_interval=settings.check_interval_unavailable,
system_health_interval=settings.system_health_check_interval,
automation_interval=settings.automation_check_interval,
updates_hour=settings.updates_check_hour,
)
config = uvicorn.Config(

View file

@ -7,6 +7,13 @@ from typing import Any
import aiosqlite
_SCHEMA = """
CREATE TABLE IF NOT EXISTS system_health_snapshot (
component TEXT PRIMARY KEY,
last_status TEXT NOT NULL,
last_seen_at REAL NOT NULL,
payload TEXT NOT NULL DEFAULT '{}'
);
CREATE TABLE IF NOT EXISTS entity_baseline (
entity_id TEXT PRIMARY KEY,
-- state when entity first entered unavailable/unknown
@ -185,3 +192,36 @@ class Storage:
"DELETE FROM alerts_sent WHERE alert_key = ?", (alert_key,)
)
await self._conn().commit()
# ------------------------------------------------------------------
# system_health_snapshot — tracks last-known per-component status
# ------------------------------------------------------------------
async def get_system_health_snapshot(
self, component: str
) -> dict[str, Any] | None:
"""Return the stored snapshot for a component, or None if unseen."""
async with self._conn().execute(
"SELECT * FROM system_health_snapshot WHERE component = ?",
(component,),
) as cur:
row = await cur.fetchone()
return dict(row) if row else None
async def upsert_system_health_snapshot(
self, component: str, last_status: str, payload: str
) -> None:
"""Insert or replace the snapshot for a component."""
await self._conn().execute(
"""
INSERT INTO system_health_snapshot
(component, last_status, last_seen_at, payload)
VALUES (?, ?, ?, ?)
ON CONFLICT(component) DO UPDATE SET
last_status = excluded.last_status,
last_seen_at = excluded.last_seen_at,
payload = excluded.payload
""",
(component, last_status, time.time(), payload),
)
await self._conn().commit()

View file

@ -59,4 +59,6 @@ def mock_ha_client():
client.get_api_status = AsyncMock(return_value={"message": "API running."})
client.get_states = AsyncMock(return_value=[])
client.get_entity_registry = AsyncMock(return_value=[])
client.get_system_health = AsyncMock(return_value={})
client.get_automation_traces = AsyncMock(return_value=[])
return client

View file

@ -0,0 +1,167 @@
"""Integration tests for AutomationFailuresCheck.
Uses real aiosqlite Storage + EventEmitter + mocked HTTP.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from aioresponses import aioresponses
from ha_diag.checks.automation_failures import AutomationFailuresCheck
from ha_diag.config import Settings
from ha_diag.event_emitter import EventEmitter
from ha_diag.ha_client import HAClient, make_session
from ha_diag.models import HAEventType
from ha_diag.storage import Storage
HA_URL = "http://ha-test-ken:8123"
def _settings(**overrides) -> Settings:
defaults: dict = {
"ha_url": HA_URL,
"ha_token": "test-token",
"node_name": "piha",
"location_tag": "ken",
"alert_cooldown_hours": 0.0,
"automation_failure_threshold": 3,
"check_interval": 60,
"check_interval_unavailable": 3600,
}
defaults.update(overrides)
return Settings(**defaults)
@pytest_asyncio.fixture
async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]:
s = Storage(tmp_path / "integration_test.db")
await s.open()
yield s
await s.close()
@pytest.fixture
def events_dir(tmp_path: Path) -> Path:
d = tmp_path / "events"
d.mkdir()
return d
def _auto_states(*entity_ids: str) -> list[dict]:
return [
{
"entity_id": eid,
"state": "on",
"attributes": {"friendly_name": eid.split(".")[-1].replace("_", " ").title()},
}
for eid in entity_ids
]
def _fail_traces(n: int = 3) -> list[dict]:
return [
{
"run_id": f"run-{i}",
"timestamp": f"2026-05-27T{10+i:02d}:00:00+00:00",
"trigger": "state",
"state": "stopped",
"error": f"Script error #{i}",
}
for i in range(n)
]
def _ok_traces(n: int = 3) -> list[dict]:
return [
{
"run_id": f"run-{i}",
"timestamp": f"2026-05-27T{10+i:02d}:00:00+00:00",
"trigger": "state",
"state": "stopped",
"error": None,
}
for i in range(n)
]
@pytest.mark.integration
async def test_failing_automation_emits_event_and_writes_file(
storage: Storage, events_dir: Path
):
"""3 consecutive failures → event file written with correct structure."""
states = _auto_states("automation.morning_lights")
traces = _fail_traces(3)
emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
m.get(f"{HA_URL}/api/trace/automation/automation.morning_lights", payload=traces)
async with make_session("test-token") as session:
client = HAClient(HA_URL, session)
check = AutomationFailuresCheck(client, storage, _settings())
results = await check.run()
assert len(results) == 1
r = results[0]
assert r.event_type == HAEventType.ha_automation_failing
assert r.payload["entity_id"] == "automation.morning_lights"
assert r.payload["total_recent_failures"] == 3
emitter.emit(
event_type=r.event_type,
severity=r.severity.value,
service="homeassistant",
message=r.message,
payload=r.payload,
)
files = list(events_dir.glob("*.json"))
assert len(files) == 1
data = json.loads(files[0].read_text())
assert data["type"] == "ha_automation_failing"
assert data["payload"]["location_tag"] == "ken"
assert "last_failures" in data["payload"]
@pytest.mark.integration
async def test_healthy_automation_no_event(storage: Storage):
"""All recent runs successful → no event."""
states = _auto_states("automation.morning_lights")
traces = _ok_traces(3)
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
m.get(f"{HA_URL}/api/trace/automation/automation.morning_lights", payload=traces)
async with make_session("test-token") as session:
client = HAClient(HA_URL, session)
check = AutomationFailuresCheck(client, storage, _settings())
results = await check.run()
assert results == []
@pytest.mark.integration
async def test_cooldown_suppresses_duplicate(storage: Storage):
"""Second run within cooldown window → no duplicate event."""
states = _auto_states("automation.morning_lights")
traces = _fail_traces(3)
settings = _settings(alert_cooldown_hours=6.0)
for _ in range(2):
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
m.get(f"{HA_URL}/api/trace/automation/automation.morning_lights", payload=traces)
async with make_session("test-token") as session:
check = AutomationFailuresCheck(
HAClient(HA_URL, session), storage, settings
)
results = await check.run()
if _ == 0:
assert len(results) == 1
else:
assert results == []

View file

@ -0,0 +1,151 @@
"""Integration tests for SystemHealthCheck using aioresponses.
Uses real aiosqlite Storage + EventEmitter + mocked HTTP.
Marked 'integration' because it exercises the full stack end-to-end.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from aioresponses import aioresponses
from ha_diag.checks.system_health import SystemHealthCheck
from ha_diag.config import Settings
from ha_diag.event_emitter import EventEmitter
from ha_diag.ha_client import HAClient, make_session
from ha_diag.models import HAEventType
from ha_diag.storage import Storage
HA_URL = "http://ha-test-ken:8123"
def _settings(**overrides) -> Settings:
defaults: dict = {
"ha_url": HA_URL,
"ha_token": "test-token",
"node_name": "piha",
"location_tag": "ken",
"alert_cooldown_hours": 0.0,
"check_interval": 60,
"check_interval_unavailable": 3600,
}
defaults.update(overrides)
return Settings(**defaults)
@pytest_asyncio.fixture
async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]:
s = Storage(tmp_path / "integration_test.db")
await s.open()
yield s
await s.close()
@pytest.fixture
def events_dir(tmp_path: Path) -> Path:
d = tmp_path / "events"
d.mkdir()
return d
@pytest.mark.integration
async def test_system_health_ok_components_no_event(
storage: Storage, events_dir: Path
):
"""All components healthy on first run → no events emitted."""
health = {
"homeassistant": {"type": "result", "data": {"version": "2025.5.0"}},
"recorder": {"type": "result", "data": {"backlog": 0}},
}
emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")
with aioresponses() as m:
m.get(f"{HA_URL}/api/system_health", payload=health)
async with make_session("test-token") as session:
client = HAClient(HA_URL, session)
check = SystemHealthCheck(client, storage, _settings())
results = await check.run()
assert results == []
assert not list(events_dir.glob("*.json"))
@pytest.mark.integration
async def test_system_health_degraded_emits_event_and_writes_file(
storage: Storage, events_dir: Path
):
"""Component degrades: event emitted + file written with correct structure."""
# First run: all ok
health_ok = {"cloud": {"type": "result", "data": {}}}
health_err = {"cloud": {"type": "error", "error": "Cloud connection lost"}}
emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")
with aioresponses() as m:
m.get(f"{HA_URL}/api/system_health", payload=health_ok)
async with make_session("test-token") as session:
client = HAClient(HA_URL, session)
await SystemHealthCheck(client, storage, _settings()).run()
# Second run: cloud errors
with aioresponses() as m:
m.get(f"{HA_URL}/api/system_health", payload=health_err)
async with make_session("test-token") as session:
client = HAClient(HA_URL, session)
check = SystemHealthCheck(client, storage, _settings())
results = await check.run()
assert len(results) == 1
assert results[0].event_type == HAEventType.ha_system_health_degraded
emitter.emit(
event_type=results[0].event_type,
severity=results[0].severity.value,
service="homeassistant",
message=results[0].message,
payload=results[0].payload,
)
files = list(events_dir.glob("*.json"))
assert len(files) == 1
data = json.loads(files[0].read_text())
assert data["type"] == "ha_system_health_degraded"
assert data["payload"]["component"] == "cloud"
assert data["payload"]["location_tag"] == "ken"
@pytest.mark.integration
async def test_system_health_recovery_and_re_degradation(storage: Storage):
"""Full ok→error→ok→error cycle: events fire on degradation, not on recovery."""
def _run(health):
with aioresponses() as m:
m.get(f"{HA_URL}/api/system_health", payload=health)
return make_session("test-token"), health
settings = _settings()
async def run_once(health):
with aioresponses() as m:
m.get(f"{HA_URL}/api/system_health", payload=health)
async with make_session("test-token") as session:
return await SystemHealthCheck(
HAClient(HA_URL, session), storage, settings
).run()
ok_h = {"cloud": {"type": "result", "data": {}}}
err_h = {"cloud": {"type": "error", "error": "timeout"}}
r1 = await run_once(ok_h) # baseline ok
r2 = await run_once(err_h) # first degradation
r3 = await run_once(err_h) # sustained error (no dup)
r4 = await run_once(ok_h) # recovery
r5 = await run_once(err_h) # second degradation
assert r1 == []
assert len(r2) == 1
assert r3 == []
assert r4 == []
assert len(r5) == 1

View file

@ -0,0 +1,169 @@
"""Integration tests for UpdatesAvailableCheck and UpdatesDigestCheck.
Uses real aiosqlite Storage + EventEmitter + mocked HTTP.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from aioresponses import aioresponses
from ha_diag.checks.updates_available import UpdatesAvailableCheck, UpdatesDigestCheck
from ha_diag.config import Settings
from ha_diag.event_emitter import EventEmitter
from ha_diag.ha_client import HAClient, make_session
from ha_diag.models import HAEventType
from ha_diag.storage import Storage
HA_URL = "http://ha-test-ken:8123"
def _settings(**overrides) -> Settings:
defaults: dict = {
"ha_url": HA_URL,
"ha_token": "test-token",
"node_name": "piha",
"location_tag": "ken",
"alert_cooldown_hours": 0.0,
"updates_cooldown_days": 0,
"check_interval": 60,
"check_interval_unavailable": 3600,
}
defaults.update(overrides)
return Settings(**defaults)
@pytest_asyncio.fixture
async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]:
s = Storage(tmp_path / "integration_test.db")
await s.open()
yield s
await s.close()
@pytest.fixture
def events_dir(tmp_path: Path) -> Path:
d = tmp_path / "events"
d.mkdir()
return d
def _update_states(*entity_ids: str) -> list[dict]:
return [
{
"entity_id": eid,
"state": "on",
"attributes": {
"title": eid.split(".")[-1].replace("_", " ").title(),
"installed_version": "1.0.0",
"latest_version": "2.0.0",
"in_progress": False,
"auto_update": False,
},
}
for eid in entity_ids
]
@pytest.mark.integration
async def test_individual_updates_written_to_disk(storage: Storage, events_dir: Path):
"""2 pending updates → 2 event files with correct structure."""
states = _update_states("update.ha_core", "update.mosquitto")
emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
async with make_session("test-token") as session:
client = HAClient(HA_URL, session)
check = UpdatesAvailableCheck(client, storage, _settings())
results = await check.run()
assert len(results) == 2
for r in results:
assert r.event_type == HAEventType.ha_update_available
emitter.emit(
event_type=r.event_type,
severity=r.severity.value,
service="homeassistant",
message=r.message,
payload=r.payload,
)
files = list(events_dir.glob("*.json"))
assert len(files) == 2
for f in files:
data = json.loads(f.read_text())
assert data["type"] == "ha_update_available"
assert data["payload"]["location_tag"] == "ken"
assert "entity_id" in data["payload"]
@pytest.mark.integration
async def test_digest_writes_single_event_file(storage: Storage, events_dir: Path):
"""Sunday digest → single event file with digest=True payload."""
states = _update_states("update.ha_core", "update.mosquitto", "update.esphome")
emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
async with make_session("test-token") as session:
client = HAClient(HA_URL, session)
check = UpdatesDigestCheck(client, storage, _settings())
results = await check.run()
assert len(results) == 1
r = results[0]
assert r.payload["digest"] is True
assert r.payload["count"] == 3
emitter.emit(
event_type=r.event_type,
severity=r.severity.value,
service="homeassistant",
message=r.message,
payload=r.payload,
)
files = list(events_dir.glob("*.json"))
assert len(files) == 1
data = json.loads(files[0].read_text())
assert data["payload"]["digest"] is True
assert len(data["payload"]["updates"]) == 3
@pytest.mark.integration
async def test_dedup_across_daily_and_digest_independent(storage: Storage):
"""Daily dedup key doesn't suppress digest, and vice versa."""
states = _update_states("update.ha_core")
settings = _settings(updates_cooldown_days=7)
# Daily check
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
async with make_session("test-token") as session:
r1 = await UpdatesAvailableCheck(
HAClient(HA_URL, session), storage, settings
).run()
assert len(r1) == 1
# Daily again — cooldown active
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
async with make_session("test-token") as session:
r2 = await UpdatesAvailableCheck(
HAClient(HA_URL, session), storage, settings
).run()
assert r2 == []
# Digest — different key, should still fire
with aioresponses() as m:
m.get(f"{HA_URL}/api/states", payload=states)
async with make_session("test-token") as session:
r3 = await UpdatesDigestCheck(
HAClient(HA_URL, session), storage, settings
).run()
assert len(r3) == 1
assert r3[0].payload["digest"] is True

View file

@ -0,0 +1,217 @@
"""Unit tests for AutomationFailuresCheck."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from ha_diag.checks.automation_failures import AutomationFailuresCheck, _is_trace_failure
from ha_diag.config import Settings
from ha_diag.models import HAEventType, Severity
from ha_diag.storage import Storage
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_settings(**overrides) -> Settings:
defaults: dict = {
"ha_url": "http://test.local:8123",
"ha_token": "test",
"node_name": "test-node",
"location_tag": "test-loc",
"alert_cooldown_hours": 0.0,
"automation_failure_threshold": 3,
"check_interval": 60,
"check_interval_unavailable": 3600,
}
defaults.update(overrides)
return Settings(**defaults)
def _make_client(states=None, traces_by_id=None, states_error=None):
client = MagicMock()
if states_error:
client.get_states = AsyncMock(side_effect=states_error)
else:
client.get_states = AsyncMock(return_value=states or [])
traces_map = traces_by_id or {}
async def _get_traces(eid):
if eid not in traces_map:
raise Exception(f"404 for {eid}")
return traces_map[eid]
client.get_automation_traces = AsyncMock(side_effect=_get_traces)
return client
def _auto_state(entity_id: str, state: str = "on", friendly_name: str | None = None) -> dict:
attrs: dict = {}
if friendly_name:
attrs["friendly_name"] = friendly_name
return {"entity_id": entity_id, "state": state, "attributes": attrs}
def _trace(error: str | None = None, state: str = "stopped") -> dict:
return {
"run_id": "abc",
"timestamp": "2026-05-27T10:00:00+00:00",
"trigger": "state",
"state": state if error is None else "stopped",
"error": error,
}
def _fail(error: str = "Script error") -> dict:
return _trace(error=error)
def _ok() -> dict:
return _trace(error=None)
# ---------------------------------------------------------------------------
# _is_trace_failure unit tests
# ---------------------------------------------------------------------------
def test_trace_with_error_is_failure():
assert _is_trace_failure({"error": "Something went wrong"}) is True
def test_trace_with_state_failed_is_failure():
assert _is_trace_failure({"state": "failed", "error": None}) is True
def test_trace_with_null_error_is_success():
assert _is_trace_failure({"error": None, "state": "stopped"}) is False
def test_trace_with_empty_string_error_is_success():
assert _is_trace_failure({"error": "", "state": "stopped"}) is False
def test_trace_with_no_keys_is_success():
assert _is_trace_failure({}) is False
# ---------------------------------------------------------------------------
# AutomationFailuresCheck.run() tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_no_automations_returns_empty(storage: Storage):
check = AutomationFailuresCheck(_make_client([]), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_disabled_automation_skipped(storage: Storage):
states = [_auto_state("automation.morning_lights", state="off")]
check = AutomationFailuresCheck(_make_client(states, {}), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_automation_with_no_traces_skipped(storage: Storage):
states = [_auto_state("automation.morning_lights")]
# _make_client raises exception for missing keys → graceful skip
check = AutomationFailuresCheck(_make_client(states, {}), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_fewer_traces_than_threshold_skipped(storage: Storage):
states = [_auto_state("automation.a")]
traces = {"automation.a": [_fail(), _fail()]} # 2 failures, threshold=3
check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_all_recent_failed_emits_event(storage: Storage):
states = [_auto_state("automation.a", friendly_name="Morning Lights")]
traces = {"automation.a": [_fail("step failed"), _fail("timeout"), _fail("no device")]}
check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings())
results = await check.run()
assert len(results) == 1
r = results[0]
assert r.event_type == HAEventType.ha_automation_failing
assert r.severity == Severity.warning
assert r.payload["entity_id"] == "automation.a"
assert r.payload["friendly_name"] == "Morning Lights"
assert r.payload["total_recent_failures"] == 3
assert len(r.payload["last_failures"]) == 3
@pytest.mark.asyncio
async def test_partial_failures_no_event(storage: Storage):
states = [_auto_state("automation.a")]
# 2 failures, 1 success in recent 3 → not all failed
traces = {"automation.a": [_fail(), _ok(), _fail()]}
check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_cooldown_prevents_duplicate_event(storage: Storage):
states = [_auto_state("automation.a")]
traces = {"automation.a": [_fail(), _fail(), _fail()]}
settings = _make_settings(alert_cooldown_hours=6.0)
check = AutomationFailuresCheck(_make_client(states, traces), storage, settings)
r1 = await check.run()
r2 = await check.run()
assert len(r1) == 1
assert r2 == []
@pytest.mark.asyncio
async def test_multiple_failing_automations(storage: Storage):
states = [_auto_state("automation.a"), _auto_state("automation.b")]
traces = {
"automation.a": [_fail(), _fail(), _fail()],
"automation.b": [_fail(), _fail(), _fail()],
}
check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings())
results = await check.run()
assert len(results) == 2
eids = {r.payload["entity_id"] for r in results}
assert eids == {"automation.a", "automation.b"}
@pytest.mark.asyncio
async def test_states_error_returns_empty(storage: Storage):
check = AutomationFailuresCheck(
_make_client(states_error=ConnectionError("down")), storage, _make_settings()
)
assert await check.run() == []
@pytest.mark.asyncio
async def test_custom_threshold(storage: Storage):
states = [_auto_state("automation.a")]
# threshold=2: 2 failures should trigger
traces = {"automation.a": [_fail(), _fail(), _ok()]}
settings = _make_settings(automation_failure_threshold=2)
check = AutomationFailuresCheck(_make_client(states, traces), storage, settings)
results = await check.run()
assert len(results) == 1
@pytest.mark.asyncio
async def test_failure_with_state_failed_field(storage: Storage):
states = [_auto_state("automation.a")]
traces = {"automation.a": [
{"run_id": "x", "state": "failed", "error": None, "timestamp": "2026-05-27T10:00:00Z"},
{"run_id": "y", "state": "failed", "error": None, "timestamp": "2026-05-27T09:00:00Z"},
{"run_id": "z", "state": "failed", "error": None, "timestamp": "2026-05-27T08:00:00Z"},
]}
check = AutomationFailuresCheck(_make_client(states, traces), storage, _make_settings())
results = await check.run()
assert len(results) == 1

View file

@ -78,3 +78,58 @@ async def test_make_session_sets_auth_header():
await client.get_api_status()
# Verify the Authorization header was sent
assert session.headers.get("Authorization") == "Bearer my-secret-token"
# ---------------------------------------------------------------------------
# Entity registry TTL cache (Phase 3 Flag #3)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_entity_registry_cached_on_second_call():
"""Second call within TTL returns cache, making only one HTTP request."""
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
with aioresponses() as m:
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
async with make_session(TOKEN) as session:
client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
r1 = await client.get_entity_registry()
r2 = await client.get_entity_registry() # from cache — no second HTTP call
assert r1 == r2
# aioresponses would raise ConnectionError on the unmocked second request
# if caching weren't working; reaching here means it used the cache.
@pytest.mark.asyncio
async def test_entity_registry_cache_bypassed_after_ttl(monkeypatch):
"""After TTL expiry, next call fetches fresh data."""
import time
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
with aioresponses() as m:
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
async with make_session(TOKEN) as session:
client = HAClient(HA_URL, session, entity_registry_cache_ttl=0.0)
await client.get_entity_registry() # fetches
await client.get_entity_registry() # TTL=0 → fetches again
@pytest.mark.asyncio
async def test_invalidate_registry_cache_forces_refetch():
"""invalidate_registry_cache() makes the next call hit the network."""
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": ""}]
with aioresponses() as m:
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
async with make_session(TOKEN) as session:
client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
await client.get_entity_registry()
client.invalidate_registry_cache()
await client.get_entity_registry() # must hit network again
@pytest.mark.asyncio
async def test_entity_registry_cache_default_ttl_is_300():
async with make_session(TOKEN) as session:
client = HAClient(HA_URL, session)
assert client._registry_cache_ttl == 300.0

View file

@ -0,0 +1,221 @@
"""Unit tests for SystemHealthCheck."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from ha_diag.checks.system_health import SystemHealthCheck, _extract_component_statuses
from ha_diag.config import Settings
from ha_diag.models import HAEventType, Severity
from ha_diag.storage import Storage
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_settings(**overrides) -> Settings:
defaults: dict = {
"ha_url": "http://test.local:8123",
"ha_token": "test",
"node_name": "test-node",
"location_tag": "test-loc",
"alert_cooldown_hours": 0.0,
"check_interval": 60,
"check_interval_unavailable": 3600,
}
defaults.update(overrides)
return Settings(**defaults)
def _make_client(health=None, error=None):
client = MagicMock()
if error:
client.get_system_health = AsyncMock(side_effect=error)
else:
client.get_system_health = AsyncMock(return_value=health or {})
return client
def _ok_response(*components: str) -> dict:
return {c: {"type": "result", "data": {"ok": True}} for c in components}
def _error_response(*components: str) -> dict:
return {c: {"type": "error", "error": f"{c} failed"} for c in components}
# ---------------------------------------------------------------------------
# _extract_component_statuses unit tests
# ---------------------------------------------------------------------------
def test_extract_typed_result_format():
data = {"recorder": {"type": "result", "data": {"backlog": 0}}}
result = _extract_component_statuses(data)
assert result["recorder"]["status"] == "ok"
assert result["recorder"]["details"] == {"backlog": 0}
def test_extract_typed_error_format():
data = {"cloud": {"type": "error", "error": "Connection refused"}}
result = _extract_component_statuses(data)
assert result["cloud"]["status"] == "error"
assert "Connection refused" in result["cloud"]["details"]["error"]
def test_extract_legacy_error_field():
data = {"cloud": {"error": "Timeout"}}
result = _extract_component_statuses(data)
assert result["cloud"]["status"] == "error"
def test_extract_nested_checks_format():
data = {
"info": {"version": "2024.12.0"},
"checks": {
"homeassistant": {"type": "result", "data": {}},
"recorder": {"type": "error", "error": "DB locked"},
},
}
result = _extract_component_statuses(data)
assert "homeassistant" not in result or result.get("homeassistant", {}).get("status") == "ok"
assert result["recorder"]["status"] == "error"
assert "info" not in result
def test_extract_plain_dict_treated_as_ok():
data = {"homeassistant": {"version": "2024.12.0", "docker": True}}
result = _extract_component_statuses(data)
assert result["homeassistant"]["status"] == "ok"
def test_extract_non_dict_value_skipped():
data = {"scalar_component": "just-a-string"}
result = _extract_component_statuses(data)
assert "scalar_component" not in result
# ---------------------------------------------------------------------------
# SystemHealthCheck run() tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_first_run_no_snapshot_no_event_for_ok(storage: Storage):
"""All components ok on first run — record snapshots, emit nothing."""
check = SystemHealthCheck(_make_client(_ok_response("homeassistant", "recorder")),
storage, _make_settings())
results = await check.run()
assert results == []
snap = await storage.get_system_health_snapshot("homeassistant")
assert snap is not None
assert snap["last_status"] == "ok"
@pytest.mark.asyncio
async def test_first_run_error_component_emits_event(storage: Storage):
"""Component in error on first run (no prior snapshot) → ha_system_health_degraded."""
check = SystemHealthCheck(_make_client(_error_response("cloud")), storage, _make_settings())
results = await check.run()
assert len(results) == 1
r = results[0]
assert r.event_type == HAEventType.ha_system_health_degraded
assert r.payload["component"] == "cloud"
assert r.payload["previous_status"] == "unknown"
assert r.payload["current_status"] == "error"
assert r.severity == Severity.warning
@pytest.mark.asyncio
async def test_ok_to_error_transition_emits_event(storage: Storage):
"""Component transitions ok → error → event fired."""
client_ok = _make_client(_ok_response("cloud"))
client_err = _make_client(_error_response("cloud"))
settings = _make_settings()
await SystemHealthCheck(client_ok, storage, settings).run()
results = await SystemHealthCheck(client_err, storage, settings).run()
assert len(results) == 1
assert results[0].payload["previous_status"] == "ok"
assert results[0].payload["current_status"] == "error"
@pytest.mark.asyncio
async def test_sustained_error_no_duplicate_event(storage: Storage):
"""Component stays in error across multiple runs — only first run emits."""
client_ok = _make_client(_ok_response("cloud"))
client_err = _make_client(_error_response("cloud"))
settings = _make_settings()
await SystemHealthCheck(client_ok, storage, settings).run()
results1 = await SystemHealthCheck(client_err, storage, settings).run()
results2 = await SystemHealthCheck(client_err, storage, settings).run()
results3 = await SystemHealthCheck(client_err, storage, settings).run()
assert len(results1) == 1 # transition fires
assert results2 == []
assert results3 == []
@pytest.mark.asyncio
async def test_recovery_clears_alert_and_next_degradation_re_fires(storage: Storage):
"""error → ok → error: second degradation fires a new event."""
settings = _make_settings()
# First degradation
await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run()
r1 = await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run()
assert len(r1) == 1
# Recovery
r2 = await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run()
assert r2 == []
# Second degradation
r3 = await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run()
assert len(r3) == 1
assert r3[0].payload["previous_status"] == "ok"
@pytest.mark.asyncio
async def test_multiple_degraded_components_multiple_events(storage: Storage):
health = {**_error_response("cloud", "recorder"), **_ok_response("homeassistant")}
check = SystemHealthCheck(_make_client(health), storage, _make_settings())
results = await check.run()
components = {r.payload["component"] for r in results}
assert components == {"cloud", "recorder"}
assert all(r.event_type == HAEventType.ha_system_health_degraded for r in results)
@pytest.mark.asyncio
async def test_api_error_returns_empty(storage: Storage):
"""If /api/system_health is unreachable, return no results (not an error event)."""
check = SystemHealthCheck(
_make_client(error=Exception("timeout")), storage, _make_settings()
)
results = await check.run()
assert results == []
@pytest.mark.asyncio
async def test_payload_contains_details(storage: Storage):
health = {"recorder": {"type": "error", "error": "DB write lag 5000ms"}}
check = SystemHealthCheck(_make_client(health), storage, _make_settings())
results = await check.run()
assert len(results) == 1
assert "DB write lag" in results[0].payload["details"]["error"]
@pytest.mark.asyncio
async def test_snapshot_updated_after_recovery(storage: Storage):
"""After a recovery cycle, snapshot shows last_status='ok'."""
settings = _make_settings()
await SystemHealthCheck(_make_client(_error_response("cloud")), storage, settings).run()
await SystemHealthCheck(_make_client(_ok_response("cloud")), storage, settings).run()
snap = await storage.get_system_health_snapshot("cloud")
assert snap["last_status"] == "ok"

View file

@ -407,3 +407,87 @@ async def test_area_omitted_when_unknown(storage: Storage):
)
results = await check.run()
assert "area" not in results[0].payload
# ---------------------------------------------------------------------------
# Phase 3 Flag #1: since = min(last_changed, first_seen)
# ---------------------------------------------------------------------------
def _make_state_with_last_changed(
entity_id: str, state: str, last_changed_iso: str
) -> dict:
return {
"entity_id": entity_id,
"state": state,
"attributes": {},
"last_changed": last_changed_iso,
}
@pytest.mark.asyncio
async def test_since_uses_last_changed_when_earlier_than_baseline(storage: Storage):
"""Entity's last_changed predates our baseline → duration computed from last_changed."""
import datetime as dt
now = time.time()
# Baseline recorded 1h ago (agent just started)
await storage.set_entity_unavailable_since("light.k", "unavailable", now - 3600)
# HA says entity changed to unavailable 48h ago
lc_iso = (
dt.datetime.fromtimestamp(now - 48 * 3600, tz=dt.timezone.utc)
.isoformat()
.replace("+00:00", "Z")
)
states = [_make_state_with_last_changed("light.k", "unavailable", lc_iso)]
check = UnavailableEntitiesCheck(
_make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0)
)
results = await check.run()
assert len(results) == 1
# Duration should be ~48h, not ~1h
assert results[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1)
@pytest.mark.asyncio
async def test_since_ignores_last_changed_when_later_than_baseline(storage: Storage):
"""Baseline predates last_changed → use baseline (entity was unavailable before
last_changed, e.g. if HA reports last_changed as now for some reason)."""
import datetime as dt
now = time.time()
# Baseline recorded 48h ago
await storage.set_entity_unavailable_since("light.k", "unavailable", now - 48 * 3600)
# HA says last_changed is only 2h ago (shouldn't override the older baseline)
lc_iso = (
dt.datetime.fromtimestamp(now - 2 * 3600, tz=dt.timezone.utc)
.isoformat()
.replace("+00:00", "Z")
)
states = [_make_state_with_last_changed("light.k", "unavailable", lc_iso)]
check = UnavailableEntitiesCheck(
_make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0)
)
results = await check.run()
assert len(results) == 1
# Duration should be ~48h (from baseline), not ~2h
assert results[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1)
@pytest.mark.asyncio
async def test_since_falls_back_gracefully_when_last_changed_missing(storage: Storage):
"""No last_changed in state → uses baseline first_seen without error."""
await storage.set_entity_unavailable_since(
"light.k", "unavailable", time.time() - 25 * 3600
)
states = [_make_state("light.k", "unavailable")] # no last_changed key
check = UnavailableEntitiesCheck(
_make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0)
)
results = await check.run()
assert len(results) == 1
assert results[0].event_type == HAEventType.ha_entity_unavailable_long

View file

@ -0,0 +1,256 @@
"""Unit tests for UpdatesAvailableCheck and UpdatesDigestCheck."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from ha_diag.checks.updates_available import (
UpdatesAvailableCheck,
UpdatesDigestCheck,
_build_update_payload,
)
from ha_diag.config import Settings
from ha_diag.models import HAEventType
from ha_diag.storage import Storage
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_settings(**overrides) -> Settings:
defaults: dict = {
"ha_url": "http://test.local:8123",
"ha_token": "test",
"node_name": "test-node",
"location_tag": "test-loc",
"alert_cooldown_hours": 0.0,
"updates_cooldown_days": 0, # no dedup in most tests
"check_interval": 60,
"check_interval_unavailable": 3600,
}
defaults.update(overrides)
return Settings(**defaults)
def _make_client(states=None, error=None):
client = MagicMock()
if error:
client.get_states = AsyncMock(side_effect=error)
else:
client.get_states = AsyncMock(return_value=states or [])
return client
def _update_state(
entity_id: str = "update.homeassistant_core",
state: str = "on",
title: str = "Home Assistant Core",
installed: str = "2025.5.0",
latest: str = "2025.6.0",
release_summary: str | None = None,
release_url: str | None = None,
) -> dict:
attrs: dict = {
"title": title,
"installed_version": installed,
"latest_version": latest,
"in_progress": False,
"auto_update": False,
}
if release_summary:
attrs["release_summary"] = release_summary
if release_url:
attrs["release_url"] = release_url
return {"entity_id": entity_id, "state": state, "attributes": attrs}
# ---------------------------------------------------------------------------
# _build_update_payload helper
# ---------------------------------------------------------------------------
def test_build_update_payload_basic():
attrs = {"title": "HA Core", "installed_version": "1.0", "latest_version": "2.0"}
p = _build_update_payload("update.ha_core", attrs)
assert p["entity_id"] == "update.ha_core"
assert p["title"] == "HA Core"
assert p["installed_version"] == "1.0"
assert p["latest_version"] == "2.0"
def test_build_update_payload_release_summary_truncated():
long_notes = "x" * 3000
attrs = {"release_summary": long_notes}
p = _build_update_payload("update.ha_core", attrs)
assert len(p["release_summary"]) == 2000
def test_build_update_payload_release_url_omitted_when_absent():
p = _build_update_payload("update.ha_core", {})
assert "release_url" not in p
def test_build_update_payload_release_url_included_when_present():
attrs = {"release_url": "https://github.com/..."}
p = _build_update_payload("update.x", attrs)
assert p["release_url"] == "https://github.com/..."
# ---------------------------------------------------------------------------
# UpdatesAvailableCheck (daily individual events)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_no_updates_returns_empty(storage: Storage):
states = [{"entity_id": "light.living_room", "state": "on", "attributes": {}}]
check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_update_off_state_not_emitted(storage: Storage):
states = [_update_state(state="off")]
check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_single_update_emits_event(storage: Storage):
states = [_update_state()]
check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings())
results = await check.run()
assert len(results) == 1
assert results[0].event_type == HAEventType.ha_update_available
assert "2025.5.0" in results[0].message
assert "2025.6.0" in results[0].message
@pytest.mark.asyncio
async def test_multiple_updates_emit_multiple_events(storage: Storage):
states = [
_update_state("update.ha_core"),
_update_state("update.mosquitto", title="Mosquitto"),
]
check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings())
results = await check.run()
assert len(results) == 2
assert all(r.event_type == HAEventType.ha_update_available for r in results)
@pytest.mark.asyncio
async def test_cooldown_prevents_same_update_next_day(storage: Storage):
states = [_update_state()]
settings = _make_settings(updates_cooldown_days=7)
check = UpdatesAvailableCheck(_make_client(states), storage, settings)
r1 = await check.run()
r2 = await check.run()
assert len(r1) == 1
assert r2 == []
@pytest.mark.asyncio
async def test_no_cooldown_allows_repeat(storage: Storage):
states = [_update_state()]
check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings(updates_cooldown_days=0))
r1 = await check.run()
r2 = await check.run()
assert len(r1) == 1
assert len(r2) == 1
@pytest.mark.asyncio
async def test_payload_contains_version_fields(storage: Storage):
states = [_update_state(installed="2025.5.0", latest="2025.6.0")]
check = UpdatesAvailableCheck(_make_client(states), storage, _make_settings())
results = await check.run()
p = results[0].payload
assert p["installed_version"] == "2025.5.0"
assert p["latest_version"] == "2025.6.0"
assert p["in_progress"] is False
@pytest.mark.asyncio
async def test_ha_error_returns_empty(storage: Storage):
check = UpdatesAvailableCheck(
_make_client(error=ConnectionError("HA down")), storage, _make_settings()
)
assert await check.run() == []
# ---------------------------------------------------------------------------
# UpdatesDigestCheck (Sunday digest)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_digest_no_updates_returns_empty(storage: Storage):
check = UpdatesDigestCheck(_make_client([]), storage, _make_settings())
assert await check.run() == []
@pytest.mark.asyncio
async def test_digest_emits_single_event_for_all_updates(storage: Storage):
states = [
_update_state("update.ha_core"),
_update_state("update.mosquitto", title="Mosquitto"),
_update_state("update.esphome", title="ESPHome"),
]
check = UpdatesDigestCheck(_make_client(states), storage, _make_settings())
results = await check.run()
assert len(results) == 1
p = results[0].payload
assert p["digest"] is True
assert p["count"] == 3
assert len(p["updates"]) == 3
@pytest.mark.asyncio
async def test_digest_payload_has_digest_true(storage: Storage):
states = [_update_state()]
check = UpdatesDigestCheck(_make_client(states), storage, _make_settings())
results = await check.run()
assert results[0].payload["digest"] is True
@pytest.mark.asyncio
async def test_digest_weekly_dedup_prevents_same_week_refiring(storage: Storage):
states = [_update_state()]
check = UpdatesDigestCheck(_make_client(states), storage, _make_settings())
r1 = await check.run()
r2 = await check.run()
assert len(r1) == 1
assert r2 == []
@pytest.mark.asyncio
async def test_digest_fires_independently_of_daily_dedup(storage: Storage):
"""Daily cooldown on entity X doesn't suppress Sunday digest."""
states = [_update_state()]
settings = _make_settings(updates_cooldown_days=7)
# Daily check marks alert_key="update_available:update.homeassistant_core"
daily = UpdatesAvailableCheck(_make_client(states), storage, settings)
await daily.run()
# Digest uses different key "update_digest:{week}" — should still fire
digest = UpdatesDigestCheck(_make_client(states), storage, settings)
r = await digest.run()
assert len(r) == 1
assert r[0].payload["digest"] is True
@pytest.mark.asyncio
async def test_digest_name_is_updates_digest(storage: Storage):
check = UpdatesDigestCheck(_make_client([]), storage, _make_settings())
assert check.name == "updates_digest"
@pytest.mark.asyncio
async def test_daily_check_name_is_updates_available(storage: Storage):
check = UpdatesAvailableCheck(_make_client([]), storage, _make_settings())
assert check.name == "updates_available"