homelab-codex-ws/services/ha-diag-agent/tests/test_unavailable_entities.py

494 lines
18 KiB
Python
Raw Permalink Normal View History

"""Unit tests for UnavailableEntitiesCheck."""
from __future__ import annotations
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from ha_diag.checks.unavailable_entities import UnavailableEntitiesCheck
from ha_diag.config import Settings
from ha_diag.models import HAEventType
from ha_diag.storage import Storage
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_settings(**overrides) -> Settings:
    """Build a ``Settings`` instance for tests.

    The baseline values use a zero unavailable threshold (alert
    immediately) and a zero alert cooldown (no dedup window). Any keyword
    argument replaces the baseline value of the same name, or adds a new
    field when the name is not part of the baseline.
    """
    return Settings(
        **{
            "ha_url": "http://test.local:8123",
            "ha_token": "test",
            "node_name": "test-node",
            "location_tag": "test-loc",
            "unavailable_threshold_hours": 0.0,  # alert immediately
            "integration_failure_threshold_pct": 0.5,
            "integration_failure_min_entities": 3,
            "alert_cooldown_hours": 0.0,  # no dedup window in most tests
            "check_interval": 60,
            "check_interval_unavailable": 3600,
            # Caller-supplied values win over the baseline above.
            **overrides,
        }
    )
def _make_state(entity_id: str, state: str = "on") -> dict:
    """Return a minimal state dict for *entity_id* with empty attributes."""
    record = dict(entity_id=entity_id, state=state)
    # A fresh attributes dict per call, so tests never share mutable state.
    record["attributes"] = {}
    return record
def _make_registry_entry(entity_id: str, platform: str, area_id: str = "") -> dict:
    """Return an entity-registry row with ``entity_id``, ``platform`` and ``area_id``.

    ``area_id`` defaults to the empty string.
    """
    field_names = ("entity_id", "platform", "area_id")
    return dict(zip(field_names, (entity_id, platform, area_id)))
def _make_client(states=None, registry=None, states_error=None):
    """Return a ``MagicMock`` client with async ``get_states`` / ``get_entity_registry``.

    ``get_states`` raises ``states_error`` when one is given; otherwise it
    returns ``states`` (or an empty list). ``get_entity_registry`` returns
    ``registry`` (or an empty list).
    """
    get_states = (
        AsyncMock(side_effect=states_error)
        if states_error
        else AsyncMock(return_value=states or [])
    )
    stub = MagicMock()
    stub.get_states = get_states
    stub.get_entity_registry = AsyncMock(return_value=registry or [])
    return stub
# ---------------------------------------------------------------------------
# Basic unavailability detection
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_no_unavailable_entities_returns_empty(storage: Storage):
    """Entities reporting "on"/"off" produce no events."""
    healthy_states = [
        _make_state(entity_id, value)
        for entity_id, value in (("light.a", "on"), ("sensor.b", "off"))
    ]
    client = _make_client(healthy_states)
    checker = UnavailableEntitiesCheck(client, storage, _make_settings())

    events = await checker.run()

    assert events == []
@pytest.mark.asyncio
async def test_first_cycle_records_baseline_no_event(storage: Storage):
    """First sighting of an unavailable entity stores a baseline but emits nothing."""
    entity_id = "light.kitchen"
    client = _make_client([_make_state(entity_id, "unavailable")])
    # A 1h threshold means a just-observed outage cannot alert yet.
    one_hour_threshold = _make_settings(unavailable_threshold_hours=1.0)
    checker = UnavailableEntitiesCheck(client, storage, one_hour_threshold)

    assert await checker.run() == []

    # The run is expected to have stored a first-unavailable timestamp.
    recorded_at = await storage.get_entity_first_unavailable_at(entity_id)
    assert recorded_at is not None
@pytest.mark.asyncio
async def test_unavailable_below_threshold_no_event(storage: Storage):
    """An outage younger than the 24h threshold emits no event."""
    entity_id = "light.kitchen"
    day_long_threshold = _make_settings(unavailable_threshold_hours=24.0)
    client = _make_client([_make_state(entity_id, "unavailable")])
    checker = UnavailableEntitiesCheck(client, storage, day_long_threshold)

    # Baseline stamped "now", as if the entity only just became unavailable.
    await storage.set_entity_unavailable_since(entity_id, "unavailable", time.time())

    events = await checker.run()
    assert events == []
