homelab-codex-ws/services/ha-diag-agent/tests/test_ha_client.py
Oskar Kapala c255a021d1 fix(observer): quarantine malformed event files to prevent processing wedge
Was: malformed event (bad JSON / truncated / corrupted bytes) wedged the
node's checkpoint forever — every cycle re-tried, logged, never advanced
past the bad file; all subsequent good events for that node lost.

Now: first parse failure -> atomic os.replace to STATE_DIR/observer_failed_events/<node>/
with collision handling. Checkpoint advances, downstream events flow.
Move failures themselves are logged but don't crash the loop.

Complementary to yesterday's atomic_write_json fix (state files);
this addresses the same race-pattern on event files instead.

Regression test asserts: bad event quarantined to failed_events dir,
removed from hot path, subsequent good event processed (node online),
checkpoint moves to good event.
2026-06-12 11:22:56 +02:00

136 lines
5.4 KiB
Python

"""Tests for HAClient using aioresponses to mock aiohttp."""
from __future__ import annotations
import pytest
from aioresponses import aioresponses
from ha_diag.ha_client import HAClient, make_session
# Base URL passed to HAClient and used to build the mocked endpoint URLs.
HA_URL = "http://homeassistant.test:8123"
# Token handed to make_session() by most tests in this module.
TOKEN = "test-token"
@pytest.mark.asyncio
async def test_get_api_status_ok():
    """get_api_status() hands back the JSON body mocked at ``/api/``."""
    status_url = f"{HA_URL}/api/"
    with aioresponses() as mocked:
        mocked.get(status_url, payload={"message": "API running."})
        async with make_session(TOKEN) as http:
            body = await HAClient(HA_URL, http).get_api_status()
    assert body == {"message": "API running."}
@pytest.mark.asyncio
async def test_get_api_status_unauthorized():
    """A mocked 401 on ``/api/`` makes get_api_status() raise."""
    status_url = f"{HA_URL}/api/"
    with aioresponses() as mocked:
        mocked.get(status_url, status=401)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            # NOTE(review): Exception is deliberately kept broad here; narrow
            # it once the error type raised by HAClient is confirmed.
            with pytest.raises(Exception):
                await ha.get_api_status()
@pytest.mark.asyncio
async def test_get_states_returns_list():
    """get_states() yields a list whose first entry is the mocked entity."""
    mocked_states = [{"entity_id": "light.living_room", "state": "on"}]
    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/states", payload=mocked_states)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            fetched = await ha.get_states()
    assert isinstance(fetched, list)
    first_state = fetched[0]
    assert first_state["entity_id"] == "light.living_room"
@pytest.mark.asyncio
async def test_get_config_returns_dict():
    """get_config() exposes the ``version`` field of the mocked config."""
    mocked_config = {"version": "2024.1.0", "location_name": "Home"}
    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/config", payload=mocked_config)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            fetched = await ha.get_config()
    assert fetched["version"] == "2024.1.0"
@pytest.mark.asyncio
async def test_get_entity_registry_returns_list():
    """get_entity_registry() returns both mocked entries, in order."""
    hall_light = {"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}
    temp_sensor = {"entity_id": "sensor.temp", "platform": "mqtt", "area_id": None}
    registry_url = f"{HA_URL}/api/config/entity_registry"
    with aioresponses() as mocked:
        mocked.get(registry_url, payload=[hall_light, temp_sensor])
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http)
            entries = await ha.get_entity_registry()
    assert len(entries) == 2
    assert entries[0]["platform"] == "zha"
@pytest.mark.asyncio
async def test_make_session_sets_auth_header():
    """make_session() stores the Bearer token in the session's headers."""
    secret = "my-secret-token"
    with aioresponses() as mocked:
        mocked.get(f"{HA_URL}/api/", payload={"message": "API running."})
        async with make_session(secret) as http:
            ha = HAClient(HA_URL, http)
            await ha.get_api_status()
            # The check is made on the session's own header mapping, after a
            # request has gone through the client.
            auth_value = http.headers.get("Authorization")
            assert auth_value == "Bearer my-secret-token"
# ---------------------------------------------------------------------------
# Entity registry TTL cache (Phase 3 Flag #3)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_entity_registry_cached_on_second_call():
    """Two calls inside the TTL window are served by a single HTTP request."""
    registry_url = f"{HA_URL}/api/config/entity_registry"
    mocked_entries = [
        {"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"},
    ]
    with aioresponses() as mocked:
        # Only ONE response is registered: an unmocked second request would
        # surface as a connection error, so finishing both calls means the
        # second one was answered from the cache.
        mocked.get(registry_url, payload=mocked_entries)
        async with make_session(TOKEN) as http:
            ha = HAClient(HA_URL, http, entity_registry_cache_ttl=300.0)
            first = await ha.get_entity_registry()
            second = await ha.get_entity_registry()  # expected to come from cache
    assert first == second
@pytest.mark.asyncio
async def test_entity_registry_cache_bypassed_after_ttl(monkeypatch):
    """After TTL expiry, the next call fetches fresh data.

    Two *different* payloads are queued for the registry endpoint. With a TTL
    of 0.0 the second call must go back to the network and therefore return
    the second payload. Previously both payloads were identical and nothing
    was asserted, so a client that wrongly served the cache would still pass,
    because an unused mocked response is not an error.

    ``monkeypatch`` is not used; it is kept only so the test signature stays
    unchanged.
    """
    registry_url = f"{HA_URL}/api/config/entity_registry"
    initial = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
    refreshed = [{"entity_id": "light.hall", "platform": "mqtt", "area_id": "hallway"}]
    with aioresponses() as m:
        # Responses registered for the same URL are consumed in order.
        m.get(registry_url, payload=initial)
        m.get(registry_url, payload=refreshed)
        async with make_session(TOKEN) as session:
            client = HAClient(HA_URL, session, entity_registry_cache_ttl=0.0)
            first = await client.get_entity_registry()  # fetches
            second = await client.get_entity_registry()  # TTL=0 → fetches again
    assert first[0]["platform"] == "zha"
    # A stale cache hit would still report "zha" here.
    assert second[0]["platform"] == "mqtt"
@pytest.mark.asyncio
async def test_invalidate_registry_cache_forces_refetch():
    """invalidate_registry_cache() makes the next call hit the network.

    Two *different* payloads are queued for the registry endpoint. The first
    call populates the cache (TTL 300 s); after invalidation the second call
    must return the second payload. Previously both payloads were identical
    and nothing was asserted, so a no-op ``invalidate_registry_cache()`` would
    have gone unnoticed.
    """
    registry_url = f"{HA_URL}/api/config/entity_registry"
    initial = [{"entity_id": "light.hall", "platform": "zha", "area_id": ""}]
    refreshed = [{"entity_id": "light.hall", "platform": "mqtt", "area_id": ""}]
    with aioresponses() as m:
        # Responses registered for the same URL are consumed in order.
        m.get(registry_url, payload=initial)
        m.get(registry_url, payload=refreshed)
        async with make_session(TOKEN) as session:
            client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
            first = await client.get_entity_registry()
            client.invalidate_registry_cache()
            second = await client.get_entity_registry()  # must hit network again
    assert first[0]["platform"] == "zha"
    # If invalidation did nothing, the cached "zha" entry would come back.
    assert second[0]["platform"] == "mqtt"
@pytest.mark.asyncio
async def test_entity_registry_cache_default_ttl_is_300():
    """A client built without a TTL argument stores a TTL of 300.0."""
    async with make_session(TOKEN) as http:
        default_ttl = HAClient(HA_URL, http)._registry_cache_ttl
    assert default_ttl == 300.0