New checks:
- SystemHealthCheck (15min interval): detects newly-failing HA integrations via /api/system_health snapshot diff; transition-based dedup (ok→error fires, sustained error stays silent, error→ok clears the alert)
- UpdatesAvailableCheck (daily cron 09:00): per-update ha_update_available events with 7-day dedup; release notes truncated at 2000 chars
- UpdatesDigestCheck (Sunday cron 09:00): single digest event with all pending updates; weekly ISO-week dedup, independent of the daily dedup key
- AutomationFailuresCheck (30min interval): detects automations with N consecutive failures (default 3) via /api/trace/automation/<id>; 6h cooldown per automation

Phase 3 flag fixes:
- Flag #1 (since field): UnavailableEntitiesCheck now uses min(state.last_changed, baseline.first_seen) as the effective "since", giving an accurate duration when the agent was offline at the entity's first failure
- Flag #3 (registry cache): HAClient.get_entity_registry() caches the response in-process with a configurable TTL (default 300s); avoids repeated API calls across concurrent check cycles; invalidate_registry_cache() for manual invalidation

Storage: system_health_snapshot table (component, last_status, last_seen_at, payload) created automatically on the next Storage.open() call

Config additions (all with defaults): entity_registry_cache_ttl=300, system_health_check_interval=900, automation_check_interval=1800, automation_failure_threshold=3, updates_check_hour=9, updates_check_minute=0, updates_cooldown_days=7

