"""Unit tests for UnavailableEntitiesCheck.""" from __future__ import annotations import time from pathlib import Path from unittest.mock import AsyncMock, MagicMock import pytest from ha_diag.checks.unavailable_entities import UnavailableEntitiesCheck from ha_diag.config import Settings from ha_diag.models import HAEventType from ha_diag.storage import Storage # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_settings(**overrides) -> Settings: """Settings with safe test defaults (alert immediately, no cooldown).""" defaults: dict = { "ha_url": "http://test.local:8123", "ha_token": "test", "node_name": "test-node", "location_tag": "test-loc", "unavailable_threshold_hours": 0.0, # alert immediately "integration_failure_threshold_pct": 0.5, "integration_failure_min_entities": 3, "alert_cooldown_hours": 0.0, # no dedup window in most tests "check_interval": 60, "check_interval_unavailable": 3600, } defaults.update(overrides) return Settings(**defaults) def _make_state(entity_id: str, state: str = "on") -> dict: return {"entity_id": entity_id, "state": state, "attributes": {}} def _make_registry_entry(entity_id: str, platform: str, area_id: str = "") -> dict: return {"entity_id": entity_id, "platform": platform, "area_id": area_id} def _make_client(states=None, registry=None, states_error=None): client = MagicMock() if states_error: client.get_states = AsyncMock(side_effect=states_error) else: client.get_states = AsyncMock(return_value=states or []) client.get_entity_registry = AsyncMock(return_value=registry or []) return client # --------------------------------------------------------------------------- # Basic unavailability detection # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_no_unavailable_entities_returns_empty(storage: Storage): states = [_make_state("light.a", "on"), _make_state("sensor.b", "off")] check = UnavailableEntitiesCheck(_make_client(states), storage, _make_settings()) assert await check.run() == [] @pytest.mark.asyncio async def test_first_cycle_records_baseline_no_event(storage: Storage): """First observation of unavailable entity: record, don't alert yet.""" states = [_make_state("light.kitchen", "unavailable")] settings = _make_settings(unavailable_threshold_hours=1.0) # needs 1h before alert check = UnavailableEntitiesCheck(_make_client(states), storage, settings) results = await check.run() assert results == [] # Baseline should be recorded first_at = await storage.get_entity_first_unavailable_at("light.kitchen") assert first_at is not None @pytest.mark.asyncio async def test_unavailable_below_threshold_no_event(storage: Storage): states = [_make_state("light.kitchen", "unavailable")] settings = _make_settings(unavailable_threshold_hours=24.0) check = UnavailableEntitiesCheck(_make_client(states), storage, settings) # Seed the baseline as if entity just became unavailable await storage.set_entity_unavailable_since("light.kitchen", "unavailable", time.time()) results = await check.run() assert results == [] @pytest.mark.asyncio async def test_unavailable_above_threshold_emits_event(storage: Storage): states = [_make_state("light.kitchen", "unavailable")] check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings() ) # Seed baseline as if 25h ago await storage.set_entity_unavailable_since( "light.kitchen", "unavailable", time.time() - 25 * 3600 ) results = await check.run() assert len(results) == 1 assert results[0].event_type == HAEventType.ha_entity_unavailable_long assert results[0].payload["entity_id"] == "light.kitchen" assert results[0].payload["duration_hours"] == pytest.approx(25.0, abs=0.1) assert results[0].payload["domain"] == "light" @pytest.mark.asyncio async def test_unknown_state_treated_as_unavailable(storage: Storage): states = [_make_state("sensor.temp", "unknown")] await storage.set_entity_unavailable_since( "sensor.temp", "unknown", time.time() - 25 * 3600 ) check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings() ) results = await check.run() assert len(results) == 1 assert results[0].payload["state"] == "unknown" @pytest.mark.asyncio async def test_payload_contains_since_timestamp(storage: Storage): first_at = time.time() - 27 * 3600 await storage.set_entity_unavailable_since("light.k", "unavailable", first_at) states = [_make_state("light.k", "unavailable")] check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings() ) results = await check.run() assert len(results) == 1 assert "since" in results[0].payload assert "Z" in results[0].payload["since"] # ISO UTC timestamp # --------------------------------------------------------------------------- # Recovery # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_recovery_clears_baseline(storage: Storage): await storage.set_entity_unavailable_since("light.k", "unavailable", time.time()) # Entity is now back online states = [_make_state("light.k", "on")] check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings() ) await check.run() assert await storage.get_entity_first_unavailable_at("light.k") is None @pytest.mark.asyncio async def test_recovery_clears_alert_dedup(storage: Storage): await storage.set_entity_unavailable_since( "light.k", "unavailable", time.time() - 25 * 3600 ) await storage.mark_alert_sent("entity_unavailable:light.k") # Entity recovers states = [_make_state("light.k", "on")] check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings() ) await check.run() # Alert dedup should be gone assert not await storage.was_alert_sent("entity_unavailable:light.k", 9999) # --------------------------------------------------------------------------- # Alert cooldown / deduplication # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_cooldown_prevents_duplicate_event(storage: Storage): await storage.set_entity_unavailable_since( "light.k", "unavailable", time.time() - 25 * 3600 ) settings = _make_settings(alert_cooldown_hours=6.0) states = [_make_state("light.k", "unavailable")] check = UnavailableEntitiesCheck(_make_client(states), storage, settings) results1 = await check.run() assert len(results1) == 1 # first alert fires results2 = await check.run() assert results2 == [] # cooldown active @pytest.mark.asyncio async def test_no_cooldown_allows_repeat_event(storage: Storage): await storage.set_entity_unavailable_since( "light.k", "unavailable", time.time() - 25 * 3600 ) settings = _make_settings(alert_cooldown_hours=0.0) states = [_make_state("light.k", "unavailable")] check = UnavailableEntitiesCheck(_make_client(states), storage, settings) results1 = await check.run() results2 = await check.run() assert len(results1) == 1 assert len(results2) == 1 # --------------------------------------------------------------------------- # Integration root-cause grouping # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_integration_failure_emits_single_event(storage: Storage): """5/8 entities from zha unavailable → ha_integration_failed, not 5 entity events.""" zha_entities = [f"light.zha_{i}" for i in range(8)] states = [ _make_state(eid, "unavailable" if i < 5 else "on") for i, eid in enumerate(zha_entities) ] registry = [_make_registry_entry(eid, "zha") for eid in zha_entities] # Seed baselines for unavailable entities as 25h ago for eid in zha_entities[:5]: await storage.set_entity_unavailable_since(eid, "unavailable", time.time() - 25 * 3600) settings = _make_settings( integration_failure_threshold_pct=0.5, integration_failure_min_entities=3, ) check = UnavailableEntitiesCheck( _make_client(states, registry), storage, settings ) results = await check.run() assert len(results) == 1 assert results[0].event_type == HAEventType.ha_integration_failed assert results[0].payload["integration"] == "zha" assert results[0].payload["unavailable_count"] == 5 assert results[0].payload["total_count"] == 8 assert set(results[0].payload["affected_entities"]) == set(zha_entities[:5]) @pytest.mark.asyncio async def test_integration_failure_below_pct_threshold(storage: Storage): """2/8 entities from zha unavailable (25%) → per-entity events, not integration event.""" zha_entities = [f"light.zha_{i}" for i in range(8)] states = [ _make_state(eid, "unavailable" if i < 2 else "on") for i, eid in enumerate(zha_entities) ] registry = [_make_registry_entry(eid, "zha") for eid in zha_entities] for eid in zha_entities[:2]: await storage.set_entity_unavailable_since(eid, "unavailable", time.time() - 25 * 3600) settings = _make_settings( integration_failure_threshold_pct=0.5, integration_failure_min_entities=3, ) check = UnavailableEntitiesCheck( _make_client(states, registry), storage, settings ) results = await check.run() # Below count threshold (2 < 3) so individual events assert all(r.event_type == HAEventType.ha_entity_unavailable_long for r in results) assert len(results) == 2 @pytest.mark.asyncio async def test_integration_failure_below_count_threshold(storage: Storage): """3/6 entities unavailable (50%) but min_entities=5 → per-entity events.""" zha_entities = [f"light.zha_{i}" for i in range(6)] states = [ _make_state(eid, "unavailable" if i < 3 else "on") for i, eid in enumerate(zha_entities) ] registry = [_make_registry_entry(eid, "zha") for eid in zha_entities] for eid in zha_entities[:3]: await storage.set_entity_unavailable_since(eid, "unavailable", time.time() - 25 * 3600) settings = _make_settings( integration_failure_threshold_pct=0.5, integration_failure_min_entities=5, # need 5, only have 3 ) check = UnavailableEntitiesCheck( _make_client(states, registry), storage, settings ) results = await check.run() assert all(r.event_type == HAEventType.ha_entity_unavailable_long for r in results) @pytest.mark.asyncio async def test_entity_without_integration_gets_individual_event(storage: Storage): """Entity not in entity registry gets per-entity event regardless of integration grouping.""" await storage.set_entity_unavailable_since( "light.mystery", "unavailable", time.time() - 25 * 3600 ) states = [_make_state("light.mystery", "unavailable")] # Empty registry — no integration info check = UnavailableEntitiesCheck( _make_client(states, []), storage, _make_settings() ) results = await check.run() assert len(results) == 1 assert results[0].event_type == HAEventType.ha_entity_unavailable_long assert "integration" not in results[0].payload @pytest.mark.asyncio async def test_mixed_integrations_correctly_partitioned(storage: Storage): """5 zha entities unavailable (triggers integration event) + 1 mqtt entity (individual).""" zha_entities = [f"light.zha_{i}" for i in range(8)] mqtt_entity = "sensor.mqtt_temp" all_entities = zha_entities + [mqtt_entity] states = ( [_make_state(eid, "unavailable" if i < 5 else "on") for i, eid in enumerate(zha_entities)] + [_make_state(mqtt_entity, "unavailable")] ) registry = ( [_make_registry_entry(eid, "zha") for eid in zha_entities] + [_make_registry_entry(mqtt_entity, "mqtt")] ) for eid in zha_entities[:5]: await storage.set_entity_unavailable_since(eid, "unavailable", time.time() - 25 * 3600) await storage.set_entity_unavailable_since(mqtt_entity, "unavailable", time.time() - 25 * 3600) settings = _make_settings( integration_failure_threshold_pct=0.5, integration_failure_min_entities=3, ) check = UnavailableEntitiesCheck( _make_client(states, registry), storage, settings ) results = await check.run() event_types = {r.event_type for r in results} assert HAEventType.ha_integration_failed in event_types assert HAEventType.ha_entity_unavailable_long in event_types # Exactly 2 events: 1 integration + 1 individual mqtt entity assert len(results) == 2 # --------------------------------------------------------------------------- # Error handling # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_ha_client_error_returns_dead_event(storage: Storage): client = _make_client(states_error=ConnectionError("HA down")) check = UnavailableEntitiesCheck(client, storage, _make_settings()) results = await check.run() assert len(results) == 1 assert results[0].event_type == HAEventType.ha_websocket_dead @pytest.mark.asyncio async def test_registry_failure_falls_back_gracefully(storage: Storage): """Registry endpoint failure → individual entity events without integration info.""" states = [_make_state("light.k", "unavailable")] client = _make_client(states) client.get_entity_registry = AsyncMock(side_effect=Exception("registry unavailable")) await storage.set_entity_unavailable_since( "light.k", "unavailable", time.time() - 25 * 3600 ) check = UnavailableEntitiesCheck(client, storage, _make_settings()) results = await check.run() assert len(results) == 1 assert results[0].event_type == HAEventType.ha_entity_unavailable_long assert "integration" not in results[0].payload # --------------------------------------------------------------------------- # Area / integration in payload # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_area_included_in_payload_when_known(storage: Storage): await storage.set_entity_unavailable_since( "light.hall", "unavailable", time.time() - 25 * 3600 ) states = [_make_state("light.hall", "unavailable")] registry = [_make_registry_entry("light.hall", "zha", "hallway")] check = UnavailableEntitiesCheck( _make_client(states, registry), storage, _make_settings() ) results = await check.run() assert len(results) == 1 assert results[0].payload.get("area") == "hallway" assert results[0].payload.get("integration") == "zha" @pytest.mark.asyncio async def test_area_omitted_when_unknown(storage: Storage): await storage.set_entity_unavailable_since( "light.k", "unavailable", time.time() - 25 * 3600 ) states = [_make_state("light.k", "unavailable")] registry = [_make_registry_entry("light.k", "zha", "")] check = UnavailableEntitiesCheck( _make_client(states, registry), storage, _make_settings() ) results = await check.run() assert "area" not in results[0].payload # --------------------------------------------------------------------------- # Phase 3 Flag #1: since = min(last_changed, first_seen) # --------------------------------------------------------------------------- def _make_state_with_last_changed( entity_id: str, state: str, last_changed_iso: str ) -> dict: return { "entity_id": entity_id, "state": state, "attributes": {}, "last_changed": last_changed_iso, } @pytest.mark.asyncio async def test_since_uses_last_changed_when_earlier_than_baseline(storage: Storage): """Entity's last_changed predates our baseline → duration computed from last_changed.""" import datetime as dt now = time.time() # Baseline recorded 1h ago (agent just started) await storage.set_entity_unavailable_since("light.k", "unavailable", now - 3600) # HA says entity changed to unavailable 48h ago lc_iso = ( dt.datetime.fromtimestamp(now - 48 * 3600, tz=dt.timezone.utc) .isoformat() .replace("+00:00", "Z") ) states = [_make_state_with_last_changed("light.k", "unavailable", lc_iso)] check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0) ) results = await check.run() assert len(results) == 1 # Duration should be ~48h, not ~1h assert results[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1) @pytest.mark.asyncio async def test_since_ignores_last_changed_when_later_than_baseline(storage: Storage): """Baseline predates last_changed → use baseline (entity was unavailable before last_changed, e.g. if HA reports last_changed as now for some reason).""" import datetime as dt now = time.time() # Baseline recorded 48h ago await storage.set_entity_unavailable_since("light.k", "unavailable", now - 48 * 3600) # HA says last_changed is only 2h ago (shouldn't override the older baseline) lc_iso = ( dt.datetime.fromtimestamp(now - 2 * 3600, tz=dt.timezone.utc) .isoformat() .replace("+00:00", "Z") ) states = [_make_state_with_last_changed("light.k", "unavailable", lc_iso)] check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0) ) results = await check.run() assert len(results) == 1 # Duration should be ~48h (from baseline), not ~2h assert results[0].payload["duration_hours"] == pytest.approx(48.0, abs=0.1) @pytest.mark.asyncio async def test_since_falls_back_gracefully_when_last_changed_missing(storage: Storage): """No last_changed in state → uses baseline first_seen without error.""" await storage.set_entity_unavailable_since( "light.k", "unavailable", time.time() - 25 * 3600 ) states = [_make_state("light.k", "unavailable")] # no last_changed key check = UnavailableEntitiesCheck( _make_client(states), storage, _make_settings(unavailable_threshold_hours=0.0) ) results = await check.run() assert len(results) == 1 assert results[0].event_type == HAEventType.ha_entity_unavailable_long