homelab-codex-ws/services/ha-diag-agent/tests/integration/test_websocket_monitor_integration.py

187 lines
6.4 KiB
Python
Raw Normal View History

"""Integration tests for WebSocketMonitor against real HA instances.
Requires:
docker compose -f tests/integration/docker-compose.ken.yml up -d
tests/integration/scripts/wait-for-ha.sh http://localhost:8123
TEST_HA_TOKEN=<long-lived-token> pytest tests/ -m integration
Container stop/restart tests additionally need Docker access from the host.
"""
from __future__ import annotations
import asyncio
import subprocess
import time
from pathlib import Path
import pytest
from ha_diag.config import Settings
from ha_diag.event_emitter import EventEmitter
from ha_diag.models import HAEventType
from ha_diag.monitors.websocket_monitor import WebSocketMonitor
from ha_diag.ha_client import make_session
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_settings(ha_url: str, ha_token: str, **overrides) -> Settings:
defaults: dict = {
"ha_url": ha_url,
"ha_token": ha_token,
"node_name": "test-piha",
"location_tag": "ken",
"websocket_enabled": True,
"websocket_silence_threshold_seconds": 30, # low for fast test
"websocket_watchdog_interval_seconds": 5,
"websocket_reconnect_initial_delay": 1.0,
"websocket_reconnect_max_delay": 10.0,
"websocket_reconnect_jitter": 0.0,
"websocket_down_alert_repeat_minutes": 0, # always re-alert
}
defaults.update(overrides)
return Settings(**defaults)
def _emitted_types(events_dir: Path) -> list[str]:
return [
__import__("json").loads(f.read_text())["type"]
for f in sorted(events_dir.glob("*.json"))
]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.integration
async def test_ws_normal_operation_no_false_alerts(
ha_ken_url: str, ha_token: str, tmp_path: Path
):
"""Normal operation: monitor connects, subscribes, no dead alerts emitted."""
events_dir = tmp_path / "events"
events_dir.mkdir()
settings = _make_settings(ha_ken_url, ha_token)
emitter = EventEmitter(events_dir, node_name="test-piha", location_tag="ken")
async with make_session(ha_token) as session:
monitor = WebSocketMonitor(
ha_url=ha_ken_url,
token=ha_token,
settings=settings,
emitter=emitter,
session=session,
)
await monitor.start()
await asyncio.sleep(5) # let it connect and settle
assert monitor.is_healthy, "Monitor should be subscribed and healthy"
await monitor.stop()
# No dead alerts during normal operation
types = _emitted_types(events_dir)
assert HAEventType.ha_websocket_dead.value not in types, (
f"Unexpected dead alerts during normal operation: {types}"
)
@pytest.mark.integration
async def test_ws_dead_emitted_when_ha_stops(ha_ken_url: str, ha_token: str, tmp_path: Path):
"""Stopping the HA container triggers ha_websocket_dead."""
events_dir = tmp_path / "events"
events_dir.mkdir()
settings = _make_settings(ha_ken_url, ha_token)
emitter = EventEmitter(events_dir, node_name="test-piha", location_tag="ken")
async with make_session(ha_token) as session:
monitor = WebSocketMonitor(
ha_url=ha_ken_url,
token=ha_token,
settings=settings,
emitter=emitter,
session=session,
)
await monitor.start()
# Wait for initial subscription
for _ in range(20):
if monitor.is_healthy:
break
await asyncio.sleep(0.5)
assert monitor.is_healthy, "Monitor did not subscribe within 10s"
# Stop HA container
subprocess.run(
["docker", "stop", "ha-test-ken"],
check=True, capture_output=True, timeout=30,
)
try:
# Wait for dead alert (up to 15s)
deadline = time.monotonic() + 15
while time.monotonic() < deadline:
types = _emitted_types(events_dir)
if HAEventType.ha_websocket_dead.value in types:
break
await asyncio.sleep(0.5)
types = _emitted_types(events_dir)
assert HAEventType.ha_websocket_dead.value in types, (
"ha_websocket_dead not emitted after HA container stopped"
)
finally:
await monitor.stop()
subprocess.run(
["docker", "start", "ha-test-ken"],
check=False, capture_output=True, timeout=30,
)
@pytest.mark.integration
async def test_ws_recovered_after_ha_restart(ha_ken_url: str, ha_token: str, tmp_path: Path):
"""After HA restarts, monitor reconnects and emits ha_websocket_recovered."""
events_dir = tmp_path / "events"
events_dir.mkdir()
settings = _make_settings(ha_ken_url, ha_token)
emitter = EventEmitter(events_dir, node_name="test-piha", location_tag="ken")
async with make_session(ha_token) as session:
monitor = WebSocketMonitor(
ha_url=ha_ken_url,
token=ha_token,
settings=settings,
emitter=emitter,
session=session,
)
await monitor.start()
for _ in range(20):
if monitor.is_healthy:
break
await asyncio.sleep(0.5)
assert monitor.is_healthy
# Stop then restart HA
subprocess.run(["docker", "stop", "ha-test-ken"], check=True, timeout=30)
await asyncio.sleep(2)
subprocess.run(["docker", "start", "ha-test-ken"], check=True, timeout=30)
try:
# Wait for recovery (up to 60s — HA takes time to start)
deadline = time.monotonic() + 60
while time.monotonic() < deadline:
types = _emitted_types(events_dir)
if HAEventType.ha_websocket_recovered.value in types:
break
await asyncio.sleep(1.0)
types = _emitted_types(events_dir)
assert HAEventType.ha_websocket_dead.value in types, (
"ha_websocket_dead not emitted after container stop"
)
assert HAEventType.ha_websocket_recovered.value in types, (
"ha_websocket_recovered not emitted after HA restarted"
)
finally:
await monitor.stop()