homelab-codex-ws/services/ha-diag-agent/tests/integration/test_unavailable_entities_integration.py

193 lines
7.1 KiB
Python
Raw Permalink Normal View History

"""Functional integration test for UnavailableEntitiesCheck.
Uses aioresponses for HA HTTP (controlled, deterministic) and real aiosqlite +
EventEmitter (tests the full agent pipeline end-to-end without a live HA).
Marked 'integration' because it exercises the complete multi-component stack.
For a live-HA variant, start the ken testenv Docker instances, set
TEST_HA_TOKEN, and extend with tests that call real HA endpoints.
"""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from aioresponses import aioresponses
from ha_diag.checks.unavailable_entities import UnavailableEntitiesCheck
from ha_diag.config import Settings
from ha_diag.event_emitter import EventEmitter
from ha_diag.ha_client import HAClient, make_session
from ha_diag.models import HAEventType
from ha_diag.storage import Storage
# Base URL handed to HAClient. The tests in this file register aioresponses
# mocks for the endpoints they hit under this URL instead of using a live host.
HA_URL = "http://ha-test-ken:8123"
def _settings(**overrides) -> Settings:
    """Return a ``Settings`` instance built from test defaults.

    Args:
        **overrides: Keyword values that replace (or extend) the baseline
            entries below before ``Settings`` is constructed.

    Returns:
        A ``Settings`` object created from the merged keyword arguments.
    """
    baseline: dict = {
        "ha_url": HA_URL,
        "ha_token": "test-token",
        "node_name": "piha",
        "location_tag": "ken",
        "unavailable_threshold_hours": 0.0,
        "integration_failure_threshold_pct": 0.5,
        "integration_failure_min_entities": 3,
        "alert_cooldown_hours": 0.0,
        "check_interval": 60,
        "check_interval_unavailable": 3600,
    }
    # Later keys win, so caller-supplied overrides take precedence.
    merged = {**baseline, **overrides}
    return Settings(**merged)
@pytest_asyncio.fixture
async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]:
    """Yield an opened ``Storage`` backed by a file under ``tmp_path``.

    The store is opened before being handed to the test and closed once
    control returns to the fixture.
    """
    db_file = tmp_path / "integration_test.db"
    store = Storage(db_file)
    await store.open()
    yield store
    await store.close()
@pytest.fixture
def events_dir(tmp_path: Path) -> Path:
    """Create an ``events`` directory under ``tmp_path`` and return its path."""
    target = tmp_path / "events"
    target.mkdir()
    return target
@pytest.mark.integration
async def test_full_pipeline_integration_event(storage: Storage, events_dir: Path):
    """Three unavailable zha entities yield one ha_integration_failed event file."""
    down_ids = [f"light.test_{i}" for i in range(3)]

    # /api/states payload: the three unavailable lights followed by one healthy sensor.
    states_payload = [
        {"entity_id": entity_id, "state": "unavailable", "attributes": {}}
        for entity_id in down_ids
    ]
    states_payload.append({"entity_id": "sensor.ok", "state": "on", "attributes": {}})

    # Registry payload: only the unavailable lights, all on the zha platform.
    registry_payload = [
        {"entity_id": entity_id, "platform": "zha", "area_id": "living_room"}
        for entity_id in down_ids
    ]

    # Seed storage so each light is recorded as unavailable since 25 hours ago.
    for entity_id in down_ids:
        await storage.set_entity_unavailable_since(
            entity_id, "unavailable", time.time() - 25 * 3600
        )

    emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")

    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/states", payload=states_payload)
        mocked.get(f"{HA_URL}/api/config/entity_registry", payload=registry_payload)
        async with make_session("test-token") as session:
            ha_client = HAClient(HA_URL, session)
            checker = UnavailableEntitiesCheck(ha_client, storage, _settings())
            results = await checker.run()

    # 3/3 zha entities (100% >= 50%, count 3 >= 3) -> a single integration event.
    assert len(results) == 1
    outcome = results[0]
    assert outcome.event_type == HAEventType.ha_integration_failed
    assert outcome.payload["integration"] == "zha"

    emitter.emit(
        event_type=outcome.event_type,
        severity=outcome.severity.value,
        service="homeassistant",
        message=outcome.message,
        payload=outcome.payload,
    )

    # Exactly one JSON file should now exist in the events directory.
    written = list(events_dir.glob("*.json"))
    assert len(written) == 1

    record = json.loads(written[0].read_text())
    assert record["node"] == "piha"
    assert record["payload"]["location_tag"] == "ken"
    assert record["payload"]["integration"] == "zha"
    assert record["type"] == "ha_integration_failed"