@pytest.mark.asyncio
async def test_unavailable_above_threshold_emits_event(storage: Storage):
    """A baseline 25h in the past yields one ``ha_entity_unavailable_long`` event."""
    entity_id = "light.kitchen"
    checker = UnavailableEntitiesCheck(
        _make_client([_make_state(entity_id, "unavailable")]),
        storage,
        _make_settings(),
    )
    # Pretend the entity was first seen unavailable 25 hours ago.
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since(entity_id, "unavailable", seeded_at)

    events = await checker.run()

    assert len(events) == 1
    event = events[0]
    assert event.event_type == HAEventType.ha_entity_unavailable_long
    assert event.payload["entity_id"] == "light.kitchen"
    assert event.payload["duration_hours"] == pytest.approx(25.0, abs=0.1)
    assert event.payload["domain"] == "light"
@pytest.mark.asyncio
async def test_unknown_state_treated_as_unavailable(storage: Storage):
    """An entity in state "unknown" is reported, with that state in the payload."""
    entity_id = "sensor.temp"
    client = _make_client([_make_state(entity_id, "unknown")])
    # Baseline seeded 25h back so the (zero) threshold is clearly exceeded.
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since(entity_id, "unknown", seeded_at)
    checker = UnavailableEntitiesCheck(client, storage, _make_settings())

    events = await checker.run()

    assert len(events) == 1
    assert events[0].payload["state"] == "unknown"
@pytest.mark.asyncio
async def test_payload_contains_since_timestamp(storage: Storage):
    """The event payload has a ``since`` value that contains a ``Z`` marker."""
    seeded_at = time.time() - 27 * 3600
    await storage.set_entity_unavailable_since("light.k", "unavailable", seeded_at)
    client = _make_client([_make_state("light.k", "unavailable")])
    checker = UnavailableEntitiesCheck(client, storage, _make_settings())

    events = await checker.run()

    assert len(events) == 1
    payload = events[0].payload
    assert "since" in payload
    assert "Z" in payload["since"]  # ISO UTC timestamp
# ---------------------------------------------------------------------------
# Recovery
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_recovery_clears_baseline(storage: Storage):
    """Once the entity reports "on" again, its stored baseline is removed."""
    entity_id = "light.k"
    await storage.set_entity_unavailable_since(entity_id, "unavailable", time.time())

    # The client now reports the entity as back online.
    recovered_client = _make_client([_make_state(entity_id, "on")])
    checker = UnavailableEntitiesCheck(recovered_client, storage, _make_settings())
    await checker.run()

    remaining = await storage.get_entity_first_unavailable_at(entity_id)
    assert remaining is None
@pytest.mark.asyncio
async def test_recovery_clears_alert_dedup(storage: Storage):
    """Recovery removes the alert-sent marker for the entity."""
    dedup_key = "entity_unavailable:light.k"
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since("light.k", "unavailable", seeded_at)
    await storage.mark_alert_sent(dedup_key)

    # The entity recovers.
    recovered_client = _make_client([_make_state("light.k", "on")])
    checker = UnavailableEntitiesCheck(recovered_client, storage, _make_settings())
    await checker.run()

    # The alert dedup marker should be gone.
    still_marked = await storage.was_alert_sent(dedup_key, 9999)
    assert not still_marked
# ---------------------------------------------------------------------------
# Alert cooldown / deduplication
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cooldown_prevents_duplicate_event(storage: Storage):
    """With a 6h cooldown, a second run right after the first emits nothing."""
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since("light.k", "unavailable", seeded_at)
    client = _make_client([_make_state("light.k", "unavailable")])
    checker = UnavailableEntitiesCheck(
        client, storage, _make_settings(alert_cooldown_hours=6.0)
    )

    first_run = await checker.run()
    assert len(first_run) == 1  # first alert fires

    second_run = await checker.run()
    assert second_run == []  # cooldown active
@pytest.mark.asyncio
async def test_no_cooldown_allows_repeat_event(storage: Storage):
    """With a zero cooldown, two consecutive runs each emit one event."""
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since("light.k", "unavailable", seeded_at)
    client = _make_client([_make_state("light.k", "unavailable")])
    checker = UnavailableEntitiesCheck(
        client, storage, _make_settings(alert_cooldown_hours=0.0)
    )

    # Both runs happen before any assertion.
    first_run = await checker.run()
    second_run = await checker.run()

    assert len(first_run) == 1
    assert len(second_run) == 1
