- main.py: rename event= to ha_event= in _log.warning() — structlog treats 'event' as a reserved positional argument; the old name caused a TypeError whenever a check returned unhealthy results (events were still emitted, but the check was logged as check_error instead of check_unhealthy)
- tests/test_ha_client.py: replace aioresponses with unittest.mock — aioresponses 0.7.8 is incompatible with aiohttp >=3.12 (missing stream_writer kwarg)
- pyproject.toml: remove aioresponses from dev dependencies

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
138 lines
4.9 KiB
Python
138 lines
4.9 KiB
Python
"""Tests for HAClient using unittest.mock to avoid aioresponses/aiohttp version coupling."""
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from ha_diag.ha_client import HAClient, make_session
|
|
|
|
# Base URL passed to every HAClient built in these tests; the session is a mock.
HA_URL = "http://homeassistant.test:8123"
|
|
# Placeholder token; not referenced by the tests visible in this file.
TOKEN = "test-token"
|
|
|
|
|
|
def _mock_resp(payload=None, text=None, status=200):
|
|
"""Return a mock that behaves like an aiohttp response context manager."""
|
|
resp = MagicMock()
|
|
resp.status = status
|
|
if status >= 400:
|
|
resp.raise_for_status.side_effect = Exception(f"HTTP {status}")
|
|
else:
|
|
resp.raise_for_status = MagicMock()
|
|
resp.json = AsyncMock(return_value=payload if payload is not None else {})
|
|
resp.text = AsyncMock(return_value=text or "")
|
|
resp.__aenter__ = AsyncMock(return_value=resp)
|
|
resp.__aexit__ = AsyncMock(return_value=False)
|
|
return resp
|
|
|
|
|
|
def _mock_session(get_resp=None):
|
|
session = MagicMock()
|
|
session.get.return_value = get_resp or _mock_resp()
|
|
return session
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_api_status_ok():
    """get_api_status() returns the JSON body of a mocked 200 response."""
    ok_response = _mock_resp({"message": "API running."})
    client = HAClient(HA_URL, _mock_session(ok_response))

    status_body = await client.get_api_status()

    assert status_body == {"message": "API running."}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_api_status_unauthorized():
    """get_api_status() raises when the mocked response has status 401."""
    unauthorized = _mock_resp(status=401)
    client = HAClient(HA_URL, _mock_session(unauthorized))

    # The mocked raise_for_status() raises a plain Exception, so that is the
    # only type this test can name.
    with pytest.raises(Exception):
        await client.get_api_status()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_states_returns_list():
    """get_states() returns the mocked list of state dicts."""
    mocked_states = [{"entity_id": "light.living_room", "state": "on"}]
    client = HAClient(HA_URL, _mock_session(_mock_resp(mocked_states)))

    states = await client.get_states()

    assert isinstance(states, list)
    first_state = states[0]
    assert first_state["entity_id"] == "light.living_room"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_config_returns_dict():
    """get_config() exposes the ``version`` field of the mocked config body."""
    mocked_config = {"version": "2024.1.0", "location_name": "Home"}
    client = HAClient(HA_URL, _mock_session(_mock_resp(mocked_config)))

    config = await client.get_config()

    assert config["version"] == "2024.1.0"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_entity_registry_returns_list():
    """get_entity_registry() returns both mocked entries in order."""
    hall_light = {"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}
    temp_sensor = {"entity_id": "sensor.temp", "platform": "mqtt", "area_id": None}
    response = _mock_resp([hall_light, temp_sensor])
    client = HAClient(HA_URL, _mock_session(response))

    registry = await client.get_entity_registry()

    assert len(registry) == 2
    assert registry[0]["platform"] == "zha"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_make_session_sets_auth_header():
    """The session built by make_session carries a Bearer Authorization header."""
    async with make_session("my-secret-token") as session:
        auth_header = session.headers.get("Authorization")
        assert auth_header == "Bearer my-secret-token"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entity registry TTL cache (Phase 3 Flag #3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_entity_registry_cached_on_second_call():
    """Two registry reads with a 300 s TTL result in a single session.get() call."""
    entries = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
    http = _mock_session(_mock_resp(entries))
    client = HAClient(HA_URL, http, entity_registry_cache_ttl=300.0)

    first = await client.get_entity_registry()
    second = await client.get_entity_registry()  # expected to be served from cache

    assert first == second
    # Exactly one GET means the second read never reached the session.
    http.get.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_entity_registry_cache_bypassed_after_ttl():
    """With a TTL of 0, consecutive registry reads each issue their own GET."""
    payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
    session = _mock_session()
    # An iterable side_effect supplies one return value per call and takes
    # precedence over return_value, so only these two responses are ever used.
    # Passing a response to _mock_session() as well would be dead configuration.
    session.get.side_effect = [_mock_resp(payload), _mock_resp(payload)]
    # TTL=0 means every call is stale → two fetches
    client = HAClient(HA_URL, session, entity_registry_cache_ttl=0.0)

    await client.get_entity_registry()
    await client.get_entity_registry()

    assert session.get.call_count == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invalidate_registry_cache_forces_refetch():
    """invalidate_registry_cache() makes the next registry read issue a new GET."""
    payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": ""}]
    session = _mock_session()
    # An iterable side_effect supplies one return value per call and takes
    # precedence over return_value, so only these two responses are ever used.
    # Passing a response to _mock_session() as well would be dead configuration.
    session.get.side_effect = [_mock_resp(payload), _mock_resp(payload)]
    # The TTL is long enough that only the explicit invalidation can explain
    # a second GET.
    client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)

    await client.get_entity_registry()
    client.invalidate_registry_cache()
    await client.get_entity_registry()

    assert session.get.call_count == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_entity_registry_cache_default_ttl_is_300():
    """An HAClient built without an explicit TTL stores 300.0 as its cache TTL."""
    client = HAClient(HA_URL, _mock_session())

    # Reads the private attribute directly; no request is made in this test.
    assert client._registry_cache_ttl == 300.0
|