"""Integration tests for HeartbeatCheck against real HA instances. Requires: - docker compose -f tests/integration/docker-compose.ken.yml up -d - docker compose -f tests/integration/docker-compose.chelsty.yml up -d - TEST_HA_TOKEN= pytest tests/ -m integration """ from __future__ import annotations import pytest from ha_diag.checks.heartbeat import HeartbeatCheck from ha_diag.event_emitter import EventEmitter from ha_diag.ha_client import HAClient, make_session from ha_diag.models import HAEventType @pytest.mark.integration async def test_heartbeat_ken_healthy(ha_ken_url: str, ha_token: str): async with make_session(ha_token) as session: client = HAClient(ha_ken_url, session) check = HeartbeatCheck(client) results = await check.run() assert results == [], f"HA ken not healthy: {results}" @pytest.mark.integration async def test_heartbeat_chelsty_healthy(ha_chelsty_url: str, ha_token: str): async with make_session(ha_token) as session: client = HAClient(ha_chelsty_url, session) check = HeartbeatCheck(client) results = await check.run() assert results == [], f"HA chelsty not healthy: {results}" @pytest.mark.integration async def test_heartbeat_emits_event_on_failure(): """Connecting to a closed port should yield ha_websocket_dead.""" async with make_session("bad-token") as session: client = HAClient("http://127.0.0.1:19999", session) # nothing here check = HeartbeatCheck(client) results = await check.run() assert len(results) == 1 assert results[0].event_type == HAEventType.ha_websocket_dead @pytest.mark.integration async def test_heartbeat_event_written_to_filesystem( ha_ken_url: str, ha_token: str, tmp_path ): emitter = EventEmitter(tmp_path / "events", node_name="test-piha", location_tag="ken") async with make_session(ha_token) as session: client = HAClient(ha_ken_url, session) check = HeartbeatCheck(client) results = await check.run() # Healthy HA → no events assert results == [] assert not list((tmp_path / "events").glob("*.json"))