# ---------------------------------------------------------------------------
# Integration root-cause grouping
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_integration_failure_emits_single_event(storage: Storage):
    """5/8 entities from zha unavailable → one ha_integration_failed, not 5 entity events."""
    zha_entities = [f"light.zha_{i}" for i in range(8)]
    down, up = zha_entities[:5], zha_entities[5:]
    states = [_make_state(eid, "unavailable") for eid in down]
    states += [_make_state(eid, "on") for eid in up]
    registry = [_make_registry_entry(eid, "zha") for eid in zha_entities]

    # Every unavailable entity gets a baseline 25h in the past.
    for eid in down:
        await storage.set_entity_unavailable_since(
            eid, "unavailable", time.time() - 25 * 3600
        )

    checker = UnavailableEntitiesCheck(
        _make_client(states, registry),
        storage,
        _make_settings(
            integration_failure_threshold_pct=0.5,
            integration_failure_min_entities=3,
        ),
    )
    events = await checker.run()

    assert len(events) == 1
    event = events[0]
    assert event.event_type == HAEventType.ha_integration_failed
    payload = event.payload
    assert payload["integration"] == "zha"
    assert payload["unavailable_count"] == 5
    assert payload["total_count"] == 8
    assert set(payload["affected_entities"]) == set(down)
@pytest.mark.asyncio
async def test_integration_failure_below_pct_threshold(storage: Storage):
    """3/8 entities from zha unavailable (37.5%) → per-entity events, not integration event.

    The unavailable count (3) is not below ``integration_failure_min_entities``
    (3), so the 50% percentage threshold is the condition under test here.
    The count-threshold case is covered by its own test.
    """
    zha_entities = [f"light.zha_{i}" for i in range(8)]
    unavailable = zha_entities[:3]
    states = [
        _make_state(eid, "unavailable" if i < 3 else "on")
        for i, eid in enumerate(zha_entities)
    ]
    registry = [_make_registry_entry(eid, "zha") for eid in zha_entities]
    # Seed baselines 25h back so each unavailable entity is past the threshold.
    for eid in unavailable:
        await storage.set_entity_unavailable_since(
            eid, "unavailable", time.time() - 25 * 3600
        )
    settings = _make_settings(
        integration_failure_threshold_pct=0.5,
        integration_failure_min_entities=3,
    )
    check = UnavailableEntitiesCheck(
        _make_client(states, registry), storage, settings
    )
    results = await check.run()
    # 37.5% is below the 50% threshold, so events stay per-entity.
    assert len(results) == 3
    assert all(r.event_type == HAEventType.ha_entity_unavailable_long for r in results)
@pytest.mark.asyncio
async def test_integration_failure_below_count_threshold(storage: Storage):
    """3/6 entities unavailable (50%) but min_entities=5 → per-entity events."""
    zha_entities = [f"light.zha_{i}" for i in range(6)]
    states = [
        _make_state(eid, "unavailable" if i < 3 else "on")
        for i, eid in enumerate(zha_entities)
    ]
    registry = [_make_registry_entry(eid, "zha") for eid in zha_entities]
    # Seed baselines 25h back so each unavailable entity is past the threshold.
    for eid in zha_entities[:3]:
        await storage.set_entity_unavailable_since(
            eid, "unavailable", time.time() - 25 * 3600
        )
    settings = _make_settings(
        integration_failure_threshold_pct=0.5,
        integration_failure_min_entities=5,  # need 5, only have 3
    )
    check = UnavailableEntitiesCheck(
        _make_client(states, registry), storage, settings
    )
    results = await check.run()
    # Pin the count first: all() alone would pass vacuously on an empty list.
    assert len(results) == 3
    assert all(r.event_type == HAEventType.ha_entity_unavailable_long for r in results)
@pytest.mark.asyncio
async def test_entity_without_integration_gets_individual_event(storage: Storage):
    """An entity absent from the registry gets a per-entity event with no integration key."""
    entity_id = "light.mystery"
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since(entity_id, "unavailable", seeded_at)

    # Empty registry: the client offers no integration info.
    client = _make_client([_make_state(entity_id, "unavailable")], [])
    checker = UnavailableEntitiesCheck(client, storage, _make_settings())
    events = await checker.run()

    assert len(events) == 1
    event = events[0]
    assert event.event_type == HAEventType.ha_entity_unavailable_long
    assert "integration" not in event.payload
