homelab-codex-ws/services/ha-diag-agent/tests/test_event_emitter.py
Oskar Kapala ab8895d28b feat(ha-diag-agent): scaffold service with HA REST client and event emitter
- new per-host service, follows node-agent pattern
- 7 new HA event types defined (routing in supervisor — Phase 5)
- HeartbeatCheck as pipeline validator (pings /api/, emits ha_websocket_dead)
- service.yaml + host configs for piha (ken) and chelsty-infra (chelsty)
- test scaffolding with aiohttp/aiosqlite mocks (15/15 passing)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 12:26:34 +02:00

63 lines
1.9 KiB
Python

"""Tests for EventEmitter."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from ha_diag.event_emitter import EventEmitter
def test_emit_creates_json_file(tmp_events_dir: Path, emitter: EventEmitter):
event_id = emitter.emit(
event_type="ha_websocket_dead",
severity="error",
service="homeassistant",
message="HA unreachable",
payload={"error": "timeout"},
)
files = list(tmp_events_dir.glob("*.json"))
assert len(files) == 1
assert files[0].name == f"{event_id}.json"
def test_emit_event_schema(tmp_events_dir: Path, emitter: EventEmitter):
event_id = emitter.emit(
event_type="ha_websocket_dead",
severity="error",
service="homeassistant",
message="HA unreachable",
payload={"error": "timeout"},
)
data = json.loads((tmp_events_dir / f"{event_id}.json").read_text())
assert data["id"] == event_id
assert data["type"] == "ha_websocket_dead"
assert data["severity"] == "error"
assert data["node"] == "test-node"
assert data["service"] == "homeassistant"
assert data["message"] == "HA unreachable"
assert data["payload"] == {"error": "timeout"}
assert "timestamp" in data
assert "date" in data
def test_emit_multiple_events_unique_files(tmp_events_dir: Path, emitter: EventEmitter):
ids = [
emitter.emit("ha_websocket_dead", "error", "homeassistant", f"msg {i}")
for i in range(3)
]
assert len(set(ids)) == 3
assert len(list(tmp_events_dir.glob("*.json"))) == 3
def test_emit_no_tmp_file_left(tmp_events_dir: Path, emitter: EventEmitter):
emitter.emit("ha_websocket_dead", "error", "homeassistant", "msg")
assert not list(tmp_events_dir.glob("*.tmp"))
def test_emitter_creates_events_dir(tmp_path: Path):
new_dir = tmp_path / "nested" / "events"
emitter = EventEmitter(new_dir, "my-node")
assert new_dir.exists()