- persistent WS connection to HA with auth + state_changed subscription
- watchdog detects silence > 5min → emits ha_websocket_dead
- immediate ha_websocket_dead on disconnect, exponential reconnect with jitter
- cooldown prevents alert spam (10min repeat window while HA stays down)
- ha_websocket_recovered emitted on reconnect after a dead alert (allows supervisor to clear active incidents in Phase 5)
- new monitors/ subpackage for long-running tasks (vs interval checks/)
- /health endpoint now includes ws_connected field
- 26 unit tests, 3 integration tests (real HA + container stop/restart)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
187 lines
6.4 KiB
Python
187 lines
6.4 KiB
Python
"""Integration tests for WebSocketMonitor against real HA instances.
|
|
|
|
Requires:
|
|
docker compose -f tests/integration/docker-compose.ken.yml up -d
|
|
tests/integration/scripts/wait-for-ha.sh http://localhost:8123
|
|
TEST_HA_TOKEN=<long-lived-token> pytest tests/ -m integration
|
|
|
|
Container stop/restart tests additionally need Docker access from the host.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import subprocess
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from ha_diag.config import Settings
|
|
from ha_diag.event_emitter import EventEmitter
|
|
from ha_diag.models import HAEventType
|
|
from ha_diag.monitors.websocket_monitor import WebSocketMonitor
|
|
from ha_diag.ha_client import make_session
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_settings(ha_url: str, ha_token: str, **overrides) -> Settings:
    """Build the ``Settings`` used by the WebSocket integration tests.

    Args:
        ha_url: Value stored under the ``ha_url`` setting.
        ha_token: Value stored under the ``ha_token`` setting.
        **overrides: Additional or replacement ``Settings`` keyword arguments;
            any key given here takes precedence over the test baseline.

    Returns:
        A ``Settings`` instance constructed from the merged keyword arguments.
    """
    baseline: dict = {
        "ha_url": ha_url,
        "ha_token": ha_token,
        "node_name": "test-piha",
        "location_tag": "ken",
        "websocket_enabled": True,
        # Short silence threshold so the tests do not wait long.
        "websocket_silence_threshold_seconds": 30,
        "websocket_watchdog_interval_seconds": 5,
        "websocket_reconnect_initial_delay": 1.0,
        "websocket_reconnect_max_delay": 10.0,
        "websocket_reconnect_jitter": 0.0,
        # Zero repeat window: always re-alert.
        "websocket_down_alert_repeat_minutes": 0,
    }
    # Later entries win, so caller-supplied overrides replace baseline values.
    return Settings(**{**baseline, **overrides})
|
|
|
|
|
|
def _emitted_types(events_dir: Path) -> list[str]:
    """Return the ``"type"`` field of every event file in *events_dir*.

    Args:
        events_dir: Directory searched (non-recursively) for ``*.json`` files.

    Returns:
        One ``"type"`` value per matching file, ordered by sorted file path.
        An empty list when the directory contains no ``*.json`` files.

    Raises:
        json.JSONDecodeError: If a matching file does not contain valid JSON.
        KeyError: If a decoded event object has no ``"type"`` key.
    """
    # A regular import replaces the per-file ``__import__("json")`` call that
    # used to live inside the comprehension.
    import json

    return [
        json.loads(event_file.read_text())["type"]
        for event_file in sorted(events_dir.glob("*.json"))
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.integration
async def test_ws_normal_operation_no_false_alerts(
    ha_ken_url: str, ha_token: str, tmp_path: Path
):
    """Normal operation: monitor connects, subscribes, no dead alerts emitted.

    Starts a monitor against the running HA instance, lets it settle for five
    seconds, checks that it reports healthy, then verifies that no
    ``ha_websocket_dead`` event file was written to the events directory.
    """
    events_dir = tmp_path / "events"
    events_dir.mkdir()
    settings = _make_settings(ha_ken_url, ha_token)
    emitter = EventEmitter(events_dir, node_name="test-piha", location_tag="ken")

    async with make_session(ha_token) as session:
        monitor = WebSocketMonitor(
            ha_url=ha_ken_url,
            token=ha_token,
            settings=settings,
            emitter=emitter,
            session=session,
        )
        await monitor.start()
        try:
            await asyncio.sleep(5)  # let it connect and settle
            assert monitor.is_healthy, "Monitor should be subscribed and healthy"
        finally:
            # Stop the monitor even when the health assertion fails, so a
            # failing run does not leave a started monitor behind.
            await monitor.stop()

    # No dead alerts during normal operation
    types = _emitted_types(events_dir)
    assert HAEventType.ha_websocket_dead.value not in types, (
        f"Unexpected dead alerts during normal operation: {types}"
    )
|
|
|
|
|
|
@pytest.mark.integration
async def test_ws_dead_emitted_when_ha_stops(ha_ken_url: str, ha_token: str, tmp_path: Path):
    """Stopping the HA container triggers ha_websocket_dead.

    Waits for the monitor to become healthy, stops the ``ha-test-ken``
    container via the Docker CLI, then polls the events directory for up to
    15 seconds for an ``ha_websocket_dead`` event. The monitor is always
    stopped and the container is always started again, whatever the outcome.
    """
    events_dir = tmp_path / "events"
    events_dir.mkdir()
    settings = _make_settings(ha_ken_url, ha_token)
    emitter = EventEmitter(events_dir, node_name="test-piha", location_tag="ken")
    dead = HAEventType.ha_websocket_dead.value

    async with make_session(ha_token) as session:
        monitor = WebSocketMonitor(
            ha_url=ha_ken_url,
            token=ha_token,
            settings=settings,
            emitter=emitter,
            session=session,
        )
        await monitor.start()
        # Everything after start() runs under try/finally: a failed health
        # check or a failing/timing-out `docker stop` must still stop the
        # monitor and bring the container back for the following tests.
        try:
            # Wait for initial subscription (20 polls x 0.5s)
            for _ in range(20):
                if monitor.is_healthy:
                    break
                await asyncio.sleep(0.5)
            assert monitor.is_healthy, "Monitor did not subscribe within 10s"

            # Stop HA container
            subprocess.run(
                ["docker", "stop", "ha-test-ken"],
                check=True, capture_output=True, timeout=30,
            )

            # Wait for dead alert (up to 15s)
            deadline = time.monotonic() + 15
            while time.monotonic() < deadline:
                if dead in _emitted_types(events_dir):
                    break
                await asyncio.sleep(0.5)

            types = _emitted_types(events_dir)
            assert dead in types, (
                "ha_websocket_dead not emitted after HA container stopped"
            )
        finally:
            try:
                await monitor.stop()
            finally:
                # Best-effort restart (check=False); runs even if stop() raised.
                subprocess.run(
                    ["docker", "start", "ha-test-ken"],
                    check=False, capture_output=True, timeout=30,
                )
|
|
|
|
|
|
@pytest.mark.integration
async def test_ws_recovered_after_ha_restart(ha_ken_url: str, ha_token: str, tmp_path: Path):
    """After HA restarts, monitor reconnects and emits ha_websocket_recovered.

    Waits for the monitor to become healthy, stops and restarts the
    ``ha-test-ken`` container via the Docker CLI, then polls the events
    directory for up to 60 seconds. Both ``ha_websocket_dead`` and
    ``ha_websocket_recovered`` must have been emitted by the end.
    """
    events_dir = tmp_path / "events"
    events_dir.mkdir()
    settings = _make_settings(ha_ken_url, ha_token)
    emitter = EventEmitter(events_dir, node_name="test-piha", location_tag="ken")
    dead = HAEventType.ha_websocket_dead.value
    recovered = HAEventType.ha_websocket_recovered.value

    async with make_session(ha_token) as session:
        monitor = WebSocketMonitor(
            ha_url=ha_ken_url,
            token=ha_token,
            settings=settings,
            emitter=emitter,
            session=session,
        )
        await monitor.start()
        # Everything after start() runs under try/finally so that a failure
        # between `docker stop` and `docker start` cannot leave the monitor
        # running or the HA container down.
        try:
            # Wait for initial subscription (20 polls x 0.5s)
            for _ in range(20):
                if monitor.is_healthy:
                    break
                await asyncio.sleep(0.5)
            assert monitor.is_healthy

            # Stop then restart HA
            subprocess.run(
                ["docker", "stop", "ha-test-ken"],
                check=True, capture_output=True, timeout=30,
            )
            await asyncio.sleep(2)
            subprocess.run(
                ["docker", "start", "ha-test-ken"],
                check=True, capture_output=True, timeout=30,
            )

            # Wait for recovery (up to 60s — HA takes time to start)
            deadline = time.monotonic() + 60
            while time.monotonic() < deadline:
                if recovered in _emitted_types(events_dir):
                    break
                await asyncio.sleep(1.0)

            types = _emitted_types(events_dir)
            assert dead in types, (
                "ha_websocket_dead not emitted after container stop"
            )
            assert recovered in types, (
                "ha_websocket_recovered not emitted after HA restarted"
            )
        finally:
            try:
                await monitor.stop()
            finally:
                # Best-effort safety net (check=False): make sure the container
                # is started again even if the test failed after stopping it.
                subprocess.run(
                    ["docker", "start", "ha-test-ken"],
                    check=False, capture_output=True, timeout=30,
                )
|