170 lines
5.4 KiB
Python
170 lines
5.4 KiB
Python
|
|
"""Integration tests for UpdatesAvailableCheck and UpdatesDigestCheck.
|
||
|
|
|
||
|
|
Uses real aiosqlite Storage + EventEmitter + mocked HTTP.
|
||
|
|
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import AsyncGenerator
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
import pytest_asyncio
|
||
|
|
from aioresponses import aioresponses
|
||
|
|
|
||
|
|
from ha_diag.checks.updates_available import UpdatesAvailableCheck, UpdatesDigestCheck
|
||
|
|
from ha_diag.config import Settings
|
||
|
|
from ha_diag.event_emitter import EventEmitter
|
||
|
|
from ha_diag.ha_client import HAClient, make_session
|
||
|
|
from ha_diag.models import HAEventType
|
||
|
|
from ha_diag.storage import Storage
|
||
|
|
|
||
|
|
# Base URL handed to HAClient in every test; requests to its /api/states
# endpoint are registered on the aioresponses mock in each test.
HA_URL = "http://ha-test-ken:8123"
|
||
|
|
|
||
|
|
|
||
|
|
def _settings(**overrides) -> Settings:
    """Build a ``Settings`` object for the tests in this module.

    A fixed set of baseline values is used (both cooldown settings are
    zero by default) and ``overrides`` is applied on top of it.

    Args:
        **overrides: Keyword values that replace or extend the baseline
            values before ``Settings`` is constructed.

    Returns:
        The ``Settings`` instance built from the merged keyword arguments.
    """
    defaults: dict = {
        "ha_url": HA_URL,
        "ha_token": "test-token",
        "node_name": "piha",
        "location_tag": "ken",
        "alert_cooldown_hours": 0.0,
        "updates_cooldown_days": 0,
        "check_interval": 60,
        "check_interval_unavailable": 3600,
    }
    defaults.update(overrides)
    return Settings(**defaults)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest_asyncio.fixture
async def storage(tmp_path: Path) -> AsyncGenerator[Storage, None]:
    """Yield an opened ``Storage`` backed by a file under ``tmp_path``.

    The database file is ``integration_test.db`` inside the per-test
    temporary directory. The storage is closed once the fixture resumes
    after the ``yield``.
    """
    s = Storage(tmp_path / "integration_test.db")
    await s.open()
    yield s
    await s.close()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def events_dir(tmp_path: Path) -> Path:
    """Create and return an ``events`` directory under ``tmp_path``.

    The tests pass this directory to ``EventEmitter`` and then glob it for
    the ``*.json`` files written by ``emit``.
    """
    d = tmp_path / "events"
    d.mkdir()
    return d
|
||
|
|
|
||
|
|
|
||
|
|
def _update_states(*entity_ids: str) -> list[dict]:
|
||
|
|
return [
|
||
|
|
{
|
||
|
|
"entity_id": eid,
|
||
|
|
"state": "on",
|
||
|
|
"attributes": {
|
||
|
|
"title": eid.split(".")[-1].replace("_", " ").title(),
|
||
|
|
"installed_version": "1.0.0",
|
||
|
|
"latest_version": "2.0.0",
|
||
|
|
"in_progress": False,
|
||
|
|
"auto_update": False,
|
||
|
|
},
|
||
|
|
}
|
||
|
|
for eid in entity_ids
|
||
|
|
]
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.integration
async def test_individual_updates_written_to_disk(storage: Storage, events_dir: Path):
    """2 pending updates → 2 event files with correct structure.

    Runs ``UpdatesAvailableCheck`` against a mocked ``/api/states`` response
    containing two update entities, emits every result through a real
    ``EventEmitter`` and then inspects the JSON files in ``events_dir``.
    """
    states = _update_states("update.ha_core", "update.mosquitto")
    emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")

    with aioresponses() as m:
        m.get(f"{HA_URL}/api/states", payload=states)
        async with make_session("test-token") as session:
            client = HAClient(HA_URL, session)
            check = UpdatesAvailableCheck(client, storage, _settings())
            results = await check.run()

    # One result per pending update, each forwarded to the emitter.
    assert len(results) == 2
    for r in results:
        assert r.event_type == HAEventType.ha_update_available
        emitter.emit(
            event_type=r.event_type,
            severity=r.severity.value,
            service="homeassistant",
            message=r.message,
            payload=r.payload,
        )

    # Each emitted event must have produced its own JSON file.
    files = list(events_dir.glob("*.json"))
    assert len(files) == 2
    for f in files:
        data = json.loads(f.read_text())
        assert data["type"] == "ha_update_available"
        assert data["payload"]["location_tag"] == "ken"
        assert "entity_id" in data["payload"]
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.integration
async def test_digest_writes_single_event_file(storage: Storage, events_dir: Path):
    """Sunday digest → single event file with digest=True payload.

    Runs ``UpdatesDigestCheck`` against a mocked ``/api/states`` response
    containing three update entities, emits the single result through a real
    ``EventEmitter`` and verifies the one JSON file written to ``events_dir``.
    """
    states = _update_states("update.ha_core", "update.mosquitto", "update.esphome")
    emitter = EventEmitter(events_dir, node_name="piha", location_tag="ken")

    with aioresponses() as m:
        m.get(f"{HA_URL}/api/states", payload=states)
        async with make_session("test-token") as session:
            client = HAClient(HA_URL, session)
            check = UpdatesDigestCheck(client, storage, _settings())
            results = await check.run()

    # All three updates are folded into one digest result.
    assert len(results) == 1
    r = results[0]
    assert r.payload["digest"] is True
    assert r.payload["count"] == 3

    emitter.emit(
        event_type=r.event_type,
        severity=r.severity.value,
        service="homeassistant",
        message=r.message,
        payload=r.payload,
    )
    files = list(events_dir.glob("*.json"))
    assert len(files) == 1
    data = json.loads(files[0].read_text())
    assert data["payload"]["digest"] is True
    assert len(data["payload"]["updates"]) == 3
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.integration
async def test_dedup_across_daily_and_digest_independent(storage: Storage):
    """An active daily cooldown does not suppress the digest check.

    With ``updates_cooldown_days=7`` the first daily run reports the update,
    the second daily run returns nothing, and a digest run against the same
    storage still produces its single digest result. Only this direction
    (daily first, digest afterwards) is exercised here.
    """
    states = _update_states("update.ha_core")
    settings = _settings(updates_cooldown_days=7)

    async def run_check(check_cls: type) -> list:
        """Run ``check_cls`` once with a fresh HTTP mock and client session."""
        with aioresponses() as m:
            m.get(f"{HA_URL}/api/states", payload=states)
            async with make_session("test-token") as session:
                check = check_cls(HAClient(HA_URL, session), storage, settings)
                return await check.run()

    # Daily check
    r1 = await run_check(UpdatesAvailableCheck)
    assert len(r1) == 1

    # Daily again — cooldown active
    r2 = await run_check(UpdatesAvailableCheck)
    assert r2 == []

    # Digest — different key, should still fire
    r3 = await run_check(UpdatesDigestCheck)
    assert len(r3) == 1
    assert r3[0].payload["digest"] is True
|