- dockerized ken + chelsty HA test instances with template fixtures - snapshot/reset/wait scripts for fixture management - integration test infrastructure with separate marker - location_tag promoted from metadata to event payload (Phase 1 flag #3) - chelsty-infra target_url points to chelsty-ha via tailnet (Phase 1 flag #1) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
54 lines
1.9 KiB
Python
54 lines
1.9 KiB
Python
"""Integration tests for HeartbeatCheck against real HA instances.
|
|
|
|
Requires:
|
|
- docker compose -f tests/integration/docker-compose.ken.yml up -d
|
|
- docker compose -f tests/integration/docker-compose.chelsty.yml up -d
|
|
- TEST_HA_TOKEN=<long-lived-token> pytest tests/ -m integration
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from ha_diag.checks.heartbeat import HeartbeatCheck
|
|
from ha_diag.event_emitter import EventEmitter
|
|
from ha_diag.ha_client import HAClient
|
|
|
|
|
|
@pytest.mark.integration
async def test_heartbeat_ken_healthy(ha_ken_url: str, ha_token: str, tmp_path):
    """Run the heartbeat check against the ken HA URL and expect a clean result.

    A clean result here means ``healthy`` is ``True`` and no event type is
    attached to the check result.
    """
    heartbeat = HeartbeatCheck(HAClient(ha_ken_url, ha_token))
    outcome = await heartbeat.run()

    # Surface the check's own message when the instance is not healthy.
    assert outcome.healthy is True, f"HA ken not healthy: {outcome.message}"
    assert outcome.event_type is None
|
|
|
|
|
|
@pytest.mark.integration
async def test_heartbeat_chelsty_healthy(ha_chelsty_url: str, ha_token: str):
    """Run the heartbeat check against the chelsty HA URL and expect a clean result.

    A clean result here means ``healthy`` is ``True`` and no event type is
    attached to the check result.
    """
    heartbeat = HeartbeatCheck(HAClient(ha_chelsty_url, ha_token))
    outcome = await heartbeat.run()

    # Surface the check's own message when the instance is not healthy.
    assert outcome.healthy is True, f"HA chelsty not healthy: {outcome.message}"
    assert outcome.event_type is None
|
|
|
|
|
|
@pytest.mark.integration
async def test_heartbeat_emits_event_on_failure(tmp_path):
    """Point the heartbeat check at a dead endpoint and expect a failure event type.

    The client targets a local port where nothing is expected to be listening
    and uses a bogus token; the result must be unhealthy and carry the
    ``ha_websocket_dead`` event type.
    """
    # Nothing is expected to listen on this address.
    unreachable = HAClient("http://127.0.0.1:19999", "bad-token")
    heartbeat = HeartbeatCheck(unreachable)

    outcome = await heartbeat.run()

    assert outcome.healthy is False
    assert outcome.event_type == "ha_websocket_dead"
|
|
|
|
|
|
@pytest.mark.integration
async def test_heartbeat_event_written_to_filesystem(ha_ken_url: str, ha_token: str, tmp_path):
    """A healthy heartbeat against ken must leave no event files on disk.

    An ``EventEmitter`` is created over ``tmp_path / "events"``, the heartbeat
    check is run against the ken HA URL, and the test then requires all of:

    - the result is healthy,
    - the result carries no event type,
    - the events directory contains no ``*.json`` files.

    The last two conditions are asserted independently; a combined
    ``A or B`` assertion would pass when only one of them holds.
    """
    events_dir = tmp_path / "events"
    # NOTE(review): the emitter is constructed but never handed to the check,
    # so this test only proves that nothing wrote JSON into events_dir during
    # the run. Confirm whether the check result was meant to be routed through
    # the emitter here.
    emitter = EventEmitter(events_dir, node_name="test-piha", location_tag="ken")
    client = HAClient(ha_ken_url, ha_token)
    check = HeartbeatCheck(client)
    result = await check.run()

    assert result.healthy is True
    # No event is attached to a healthy result...
    assert result.event_type is None
    # ...and nothing was written to the events directory. Globbing a directory
    # that does not exist yields no matches, so this also holds when the
    # directory was never created.
    written = list(events_dir.glob("*.json"))
    assert not written, f"unexpected event files: {written}"
|