Tests: 95 unit tests pass (49 new), 13 integration tests pass (9 new); 3 skipped (live-HA token not set in CI)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
168 lines
5 KiB
Python
168 lines
5 KiB
Python
"""Integration tests for AutomationFailuresCheck.
|
|
|
|
Uses real aiosqlite Storage + EventEmitter + mocked HTTP.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import AsyncGenerator
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from aioresponses import aioresponses
|
|
|
|
from ha_diag.checks.automation_failures import AutomationFailuresCheck
|
|
from ha_diag.config import Settings
|
|
from ha_diag.event_emitter import EventEmitter
|
|
from ha_diag.ha_client import HAClient, make_session
|
|
from ha_diag.models import HAEventType
|
|
from ha_diag.storage import Storage
|
|
|
|
HA_URL = "http://ha-test-ken:8123"
|
|
|
|
|
|
def _settings(**overrides) -> Settings:
    """Build a ``Settings`` instance for these tests.

    A fixed set of test values is used as the baseline; any keyword
    argument passed in replaces the matching baseline entry (or is added
    alongside it) before ``Settings`` is constructed.

    Note that ``alert_cooldown_hours`` is 0.0 in the baseline; tests that
    exercise the cooldown pass their own value.
    """
    baseline: dict = dict(
        ha_url=HA_URL,
        ha_token="test-token",
        node_name="piha",
        location_tag="ken",
        alert_cooldown_hours=0.0,
        automation_failure_threshold=3,
        check_interval=60,
        check_interval_unavailable=3600,
    )
    # Later mappings win, so caller-supplied values take precedence.
    return Settings(**{**baseline, **overrides})
|
|
|
|
|
|
@pytest_asyncio.fixture
async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]:
    """Yield an opened ``Storage`` backed by a database file under ``tmp_path``.

    The storage is opened before the test receives it and closed once the
    fixture is torn down.
    """
    db_file = tmp_path / "integration_test.db"
    store = Storage(db_file)
    await store.open()
    yield store
    await store.close()
|
|
|
|
|
|
@pytest.fixture
def events_dir(tmp_path: Path) -> Path:
    """Create an ``events`` directory inside ``tmp_path`` and return its path."""
    target = tmp_path.joinpath("events")
    target.mkdir()
    return target
|
|
|
|
|
|
def _auto_states(*entity_ids: str) -> list[dict]:
|
|
return [
|
|
{
|
|
"entity_id": eid,
|
|
"state": "on",
|
|
"attributes": {"friendly_name": eid.split(".")[-1].replace("_", " ").title()},
|
|
}
|
|
for eid in entity_ids
|
|
]
|
|
|
|
|
|
def _fail_traces(n: int = 3) -> list[dict]:
|
|
return [
|
|
{
|
|
"run_id": f"run-{i}",
|
|
"timestamp": f"2026-05-27T{10+i:02d}:00:00+00:00",
|
|
"trigger": "state",
|
|
"state": "stopped",
|
|
"error": f"Script error #{i}",
|
|
}
|
|
for i in range(n)
|
|
]
|
|
|
|
|
|
def _ok_traces(n: int = 3) -> list[dict]:
|
|
return [
|
|
{
|
|
"run_id": f"run-{i}",
|
|
"timestamp": f"2026-05-27T{10+i:02d}:00:00+00:00",
|
|
"trigger": "state",
|
|
"state": "stopped",
|
|
"error": None,
|
|
}
|
|
for i in range(n)
|
|
]
|
|
|
|
|
|
@pytest.mark.integration
async def test_failing_automation_emits_event_and_writes_file(
    storage: Storage, events_dir: Path
):
    """Three failed traces in a row yield one event, which is then written as JSON."""
    mocked_states = _auto_states("automation.morning_lights")
    mocked_traces = _fail_traces(3)
    emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")

    # Stub the states listing and the trace listing for the one automation.
    with aioresponses() as http_mock:
        http_mock.get(f"{HA_URL}/api/states", payload=mocked_states)
        http_mock.get(
            f"{HA_URL}/api/trace/automation/automation.morning_lights",
            payload=mocked_traces,
        )
        async with make_session("test-token") as session:
            check = AutomationFailuresCheck(
                HAClient(HA_URL, session), storage, _settings()
            )
            results = await check.run()

    # The check reports exactly one failing automation.
    assert len(results) == 1
    failure = results[0]
    assert failure.event_type == HAEventType.ha_automation_failing
    assert failure.payload["entity_id"] == "automation.morning_lights"
    assert failure.payload["total_recent_failures"] == 3

    # Push that result through the emitter and inspect what lands on disk.
    emitter.emit(
        event_type=failure.event_type,
        severity=failure.severity.value,
        service="homeassistant",
        message=failure.message,
        payload=failure.payload,
    )

    written = list(events_dir.glob("*.json"))
    assert len(written) == 1
    event_doc = json.loads(written[0].read_text())
    assert event_doc["type"] == "ha_automation_failing"
    assert event_doc["payload"]["location_tag"] == "ken"
    assert "last_failures" in event_doc["payload"]
|
|
|
|
|
|
@pytest.mark.integration
async def test_healthy_automation_no_event(storage: Storage):
    """When every recent trace is error-free the check returns nothing."""
    mocked_states = _auto_states("automation.morning_lights")
    mocked_traces = _ok_traces(3)

    # Stub the states listing and the trace listing for the one automation.
    with aioresponses() as http_mock:
        http_mock.get(f"{HA_URL}/api/states", payload=mocked_states)
        http_mock.get(
            f"{HA_URL}/api/trace/automation/automation.morning_lights",
            payload=mocked_traces,
        )
        async with make_session("test-token") as session:
            check = AutomationFailuresCheck(
                HAClient(HA_URL, session), storage, _settings()
            )
            results = await check.run()

    assert results == []
|
|
|
|
|
|
@pytest.mark.integration
async def test_cooldown_suppresses_duplicate(storage: Storage):
    """Running the check twice inside the cooldown window reports the failure once."""
    mocked_states = _auto_states("automation.morning_lights")
    mocked_traces = _fail_traces(3)
    settings = _settings(alert_cooldown_hours=6.0)

    # The loop index is read below, so it gets a real name rather than "_".
    for run_index in range(2):
        # Mocks are registered afresh for each run; both runs share `storage`.
        with aioresponses() as http_mock:
            http_mock.get(f"{HA_URL}/api/states", payload=mocked_states)
            http_mock.get(
                f"{HA_URL}/api/trace/automation/automation.morning_lights",
                payload=mocked_traces,
            )
            async with make_session("test-token") as session:
                client = HAClient(HA_URL, session)
                check = AutomationFailuresCheck(client, storage, settings)
                results = await check.run()

        if run_index:
            # Second run: nothing is reported again.
            assert results == []
        else:
            # First run: the failure is reported.
            assert len(results) == 1
|