New checks: - SystemHealthCheck (15min interval): detects newly-failing HA integrations via /api/system_health snapshot diff; transition-based dedup (ok→error fires, sustained error silent, error→ok clears alert) - UpdatesAvailableCheck (daily cron 09:00): per-update ha_update_available events with 7-day dedup; release notes truncated at 2000 chars - UpdatesDigestCheck (Sunday cron 09:00): single digest event with all pending updates; weekly ISO-week dedup, independent of daily dedup key - AutomationFailuresCheck (30min interval): detects automations with N consecutive failures (default 3) via /api/trace/automation/<id>; 6h cooldown per automation Phase 3 flag fixes: - Flag #1 (since field): UnavailableEntitiesCheck now uses min(state.last_changed, baseline.first_seen) as the effective "since", giving accurate duration when the agent was offline at the entity's first failure - Flag #3 (registry cache): HAClient.get_entity_registry() caches response in-process with configurable TTL (default 300s); avoids repeated API calls across concurrent check cycles; invalidate_registry_cache() for manual invalidation Storage: system_health_snapshot table (component, last_status, last_seen_at, payload) created automatically on next Storage.open() call Config additions (all with defaults): entity_registry_cache_ttl=300, system_health_check_interval=900, automation_check_interval=1800, automation_failure_threshold=3, updates_check_hour=9, updates_check_minute=0, updates_cooldown_days=7 Tests: 95 unit tests pass (49 new), 13 integration tests pass (9 new); 3 skipped (live-HA token not set in CI) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
136 lines
5.4 KiB
Python
136 lines
5.4 KiB
Python
"""Tests for HAClient using aioresponses to mock aiohttp."""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from aioresponses import aioresponses
|
|
|
|
from ha_diag.ha_client import HAClient, make_session
|
|
|
|
HA_URL = "http://homeassistant.test:8123"
|
|
TOKEN = "test-token"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_api_status_ok():
    """get_api_status() returns the JSON body of the mocked /api/ endpoint."""
    expected = {"message": "API running."}
    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/", payload=expected)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            status = await ha.get_api_status()
            # The parsed response must round-trip the mocked payload unchanged.
            assert status == expected
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_api_status_unauthorized():
    """A 401 from the mocked /api/ endpoint makes get_api_status() raise."""
    status_url = f"{HA_URL}/api/"
    with aioresponses() as mocked:
        mocked.get(status_url, status=401)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            # NOTE(review): the concrete exception type raised by HAClient is
            # not visible from this file; narrow `Exception` once confirmed.
            with pytest.raises(Exception):
                await ha.get_api_status()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_states_returns_list():
    """get_states() returns a list whose first item is the mocked entity."""
    fake_states = [{"entity_id": "light.living_room", "state": "on"}]
    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/states", payload=fake_states)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            result = await ha.get_states()
            assert isinstance(result, list)
            first_entity = result[0]
            assert first_entity["entity_id"] == "light.living_room"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_config_returns_dict():
    """get_config() exposes the version from the mocked /api/config body."""
    fake_config = {"version": "2024.1.0", "location_name": "Home"}
    config_url = f"{HA_URL}/api/config"
    with aioresponses() as mocked:
        mocked.get(config_url, payload=fake_config)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            result = await ha.get_config()
            assert result["version"] == "2024.1.0"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_entity_registry_returns_list():
    """get_entity_registry() returns every mocked registry entry, in order."""
    hall_light = {"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}
    temp_sensor = {"entity_id": "sensor.temp", "platform": "mqtt", "area_id": None}
    fake_registry = [hall_light, temp_sensor]
    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/config/entity_registry", payload=fake_registry)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            entries = await ha.get_entity_registry()
            assert len(entries) == 2
            assert entries[0]["platform"] == "zha"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_make_session_sets_auth_header():
    """make_session() puts a Bearer-token Authorization header on the session."""
    status_url = f"{HA_URL}/api/"
    with aioresponses() as mocked:
        mocked.get(status_url, payload={"message": "API running."})
        async with make_session("my-secret-token") as http:
            ha = HAClient(HA_URL, http)
            await ha.get_api_status()
            # Inspect the session's own headers for the Authorization value.
            auth_header = http.headers.get("Authorization")
            assert auth_header == "Bearer my-secret-token"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entity registry TTL cache (Phase 3 Flag #3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_entity_registry_cached_on_second_call():
    """Two calls inside the TTL window need only a single mocked HTTP response."""
    fake_registry = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
    registry_url = f"{HA_URL}/api/config/entity_registry"
    with aioresponses() as mocked:
        # Only ONE response is registered on purpose: aioresponses would raise
        # a connection error on an unmocked second request, so getting through
        # both calls means the second one was answered from the cache.
        mocked.get(registry_url, payload=fake_registry)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http, entity_registry_cache_ttl=300.0)
            first = await ha.get_entity_registry()
            second = await ha.get_entity_registry()  # cached — no HTTP call
            assert first == second
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_entity_registry_cache_bypassed_after_ttl(monkeypatch):
    """With a TTL of 0 the cache never satisfies a call; each call refetches.

    Two *different* payloads are queued for the same URL so the test can tell
    a fresh fetch apart from a cached answer: if the second call were served
    from the cache it would still carry the first payload and the final
    assertion would fail.

    ``monkeypatch`` is not used by the test body; it is kept only so the
    test's signature is unchanged.
    """
    registry_url = f"{HA_URL}/api/config/entity_registry"
    first_payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
    second_payload = [{"entity_id": "light.kitchen", "platform": "zha", "area_id": "kitchen"}]
    with aioresponses() as m:
        # Non-repeating mocks for one URL are consumed in registration order.
        m.get(registry_url, payload=first_payload)
        m.get(registry_url, payload=second_payload)
        async with make_session(TOKEN) as session:
            client = HAClient(HA_URL, session, entity_registry_cache_ttl=0.0)
            first = await client.get_entity_registry()  # fetches
            second = await client.get_entity_registry()  # TTL=0 → fetches again
            assert first[0]["entity_id"] == "light.hall"
            # Proves the second call hit the network instead of the cache.
            assert second[0]["entity_id"] == "light.kitchen"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invalidate_registry_cache_forces_refetch():
    """invalidate_registry_cache() makes the next call hit the network.

    The two queued responses carry different payloads, so a call that was
    (incorrectly) answered from the cache after invalidation would return the
    first payload again and fail the final assertion.
    """
    registry_url = f"{HA_URL}/api/config/entity_registry"
    first_payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": ""}]
    second_payload = [{"entity_id": "light.kitchen", "platform": "zha", "area_id": ""}]
    with aioresponses() as m:
        # Non-repeating mocks for one URL are consumed in registration order.
        m.get(registry_url, payload=first_payload)
        m.get(registry_url, payload=second_payload)
        async with make_session(TOKEN) as session:
            # Long TTL: without invalidation the second call would be cached.
            client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
            first = await client.get_entity_registry()
            client.invalidate_registry_cache()
            second = await client.get_entity_registry()  # must hit network again
            assert first[0]["entity_id"] == "light.hall"
            # Proves the post-invalidation call was a fresh fetch.
            assert second[0]["entity_id"] == "light.kitchen"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_entity_registry_cache_default_ttl_is_300():
    """A client built without an explicit TTL stores 300.0 as its cache TTL."""
    async with make_session(TOKEN) as http:
        default_client = HAClient(HA_URL, http)
        # Reads the private attribute directly to pin the default value.
        assert default_client._registry_cache_ttl == 300.0
|