@pytest.mark.asyncio
async def test_mixed_integrations_correctly_partitioned(storage: Storage):
    """5 zha entities unavailable (triggers integration event) + 1 mqtt entity (individual)."""
    zha_entities = [f"light.zha_{i}" for i in range(8)]
    mqtt_entity = "sensor.mqtt_temp"
    states = (
        [_make_state(eid, "unavailable" if i < 5 else "on") for i, eid in enumerate(zha_entities)]
        + [_make_state(mqtt_entity, "unavailable")]
    )
    registry = (
        [_make_registry_entry(eid, "zha") for eid in zha_entities]
        + [_make_registry_entry(mqtt_entity, "mqtt")]
    )
    # Seed 25h-old baselines for every unavailable entity (5 zha + 1 mqtt).
    for eid in zha_entities[:5]:
        await storage.set_entity_unavailable_since(eid, "unavailable", time.time() - 25 * 3600)
    await storage.set_entity_unavailable_since(mqtt_entity, "unavailable", time.time() - 25 * 3600)
    settings = _make_settings(
        integration_failure_threshold_pct=0.5,
        integration_failure_min_entities=3,
    )
    check = UnavailableEntitiesCheck(
        _make_client(states, registry), storage, settings
    )
    results = await check.run()
    event_types = {r.event_type for r in results}
    assert HAEventType.ha_integration_failed in event_types
    assert HAEventType.ha_entity_unavailable_long in event_types
    # Exactly 2 events: 1 integration + 1 individual mqtt entity
    assert len(results) == 2
# ---------------------------------------------------------------------------
# Error handling
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ha_client_error_returns_dead_event(storage: Storage):
    """A ``ConnectionError`` from ``get_states`` yields one ``ha_websocket_dead`` event."""
    failing_client = _make_client(states_error=ConnectionError("HA down"))
    checker = UnavailableEntitiesCheck(failing_client, storage, _make_settings())

    events = await checker.run()

    assert len(events) == 1
    assert events[0].event_type == HAEventType.ha_websocket_dead
@pytest.mark.asyncio
async def test_registry_failure_falls_back_gracefully(storage: Storage):
    """A failing registry call still gives a per-entity event, without integration info."""
    client = _make_client([_make_state("light.k", "unavailable")])
    # Replace the registry stub with one that always raises.
    client.get_entity_registry = AsyncMock(
        side_effect=Exception("registry unavailable")
    )
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since("light.k", "unavailable", seeded_at)
    checker = UnavailableEntitiesCheck(client, storage, _make_settings())

    events = await checker.run()

    assert len(events) == 1
    event = events[0]
    assert event.event_type == HAEventType.ha_entity_unavailable_long
    assert "integration" not in event.payload
# ---------------------------------------------------------------------------
# Area / integration in payload
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_area_included_in_payload_when_known(storage: Storage):
    """A registry row with area "hallway" and platform "zha" shows up in the payload."""
    entity_id = "light.hall"
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since(entity_id, "unavailable", seeded_at)
    client = _make_client(
        [_make_state(entity_id, "unavailable")],
        [_make_registry_entry(entity_id, "zha", "hallway")],
    )
    checker = UnavailableEntitiesCheck(client, storage, _make_settings())

    events = await checker.run()

    assert len(events) == 1
    payload = events[0].payload
    assert payload.get("area") == "hallway"
    assert payload.get("integration") == "zha"
@pytest.mark.asyncio
async def test_area_omitted_when_unknown(storage: Storage):
    """A registry row with an empty ``area_id`` gives a payload without an ``area`` key."""
    await storage.set_entity_unavailable_since(
        "light.k", "unavailable", time.time() - 25 * 3600
    )
    states = [_make_state("light.k", "unavailable")]
    registry = [_make_registry_entry("light.k", "zha", "")]
    check = UnavailableEntitiesCheck(
        _make_client(states, registry), storage, _make_settings()
    )
    results = await check.run()
    # Check the count before indexing, so an empty result fails as an
    # assertion rather than an IndexError.
    assert len(results) == 1
    assert "area" not in results[0].payload