@pytest.mark.integration
async def test_full_pipeline_individual_entity_events(
    storage: Storage, events_dir: Path
):
    """Two unavailable entities on different platforms yield two per-entity events."""
    # Insertion order matters: it fixes the order of both mocked payloads.
    platform_by_entity = {"light.zha_one": "zha", "sensor.mqtt_one": "mqtt"}

    states_payload = [
        {"entity_id": entity_id, "state": "unavailable", "attributes": {}}
        for entity_id in platform_by_entity
    ]
    states_payload.append({"entity_id": "switch.ok", "state": "on", "attributes": {}})

    registry_payload = [
        {"entity_id": entity_id, "platform": platform, "area_id": ""}
        for entity_id, platform in platform_by_entity.items()
    ]

    # Seed storage so both entities are recorded as unavailable since 25 hours ago.
    for entity_id in platform_by_entity:
        await storage.set_entity_unavailable_since(
            entity_id, "unavailable", time.time() - 25 * 3600
        )

    emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")

    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/states", payload=states_payload)
        mocked.get(f"{HA_URL}/api/config/entity_registry", payload=registry_payload)
        async with make_session("test-token") as session:
            ha_client = HAClient(HA_URL, session)
            checker = UnavailableEntitiesCheck(ha_client, storage, _settings())
            results = await checker.run()

    # Each platform has a single entity, below the configured minimum of 3,
    # so the check reports the entities individually.
    assert len(results) == 2
    for outcome in results:
        assert outcome.event_type == HAEventType.ha_entity_unavailable_long

    for outcome in results:
        emitter.emit(
            event_type=outcome.event_type,
            severity=outcome.severity.value,
            service="homeassistant",
            message=outcome.message,
            payload=outcome.payload,
        )

    written = list(events_dir.glob("*.json"))
    assert len(written) == 2
    for event_file in written:
        payload = json.loads(event_file.read_text())["payload"]
        assert payload["location_tag"] == "ken"
        assert "entity_id" in payload
        assert "since" in payload
        assert payload["since"].endswith("Z")
@pytest.mark.integration
async def test_recovery_removes_tracking(storage: Storage, events_dir: Path):
    """A recovered entity produces no event and its stored baseline is cleared."""
    entity_id = "light.recoverable"
    await storage.set_entity_unavailable_since(
        entity_id, "unavailable", time.time() - 25 * 3600
    )

    async def run_cycle(reported_state: str):
        """Run one check cycle with HA mocked to report ``reported_state``."""
        states_payload = [
            {"entity_id": entity_id, "state": reported_state, "attributes": {}}
        ]
        with aioresponses() as mocked:
            mocked.get(f"{HA_URL}/api/states", payload=states_payload)
            mocked.get(f"{HA_URL}/api/config/entity_registry", payload=[])
            async with make_session("test-token") as session:
                ha_client = HAClient(HA_URL, session)
                checker = UnavailableEntitiesCheck(ha_client, storage, _settings())
                return await checker.run()

    # Cycle 1: the entity is still unavailable -> one event.
    first_results = await run_cycle("unavailable")
    assert len(first_results) == 1

    # Cycle 2: the entity is back on -> no event and no stored timestamp.
    second_results = await run_cycle("on")
    assert second_results == []
    assert await storage.get_entity_first_unavailable_at(entity_id) is None