# Commit note (2026-05-29 14:43:10 +02:00):
# feat(ha-diag-agent): three REST diagnostic checks + Phase 3 flag fixes
#
# New checks:
# - SystemHealthCheck (15min interval): detects newly-failing HA integrations
#   via /api/system_health snapshot diff; transition-based dedup (ok→error
#   fires, sustained error silent, error→ok clears alert)
# - UpdatesAvailableCheck (daily cron 09:00): per-update ha_update_available
#   events with 7-day dedup; release notes truncated at 2000 chars
# - UpdatesDigestCheck (Sunday cron 09:00): single digest event with all
#   pending updates; weekly ISO-week dedup, independent of daily dedup key
# - AutomationFailuresCheck (30min interval): detects automations with N
#   consecutive failures (default 3) via /api/trace/automation/<id>; 6h
#   cooldown per automation
#
# Phase 3 flag fixes:
# - Flag #1 (since field): UnavailableEntitiesCheck now uses
#   min(state.last_changed, baseline.first_seen) as effective "since", giving
#   accurate duration when agent was offline at entity's first fail
# - Flag #3 (registry cache): HAClient.get_entity_registry() caches response
#   in-process with configurable TTL (default 300s); avoids repeated API calls
#   across concurrent check cycles; invalidate_registry_cache() for manual
#   invalidation
#
# Storage: system_health_snapshot table (component, last_status, last_seen_at,
# payload) created automatically on next Storage.open() call
#
# Config additions (all with defaults): entity_registry_cache_ttl=300,
# system_health_check_interval=900, automation_check_interval=1800,
# automation_failure_threshold=3, updates_check_hour=9,
# updates_check_minute=0, updates_cooldown_days=7
#
# Tests: 95 unit tests pass (49 new), 13 integration tests pass (9 new);
# 3 skipped (live-HA token not set in CI)
#
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
# ---------------------------------------------------------------------------
# Phase 3 Flag #1: since = min(last_changed, first_seen)
# ---------------------------------------------------------------------------
def _make_state_with_last_changed(
    entity_id: str, state: str, last_changed_iso: str
) -> dict:
    """Return a state dict with empty attributes plus a ``last_changed`` value.

    ``last_changed_iso`` is stored as given; no parsing or validation is done.
    """
    record: dict = {"entity_id": entity_id, "state": state}
    record["attributes"] = {}
    record["last_changed"] = last_changed_iso
    return record
@pytest.mark.asyncio
async def test_since_uses_last_changed_when_earlier_than_baseline(storage: Storage):
    """``last_changed`` older than the stored baseline → duration measured from ``last_changed``."""
    from datetime import datetime, timezone

    reference = time.time()
    # The stored baseline is only 1h old (agent just started) ...
    await storage.set_entity_unavailable_since("light.k", "unavailable", reference - 3600)
    # ... while the state itself says the change happened 48h ago.
    changed_at = datetime.fromtimestamp(reference - 48 * 3600, tz=timezone.utc)
    last_changed_iso = changed_at.isoformat().replace("+00:00", "Z")

    client = _make_client(
        [_make_state_with_last_changed("light.k", "unavailable", last_changed_iso)]
    )
    checker = UnavailableEntitiesCheck(
        client, storage, _make_settings(unavailable_threshold_hours=0.0)
    )
    events = await checker.run()

    assert len(events) == 1
    # Duration should be ~48h, not ~1h.
    assert events[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1)
@pytest.mark.asyncio
async def test_since_ignores_last_changed_when_later_than_baseline(storage: Storage):
    """Baseline older than ``last_changed`` → the baseline wins.

    Covers the case where the reported ``last_changed`` is more recent than
    the moment the entity was first recorded as unavailable.
    """
    from datetime import datetime, timezone

    reference = time.time()
    # The stored baseline is 48h old ...
    await storage.set_entity_unavailable_since(
        "light.k", "unavailable", reference - 48 * 3600
    )
    # ... and the state reports a last_changed only 2h ago, which must not
    # override the older baseline.
    changed_at = datetime.fromtimestamp(reference - 2 * 3600, tz=timezone.utc)
    last_changed_iso = changed_at.isoformat().replace("+00:00", "Z")

    client = _make_client(
        [_make_state_with_last_changed("light.k", "unavailable", last_changed_iso)]
    )
    checker = UnavailableEntitiesCheck(
        client, storage, _make_settings(unavailable_threshold_hours=0.0)
    )
    events = await checker.run()

    assert len(events) == 1
    # Duration should be ~48h (from baseline), not ~2h.
    assert events[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1)
@pytest.mark.asyncio
async def test_since_falls_back_gracefully_when_last_changed_missing(storage: Storage):
    """A state dict without ``last_changed`` still produces the per-entity event."""
    seeded_at = time.time() - 25 * 3600
    await storage.set_entity_unavailable_since("light.k", "unavailable", seeded_at)

    # _make_state builds a dict that has no last_changed key.
    client = _make_client([_make_state("light.k", "unavailable")])
    checker = UnavailableEntitiesCheck(
        client, storage, _make_settings(unavailable_threshold_hours=0.0)
    )
    events = await checker.run()

    assert len(events) == 1
    assert events[0].event_type == HAEventType.ha_entity_unavailable_long