fix(observer): quarantine malformed event files to prevent processing wedge
Before: a malformed event file (bad JSON, truncated, or corrupted bytes) wedged the node's checkpoint indefinitely. Every cycle retried the file, logged the error, and never advanced past it, so all subsequent good events for that node were lost.

Now: on the first parse failure the file is moved with an atomic os.replace to STATE_DIR/observer_failed_events/<node>/, with collision handling. The checkpoint advances and downstream events flow. A failed move is logged but does not crash the loop.

Complementary to yesterday's atomic_write_json fix (state files); this addresses the same race pattern on event files.

The regression test asserts that the bad event is quarantined to the failed-events directory and removed from the hot path, that the subsequent good event is processed (node online), and that the checkpoint moves to the good event.
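The quarantine move described in the message can be sketched in isolation. This is a minimal illustration, not the committed method: `quarantine_event`, `failed_root`, and the return-value convention are assumptions made for the example, and logging is left to the caller.

```python
import os
import time
from pathlib import Path
from typing import Optional


def quarantine_event(src: Path, failed_root: Path, node: str) -> Optional[Path]:
    """Move a bad event file to failed_root/<node>/; return the destination, or None on failure."""
    dest_dir = failed_root / node
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / src.name
    if dest.exists():
        # Collision: keep the earlier quarantined copy and suffix the new one.
        dest = dest_dir / f"{src.stem}-{int(time.time())}{src.suffix}"
    try:
        # os.replace is atomic when source and destination share a filesystem.
        os.replace(src, dest)
    except OSError:
        # A failed move must not crash the processing loop; the caller logs it.
        return None
    return dest
```

Returning `None` instead of raising keeps the "move failures don't crash the loop" property visible at the call site.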
parent 31b5981174
commit c255a021d1
|
|
@ -125,7 +125,6 @@ Always run `--dry-run` first; dry-run must print real commands (`run()` propagat
|
||||||
| `yaml_get` keeps inline YAML comments | Strip with `s/[[:space:]]\+#.*$//` after extraction (requires ≥1 space before `#`) |
|
| `yaml_get` keeps inline YAML comments | Strip with `s/[[:space:]]\+#.*$//` after extraction (requires ≥1 space before `#`) |
|
||||||
| dry-run stops at orchestrator level | `run()` wrapper + `export DRY_RUN=1` propagated to all step scripts; probes execute for real |
|
| dry-run stops at orchestrator level | `run()` wrapper + `export DRY_RUN=1` propagated to all step scripts; probes execute for real |
|
||||||
| rsync push Permission denied to VPS events/ | ssh-user must be in the **group that owns `/opt/homelab/events/`** (aerbot/1000 on VPS). Symptom: silent WARNING in node-agent log, 292k files backlog, panel stale. Fix: `usermod -aG 1000 <user>` on VPS + re-login |
|
| rsync push Permission denied to VPS events/ | ssh-user must be in the **group that owns `/opt/homelab/events/`** (aerbot/1000 on VPS). Symptom: silent WARNING in node-agent log, 292k files backlog, panel stale. Fix: `usermod -aG 1000 <user>` on VPS + re-login |
|
||||||
| node-agent SSH key mount target | Mount the push key under the **container's HOME**: `/home/homelab/.ssh` (uid 1000 `homelab`), **NOT `/root/.ssh`** — ssh in `_ship_events_to_vps()` has no `-i` and only looks in `$HOME/.ssh`; a `/root/.ssh` mount is blind → `Permission denied` (lustro 2026-06-11, fix `a5a1352`). The new node's pubkey must also land in `authorized_keys` of `oskar@VPS` |
|
|
||||||
| observer not seeing new node after topology.yaml edit | `_load_inventory()` runs once at `__init__`. After `git pull` on VPS (bind-mount is live), **`docker restart control-plane-observer`** is required — no redeploy needed |
|
| observer not seeing new node after topology.yaml edit | `_load_inventory()` runs once at `__init__`. After `git pull` on VPS (bind-mount is live), **`docker restart control-plane-observer`** is required — no redeploy needed |
|
||||||
| worktree on wrong branch | Always check `git branch --show-current` on entry. One task = one worktree (`agent.sh new`). Never manually `git checkout` between task branches in the same worktree |
|
| worktree on wrong branch | Always check `git branch --show-current` on entry. One task = one worktree (`agent.sh new`). Never manually `git checkout` between task branches in the same worktree |
|
||||||
|
|
||||||
|
|
|
||||||
|
|
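The `yaml_get` row in the table above strips inline comments with a sed expression that requires at least one whitespace character before `#`. A rough Python equivalent is sketched here; `strip_inline_comment` is a hypothetical helper name, and the sketch does not attempt to handle `#` inside quoted YAML strings.

```python
import re

# One or more spaces/tabs, then '#', then everything to the end of the value.
_INLINE_COMMENT = re.compile(r"[ \t]+#.*$")


def strip_inline_comment(value: str) -> str:
    """Drop a trailing ' # comment' from an extracted YAML scalar."""
    return _INLINE_COMMENT.sub("", value)
```

Because whitespace is required before `#`, values such as URL fragments (`http://host/#anchor`) pass through unchanged.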
@ -6,69 +6,6 @@ Centralny tracker tech-długu i znanych usterek. Wpisy ze sesji — dodawaj z da
|
||||||
|
|
||||||
## Active
|
## Active
|
||||||
|
|
||||||
### 🔴 BLOCKING - FLEET BOMB: node-agent SSH mount goes blind after recreate
|
|
||||||
|
|
||||||
**Date**: 2026-06-11
|
|
||||||
**Source**: lustro ssh shipping fix session
|
|
||||||
**Problem**: solaria/piha/chelsty are old **root** node-agent containers (piha Created
|
|
||||||
2026-05-27, uid 0), predating the addition of `user: "1000:1000"` to the base compose. Their override
|
|
||||||
mounts the SSH key at `/root/.ssh`, which works only for uid 0. The first `--force-recreate` /
|
|
||||||
host reboot / image update will switch the container to uid 1000 (`homelab`, HOME=/home/homelab)
|
|
||||||
and event shipping to the VPS will fail with "Permission denied", exactly as on lustro
|
|
||||||
(fixed in `a5a1352`). `ssh` in `_ship_events_to_vps()` has no `-i` and looks for the key
|
|
||||||
in `$HOME/.ssh`.
|
|
||||||
**⚠️ DO NOT RECREATE node-agent on solaria/piha/chelsty before the fix.**
|
|
||||||
**Fix**: unify the mount → `/home/homelab/.ssh` in all
|
|
||||||
`hosts/*/runtime/node-agent/docker-compose.override.yml` (template: `hosts/lustro/`)
|
|
||||||
OR add `-i $HOME/.ssh/id_rsa` in `_ship_events_to_vps()`.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### ha-diag-agent deploy BLOCKED (placeholder token)
|
|
||||||
|
|
||||||
**Date**: 2026-06-11
|
|
||||||
**Source**: session; deploy config merged (`5e9db5c`), `.env` created on piha
|
|
||||||
(`/opt/homelab/config/ha-diag-agent/.env`, chmod 600) but token = PLACEHOLDER.
|
|
||||||
**Blocker**: chelsty-ha offline → no token and no connection.
|
|
||||||
**To decide**: HA target: chelsty-ha vs HA Ken (`homeassistant5` on piha; from the container
|
|
||||||
NOT `localhost`).
|
|
||||||
**Before `shadow_mode=false`**: restart target in the supervisor = container name
|
|
||||||
`homeassistant5`; curl of the HA endpoint with the token = HTTP 200.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### observer-poison-quarantine: branch review (`78c9e4a`)
|
|
||||||
|
|
||||||
**Date**: 2026-06-11
|
|
||||||
**Source**: session; Codex's patch preserved on `task/observer-poison-quarantine`, NOT in master.
|
|
||||||
**To do**: verify whether the observer really hangs on a malformed event
|
|
||||||
(poison was NOT the cause of the lustro outage; the hypothesis was unverified and refuted by
|
|
||||||
verify-before-fix). Real bug → merge; otherwise → drop the branch and worktree.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### node_agent.py: minor shipping cleanup
|
|
||||||
|
|
||||||
**Date**: 2026-06-11
|
|
||||||
**Source**: lustro ssh shipping fix session
|
|
||||||
1. **Stale comment** at `node_agent.py:546-548`: claims the container "runs as root";
|
|
||||||
outdated since `user: "1000:1000"`.
|
|
||||||
2. **Shipping success logged at `logger.debug`** → raise to `info` or add a counter;
|
|
||||||
working shipping is invisible in the logs at INFO, which made diagnosis harder
|
|
||||||
(a silent failure looked identical to silent success).
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### event-bloat: clean up the drained lustro backlog on the VPS
|
|
||||||
|
|
||||||
**Date**: 2026-06-11
|
|
||||||
**Source**: session; after the shipping fix, 7600+ backlog files drained into
|
|
||||||
`/opt/homelab/events/lustro/` on the VPS.
|
|
||||||
**Fix**: remove the old files (the observer has already processed them); long term, a retention policy
|
|
||||||
in the event-store.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### rsync `--omit-dir-times` (node-agent)
|
### rsync `--omit-dir-times` (node-agent)
|
||||||
|
|
||||||
**Date**: 2026-06-09
|
**Date**: 2026-06-09
|
||||||
|
|
@ -78,9 +15,6 @@ zwraca EPERM (oskar nie jest właścicielem katalogu; aerbot jest). Pliki są ko
|
||||||
but exit 23 clutters the logs and can mask real errors.
|
but exit 23 clutters the logs and can mask real errors.
|
||||||
**Fix**: add `--omit-dir-times` to the `rsync` invocation in `node-agent.py`.
|
**Fix**: add `--omit-dir-times` to the `rsync` invocation in `node-agent.py`.
|
||||||
**Location**: `services/node-agent/src/node_agent.py`, the rsync call in the push loop.
|
**Location**: `services/node-agent/src/node_agent.py`, the rsync call in the push loop.
|
||||||
**Update 2026-06-11**: confirmed fleet-wide; every node logs a spurious
|
|
||||||
"Event shipping failed" (rsync code 23) every cycle even though the files go through; the
|
|
||||||
`/opt/homelab/events/*` directories on the VPS belong to `aerbot`, so the client cannot set times on them.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|
|
||||||
|
|
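The backlog entries in the hunk above mention three shipping options: an explicit `-i` identity for `ssh`, `--omit-dir-times`, and (in the session notes) `--remove-source-files`. The sketch below shows how such an rsync command line might be assembled. The real `_ship_events_to_vps()` is not part of this diff, so `build_ship_command`, the `-a` flag, and the example paths and host are assumptions for illustration only.

```python
from pathlib import PurePosixPath
from typing import List


def build_ship_command(staging: PurePosixPath, remote: str, identity: PurePosixPath) -> List[str]:
    """Build an rsync-over-ssh argv for pushing staged event files (hypothetical helper)."""
    return [
        "rsync",
        "-a",                     # assumption: archive mode
        "--omit-dir-times",       # skip setting mtimes on directories the ssh user does not own
        "--remove-source-files",  # drain staging once files are transferred
        "-e",
        f"ssh -i {identity}",     # explicit key path; assumes the path contains no spaces
        f"{staging}/",            # trailing slash: copy directory contents, not the directory
        remote,
    ]
```

Passing the identity explicitly removes the dependence on which `$HOME/.ssh` the container user resolves to, which is the failure mode the backlog entry describes.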
@ -1,114 +0,0 @@
|
||||||
# Session 2026-06-10/11: lustro SSH shipping fix + ha-diag-agent piha
|
|
||||||
|
|
||||||
## Goal
|
|
||||||
|
|
||||||
Fix event shipping lustro → VPS; finish the ha-diag-agent deploy config on piha;
|
|
||||||
preserve poison-quarantine (Codex) for a separate review.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## MAIN: LUSTRO event shipping: FIXED (merged `a5a1352`)
|
|
||||||
|
|
||||||
### Root cause
|
|
||||||
|
|
||||||
`_ship_events_to_vps()` (`services/node-agent/src/node_agent.py`) calls `ssh` **without `-i`**,
|
|
||||||
so the key is looked up in `$HOME/.ssh` = `/home/homelab/.ssh` (the container runs as
|
|
||||||
uid 1000 `homelab` since `user: "1000:1000"` was added to the base
|
|
||||||
`services/node-agent/docker-compose.yml`). The lustro override mounted the key at `/root/.ssh`,
|
|
||||||
a **blind mount**: ssh does not look there → `oskar@100.95.58.48: Permission denied`.
|
|
||||||
|
|
||||||
### Fix
|
|
||||||
|
|
||||||
`hosts/lustro/runtime/node-agent/docker-compose.override.yml`:
|
|
||||||
|
|
||||||
```yaml
|
|
||||||
- /home/pi/.ssh:/home/homelab/.ssh:ro # was: /root/.ssh (blind)
|
|
||||||
```
|
|
||||||
|
|
||||||
Key `pi@pimirror2` added to `authorized_keys` of `oskar@VPS`.
|
|
||||||
The uid match (pi=1000 = homelab=1000) satisfies OpenSSH's strict ownership check.
|
|
||||||
|
|
||||||
### Verification
|
|
||||||
|
|
||||||
- 5 nodes NOMINAL in world state; lustro in `/opt/homelab/world/nodes.json` (online, fresh `last_seen`)
|
|
||||||
- 7600+ backlog events drained to the VPS (`/opt/homelab/events/lustro/`)
|
|
||||||
- Staging on lustro drained to zero (`--remove-source-files` works)
|
|
||||||
- "Permission denied" is gone from the node-agent logs
|
|
||||||
|
|
||||||
### Diagnosis: the verify-before-fix lesson
|
|
||||||
|
|
||||||
Both agents (Claude Code, Codex) wrongly blamed the observer (poison event / race)
|
|
||||||
based on **stale state** (`events=2` from a manual test). Verify-before-fix refuted
|
|
||||||
both hypotheses: `events/lustro` on the VPS was empty → the problem was in the **delivery** layer
|
|
||||||
(SSH key), not in the observer.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## ha-diag-agent piha: deploy config merged (`5e9db5c`), deploy UNFINISHED
|
|
||||||
|
|
||||||
- `.env` created on piha: `/opt/homelab/config/ha-diag-agent/.env`, chmod 600
|
|
||||||
- **BUT token = PLACEHOLDER**: chelsty-ha offline → no token and no connection
|
|
||||||
- Before `shadow_mode=false`: restart target in the supervisor = container name
|
|
||||||
`homeassistant5`; curl of the endpoint with the token must return HTTP 200
|
|
||||||
- Decision PENDING: HA target = chelsty-ha vs HA Ken (`homeassistant5` on piha;
|
|
||||||
from the container NOT `localhost`)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## observer poison-quarantine (Codex)
|
|
||||||
|
|
||||||
Preserved on branch `task/observer-poison-quarantine` (`78c9e4a`), **NOT in master**.
|
|
||||||
For a separate review: does the observer really hang on a malformed event
|
|
||||||
(poison was NOT the cause of the lustro outage; hypothesis unverified).
|
|
||||||
Real bug → merge; otherwise → drop.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 🔴 FLEET BOMB: discovered, NOT fixed (backlog, BLOCKING)
|
|
||||||
|
|
||||||
solaria / piha / chelsty are still **old root containers** of node-agent
|
|
||||||
(piha Created 2026-05-27, uid 0). Their `/root/.ssh` mount works only because
|
|
||||||
the containers predate `user: "1000:1000"`. The first `--force-recreate` / host
|
|
||||||
reboot / image update will switch them to uid 1000 and shipping will fail as it did on lustro.
|
|
||||||
**DO NOT RECREATE without the fix.** Details and fix: `docs/backlog.md`.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Tech debt caught in the session
|
|
||||||
|
|
||||||
→ recorded in `docs/backlog.md` (fleet bomb, ha-diag-agent blocked,
|
|
||||||
poison-quarantine review, `--omit-dir-times`, stale node_agent.py comment,
|
|
||||||
shipping success at `logger.debug`, lustro event-bloat on the VPS).
|
|
||||||
|
|
||||||
## Session 20:19
|
|
||||||
|
|
||||||
### Commits
|
|
||||||
fa59625 docs(ha-diag-agent): replace curl verify commands with docker exec
|
|
||||||
d7e0d31 fix(ha-diag-agent): remove host port mapping for 8087
|
|
||||||
|
|
||||||
### Files changed
|
|
||||||
services/ha-diag-agent/DEPLOY.md | 4 ++--
|
|
||||||
services/ha-diag-agent/README.md | 4 ++--
|
|
||||||
services/ha-diag-agent/docker-compose.yml | 3 ---
|
|
||||||
services/ha-diag-agent/service.yaml | 3 ---
|
|
||||||
 4 files changed, 4 insertions(+), 10 deletions(-)
|
|
||||||
|
|
||||||
### Deploys
|
|
||||||
None recorded
|
|
||||||
|
|
||||||
### Narrative
|
|
||||||
> _user-provided summary_
|
|
||||||
|
|
||||||
## Session 20:35
|
|
||||||
|
|
||||||
### Commits
|
|
||||||
(no new ones; commits d7e0d31 and fa59625 from this session landed in master before this entry)
|
|
||||||
|
|
||||||
### Files changed
|
|
||||||
(no changes; see Session 20:19)
|
|
||||||
|
|
||||||
### Deploys
|
|
||||||
None recorded
|
|
||||||
|
|
||||||
### Narrative
|
|
||||||
> _user-provided summary_
|
|
||||||
|
|
@ -1,10 +0,0 @@
|
||||||
services:
|
|
||||||
ha-diag-agent:
|
|
||||||
# Pin events to the piha-specific subdirectory; overrides the ${NODE_NAME}
|
|
||||||
# variable substitution in the base compose file which requires a shell env var.
|
|
||||||
volumes:
|
|
||||||
- /opt/homelab/events/piha:/events
|
|
||||||
- /var/lib/ha-diag-agent:/data
|
|
||||||
- /opt/homelab/config/ha-diag-agent:/config:ro
|
|
||||||
mem_limit: 128m
|
|
||||||
restart: unless-stopped
|
|
||||||
|
|
@ -42,6 +42,7 @@ STATE_DIR = Path(RUNTIME_PATH) / "state"
|
||||||
LOGS_DIR = Path(RUNTIME_PATH) / "logs"
|
LOGS_DIR = Path(RUNTIME_PATH) / "logs"
|
||||||
WORLD_DIR = Path(RUNTIME_PATH) / "world"
|
WORLD_DIR = Path(RUNTIME_PATH) / "world"
|
||||||
OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"
|
OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"
|
||||||
|
FAILED_EVENTS_DIR = STATE_DIR / "observer_failed_events"
|
||||||
|
|
||||||
REPO_ROOT = Path(__file__).parent.parent.parent
|
REPO_ROOT = Path(__file__).parent.parent.parent
|
||||||
INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"
|
INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"
|
||||||
|
|
@ -76,6 +77,27 @@ class Observer:
|
||||||
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
EVENTS_DIR.mkdir(parents=True, exist_ok=True)
|
EVENTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
LOGS_DIR.mkdir(parents=True, exist_ok=True)
|
LOGS_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
FAILED_EVENTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
def _quarantine_event_file(self, file_path: str, node_dir: str, exc: Exception) -> None:
|
||||||
|
"""Move an unreadable/unprocessable event out of the hot path."""
|
||||||
|
src = Path(file_path)
|
||||||
|
dest_dir = FAILED_EVENTS_DIR / node_dir
|
||||||
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
dest = dest_dir / src.name
|
||||||
|
if dest.exists():
|
||||||
|
dest = dest_dir / f"{src.stem}-{int(time.time())}{src.suffix}"
|
||||||
|
try:
|
||||||
|
os.replace(src, dest)
|
||||||
|
logger.error(
|
||||||
|
"Quarantined bad event for node_dir=%s: %s -> %s (%s: %s)",
|
||||||
|
node_dir, src, dest, type(exc).__name__, exc,
|
||||||
|
)
|
||||||
|
except Exception as move_exc:
|
||||||
|
logger.error(
|
||||||
|
"Failed to quarantine bad event for node_dir=%s: %s (%s: %s); move error=%s: %s",
|
||||||
|
node_dir, src, type(exc).__name__, exc, type(move_exc).__name__, move_exc,
|
||||||
|
)
|
||||||
|
|
||||||
def _load_inventory(self):
|
def _load_inventory(self):
|
||||||
inventory = {"nodes": {}, "services": {}}
|
inventory = {"nodes": {}, "services": {}}
|
||||||
|
|
@ -499,7 +521,11 @@ class Observer:
|
||||||
if file_path > self.node_checkpoints.get(node_dir, ""):
|
if file_path > self.node_checkpoints.get(node_dir, ""):
|
||||||
self.node_checkpoints[node_dir] = file_path
|
self.node_checkpoints[node_dir] = file_path
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing {file_path}: {e}")
|
logger.error(
|
||||||
|
"Error processing node_dir=%s file=%s (%s: %s)",
|
||||||
|
node_dir, file_path, type(e).__name__, e,
|
||||||
|
)
|
||||||
|
self._quarantine_event_file(file_path, node_dir, e)
|
||||||
|
|
||||||
self._save_checkpoint()
|
self._save_checkpoint()
|
||||||
self._prune_stale_world()
|
self._prune_stale_world()
|
||||||
|
|
|
||||||
|
|
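The effect of the observer hunk above on one node's processing cycle can be sketched as a standalone loop. This is a simplified illustration under stated assumptions, not the observer's actual code: `process_node_events` is a hypothetical name, "processing" is reduced to parsing JSON, and collision handling and logging are omitted.

```python
import json
import os
from pathlib import Path


def process_node_events(node_dir: Path, failed_dir: Path, checkpoint: str) -> str:
    """Process event files newer than `checkpoint`; return the updated checkpoint."""
    failed_dir.mkdir(parents=True, exist_ok=True)
    # sorted() materializes the listing, so moving files during the loop is safe.
    for path in sorted(node_dir.glob("*.json")):
        if str(path) <= checkpoint:
            continue  # already handled in an earlier cycle
        try:
            json.loads(path.read_text())  # stand-in for real event handling
            checkpoint = str(path)        # advance only on success
        except (OSError, ValueError):
            # Bad file: move it out of the hot path so it is not retried forever.
            try:
                os.replace(path, failed_dir / path.name)
            except OSError:
                pass  # a failed move is tolerated; the loop keeps going
    return checkpoint
```

With a bad file followed by a good one, the bad file ends up in `failed_dir` and the returned checkpoint is the good file's path, which mirrors what the regression test in this commit asserts.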
@ -41,6 +41,7 @@ def _make_observer(tmp_path: Path) -> Observer:
|
||||||
original_logs = obs_mod.LOGS_DIR
|
original_logs = obs_mod.LOGS_DIR
|
||||||
original_inventory = obs_mod.INVENTORY_TOPOLOGY
|
original_inventory = obs_mod.INVENTORY_TOPOLOGY
|
||||||
original_repo = obs_mod.REPO_ROOT
|
original_repo = obs_mod.REPO_ROOT
|
||||||
|
original_failed_events = obs_mod.FAILED_EVENTS_DIR
|
||||||
|
|
||||||
obs_mod.WORLD_DIR = world
|
obs_mod.WORLD_DIR = world
|
||||||
obs_mod.STATE_DIR = state
|
obs_mod.STATE_DIR = state
|
||||||
|
|
@ -48,6 +49,7 @@ def _make_observer(tmp_path: Path) -> Observer:
|
||||||
obs_mod.LOGS_DIR = logs
|
obs_mod.LOGS_DIR = logs
|
||||||
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
|
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
|
||||||
obs_mod.REPO_ROOT = repo
|
obs_mod.REPO_ROOT = repo
|
||||||
|
obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events"
|
||||||
|
|
||||||
obs = Observer()
|
obs = Observer()
|
||||||
|
|
||||||
|
|
@ -59,6 +61,7 @@ def _make_observer(tmp_path: Path) -> Observer:
|
||||||
obs_mod.LOGS_DIR = original_logs
|
obs_mod.LOGS_DIR = original_logs
|
||||||
obs_mod.INVENTORY_TOPOLOGY = original_inventory
|
obs_mod.INVENTORY_TOPOLOGY = original_inventory
|
||||||
obs_mod.REPO_ROOT = original_repo
|
obs_mod.REPO_ROOT = original_repo
|
||||||
|
obs_mod.FAILED_EVENTS_DIR = original_failed_events
|
||||||
|
|
||||||
return obs
|
return obs
|
||||||
|
|
||||||
|
|
@ -87,6 +90,7 @@ def _make_observer_simple(tmp_path: Path):
|
||||||
obs_mod.LOGS_DIR = logs
|
obs_mod.LOGS_DIR = logs
|
||||||
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
|
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
|
||||||
obs_mod.REPO_ROOT = repo
|
obs_mod.REPO_ROOT = repo
|
||||||
|
obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events"
|
||||||
|
|
||||||
obs = Observer()
|
obs = Observer()
|
||||||
return obs
|
return obs
|
||||||
|
|
@ -331,3 +335,45 @@ def test_prune_keeps_recently_resolved_incident(tmp_path):
|
||||||
obs._prune_stale_world()
|
obs._prune_stale_world()
|
||||||
|
|
||||||
assert inc_id in obs.world_state["incidents"]
|
assert inc_id in obs.world_state["incidents"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_once_quarantines_bad_event_and_processes_next_for_same_node(tmp_path):
|
||||||
|
"""A malformed event file must not wedge a node forever."""
|
||||||
|
obs = _make_observer_simple(tmp_path)
|
||||||
|
|
||||||
|
import observer.observer as obs_mod
|
||||||
|
|
||||||
|
topology = obs_mod.INVENTORY_TOPOLOGY
|
||||||
|
topology.write_text(
|
||||||
|
"nodes:\n"
|
||||||
|
" lustro:\n"
|
||||||
|
" roles: [edge]\n"
|
||||||
|
" connectivity: {}\n"
|
||||||
|
)
|
||||||
|
obs.inventory = obs._load_inventory()
|
||||||
|
|
||||||
|
bad_dir = obs_mod.EVENTS_DIR / "lustro"
|
||||||
|
bad_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
bad_event = bad_dir / "evt-lustro-1-bad.json"
|
||||||
|
bad_event.write_text("{not-json")
|
||||||
|
|
||||||
|
good_event = bad_dir / "evt-lustro-2-good.json"
|
||||||
|
good_event.write_text(json.dumps({
|
||||||
|
"id": "evt-lustro-2-good",
|
||||||
|
"timestamp": int(time.time()),
|
||||||
|
"date": "2026-06-10T00:00:00Z",
|
||||||
|
"type": "node_health",
|
||||||
|
"severity": "info",
|
||||||
|
"node": "lustro",
|
||||||
|
"service": "",
|
||||||
|
"message": "ok",
|
||||||
|
"payload": {"disk_pct": 1, "mem_pct": 2, "cpu_pct": 3},
|
||||||
|
}))
|
||||||
|
|
||||||
|
obs.run_once()
|
||||||
|
|
||||||
|
quarantined = obs_mod.FAILED_EVENTS_DIR / "lustro" / bad_event.name
|
||||||
|
assert quarantined.exists()
|
||||||
|
assert not bad_event.exists()
|
||||||
|
assert obs.world_state["nodes"]["lustro"]["status"] == "online"
|
||||||
|
assert obs.node_checkpoints["lustro"] == str(good_event)
|
||||||
|
|
|
||||||
|
|
@ -115,8 +115,8 @@ docker ps | grep ha-diag-agent
|
||||||
# Last 50 log lines
|
# Last 50 log lines
|
||||||
docker logs ha-diag-agent --tail 50
|
docker logs ha-diag-agent --tail 50
|
||||||
|
|
||||||
# FastAPI health endpoint (no host port mapping — probe via docker exec)
|
# FastAPI health endpoint
|
||||||
docker exec ha-diag-agent python -c "import urllib.request; print(urllib.request.urlopen('http://localhost:8087/health', timeout=5).read().decode())"
|
curl http://localhost:8087/health
|
||||||
# Expect: {"status": "ok", "ws_connected": true, ...}
|
# Expect: {"status": "ok", "ws_connected": true, ...}
|
||||||
|
|
||||||
# Events are being written
|
# Events are being written
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ WebSocketMonitor (persistent, long-running — Phase 4b)
|
||||||
silence > 5min or on disconnect. Emits ha_websocket_recovered
|
silence > 5min or on disconnect. Emits ha_websocket_recovered
|
||||||
when the connection is restored after a dead alert.
|
when the connection is restored after a dead alert.
|
||||||
|
|
||||||
FastAPI (port 8087, internal only — no host port mapping)
|
FastAPI (port 8087)
|
||||||
GET /health → liveness probe (includes ws_connected field)
|
GET /health → liveness probe (includes ws_connected field)
|
||||||
POST /trigger/<check> → run a named check on demand
|
POST /trigger/<check> → run a named check on demand
|
||||||
|
|
||||||
|
|
@ -97,7 +97,7 @@ scripts/deploy/deploy.sh --service ha-diag-agent
|
||||||
|
|
||||||
# 3. Verify
|
# 3. Verify
|
||||||
docker ps --filter name=ha-diag-agent
|
docker ps --filter name=ha-diag-agent
|
||||||
docker exec ha-diag-agent python -c "import urllib.request; print(urllib.request.urlopen('http://localhost:8087/health', timeout=5).read().decode())"
|
curl http://localhost:8087/health
|
||||||
```
|
```
|
||||||
|
|
||||||
### chelsty-infra note
|
### chelsty-infra note
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,9 @@ services:
|
||||||
env_file:
|
env_file:
|
||||||
- /opt/homelab/config/ha-diag-agent/.env
|
- /opt/homelab/config/ha-diag-agent/.env
|
||||||
|
|
||||||
|
ports:
|
||||||
|
- "8087:8087"
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
# Events dir: host path includes node name; inside container always /events
|
# Events dir: host path includes node name; inside container always /events
|
||||||
- /opt/homelab/events/${NODE_NAME:-ha-diag}:/events
|
- /opt/homelab/events/${NODE_NAME:-ha-diag}:/events
|
||||||
|
|
|
||||||
|
|
@ -2,14 +2,11 @@
|
||||||
# Copy to /opt/homelab/config/ha-diag-agent/.env on the target node
|
# Copy to /opt/homelab/config/ha-diag-agent/.env on the target node
|
||||||
|
|
||||||
# Home Assistant connection (required)
|
# Home Assistant connection (required)
|
||||||
# piha: HA_URL=http://localhost:8123
|
HA_URL=http://homeassistant.local:8123
|
||||||
# chelsty-infra: HA_URL=http://100.70.180.90:8123 (chelsty-ha Tailscale IP)
|
|
||||||
HA_URL=http://localhost:8123
|
|
||||||
# Obtain from HA UI: Settings → People → <diag_agent user> → Long-Lived Access Tokens
|
|
||||||
HA_TOKEN=your-long-lived-token-here
|
HA_TOKEN=your-long-lived-token-here
|
||||||
HA_TIMEOUT=10.0
|
HA_TIMEOUT=10.0
|
||||||
|
|
||||||
# Node identity (must match the node's canonical name in the homelab inventory)
|
# Node identity
|
||||||
NODE_NAME=piha
|
NODE_NAME=piha
|
||||||
LOCATION_TAG=ken
|
LOCATION_TAG=ken
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ dependencies = [
|
||||||
dev = [
|
dev = [
|
||||||
"pytest>=8.1",
|
"pytest>=8.1",
|
||||||
"pytest-asyncio>=0.23",
|
"pytest-asyncio>=0.23",
|
||||||
|
"aioresponses>=0.7",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.setuptools.packages.find]
|
[tool.setuptools.packages.find]
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,9 @@ service:
|
||||||
dependencies:
|
dependencies:
|
||||||
- homeassistant
|
- homeassistant
|
||||||
|
|
||||||
|
ports:
|
||||||
|
- 8087
|
||||||
|
|
||||||
healthcheck:
|
healthcheck:
|
||||||
type: http
|
type: http
|
||||||
path: /health
|
path: /health
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ async def _run_check_and_emit(
|
||||||
_log.warning(
|
_log.warning(
|
||||||
"check_unhealthy",
|
"check_unhealthy",
|
||||||
check=check.name,
|
check=check.name,
|
||||||
ha_event=result.event_type,
|
event=result.event_type,
|
||||||
msg=result.message,
|
msg=result.message,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,8 @@
|
||||||
"""Tests for HAClient using unittest.mock to avoid aioresponses/aiohttp version coupling."""
|
"""Tests for HAClient using aioresponses to mock aiohttp."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from unittest.mock import AsyncMock, MagicMock
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from aioresponses import aioresponses
|
||||||
|
|
||||||
from ha_diag.ha_client import HAClient, make_session
|
from ha_diag.ha_client import HAClient, make_session
|
||||||
|
|
||||||
|
|
@ -11,49 +10,34 @@ HA_URL = "http://homeassistant.test:8123"
|
||||||
TOKEN = "test-token"
|
TOKEN = "test-token"
|
||||||
|
|
||||||
|
|
||||||
def _mock_resp(payload=None, text=None, status=200):
|
|
||||||
"""Return a mock that behaves like an aiohttp response context manager."""
|
|
||||||
resp = MagicMock()
|
|
||||||
resp.status = status
|
|
||||||
if status >= 400:
|
|
||||||
resp.raise_for_status.side_effect = Exception(f"HTTP {status}")
|
|
||||||
else:
|
|
||||||
resp.raise_for_status = MagicMock()
|
|
||||||
resp.json = AsyncMock(return_value=payload if payload is not None else {})
|
|
||||||
resp.text = AsyncMock(return_value=text or "")
|
|
||||||
resp.__aenter__ = AsyncMock(return_value=resp)
|
|
||||||
resp.__aexit__ = AsyncMock(return_value=False)
|
|
||||||
return resp
|
|
||||||
|
|
||||||
|
|
||||||
def _mock_session(get_resp=None):
|
|
||||||
session = MagicMock()
|
|
||||||
session.get.return_value = get_resp or _mock_resp()
|
|
||||||
return session
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_api_status_ok():
|
async def test_get_api_status_ok():
|
||||||
session = _mock_session(_mock_resp({"message": "API running."}))
|
with aioresponses() as m:
|
||||||
client = HAClient(HA_URL, session)
|
m.get(f"{HA_URL}/api/", payload={"message": "API running."})
|
||||||
result = await client.get_api_status()
|
async with make_session(TOKEN) as session:
|
||||||
|
client = HAClient(HA_URL, session)
|
||||||
|
result = await client.get_api_status()
|
||||||
assert result == {"message": "API running."}
|
assert result == {"message": "API running."}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_api_status_unauthorized():
|
async def test_get_api_status_unauthorized():
|
||||||
session = _mock_session(_mock_resp(status=401))
|
with aioresponses() as m:
|
||||||
client = HAClient(HA_URL, session)
|
m.get(f"{HA_URL}/api/", status=401)
|
||||||
with pytest.raises(Exception):
|
async with make_session(TOKEN) as session:
|
||||||
await client.get_api_status()
|
client = HAClient(HA_URL, session)
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
await client.get_api_status()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_states_returns_list():
|
async def test_get_states_returns_list():
|
||||||
payload = [{"entity_id": "light.living_room", "state": "on"}]
|
payload = [{"entity_id": "light.living_room", "state": "on"}]
|
||||||
session = _mock_session(_mock_resp(payload))
|
with aioresponses() as m:
|
||||||
client = HAClient(HA_URL, session)
|
m.get(f"{HA_URL}/api/states", payload=payload)
|
||||||
states = await client.get_states()
|
async with make_session(TOKEN) as session:
|
||||||
|
client = HAClient(HA_URL, session)
|
||||||
|
states = await client.get_states()
|
||||||
assert isinstance(states, list)
|
assert isinstance(states, list)
|
||||||
assert states[0]["entity_id"] == "light.living_room"
|
assert states[0]["entity_id"] == "light.living_room"
|
||||||
|
|
||||||
|
|
@ -61,9 +45,11 @@ async def test_get_states_returns_list():
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_config_returns_dict():
|
async def test_get_config_returns_dict():
|
||||||
payload = {"version": "2024.1.0", "location_name": "Home"}
|
payload = {"version": "2024.1.0", "location_name": "Home"}
|
||||||
session = _mock_session(_mock_resp(payload))
|
with aioresponses() as m:
|
||||||
client = HAClient(HA_URL, session)
|
m.get(f"{HA_URL}/api/config", payload=payload)
|
||||||
config = await client.get_config()
|
async with make_session(TOKEN) as session:
|
||||||
|
client = HAClient(HA_URL, session)
|
||||||
|
config = await client.get_config()
|
||||||
assert config["version"] == "2024.1.0"
|
assert config["version"] == "2024.1.0"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -73,9 +59,11 @@ async def test_get_entity_registry_returns_list():
|
||||||
{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"},
|
{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"},
|
||||||
{"entity_id": "sensor.temp", "platform": "mqtt", "area_id": None},
|
{"entity_id": "sensor.temp", "platform": "mqtt", "area_id": None},
|
||||||
]
|
]
|
||||||
session = _mock_session(_mock_resp(payload))
|
with aioresponses() as m:
|
||||||
client = HAClient(HA_URL, session)
|
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
|
||||||
registry = await client.get_entity_registry()
|
async with make_session(TOKEN) as session:
|
||||||
|
client = HAClient(HA_URL, session)
|
||||||
|
registry = await client.get_entity_registry()
|
||||||
assert len(registry) == 2
|
assert len(registry) == 2
|
||||||
assert registry[0]["platform"] == "zha"
|
assert registry[0]["platform"] == "zha"
|
||||||
|
|
||||||
|
|
@ -83,7 +71,12 @@ async def test_get_entity_registry_returns_list():
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_make_session_sets_auth_header():
|
async def test_make_session_sets_auth_header():
|
||||||
"""make_session injects the Bearer token in all requests."""
|
"""make_session injects the Bearer token in all requests."""
|
||||||
async with make_session("my-secret-token") as session:
|
with aioresponses() as m:
|
||||||
|
m.get(f"{HA_URL}/api/", payload={"message": "API running."})
|
||||||
|
async with make_session("my-secret-token") as session:
|
||||||
|
client = HAClient(HA_URL, session)
|
||||||
|
await client.get_api_status()
|
||||||
|
# Verify the Authorization header was sent
|
||||||
assert session.headers.get("Authorization") == "Bearer my-secret-token"
|
assert session.headers.get("Authorization") == "Bearer my-secret-token"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -96,42 +89,47 @@ async def test_make_session_sets_auth_header():
|
||||||
async def test_entity_registry_cached_on_second_call():
|
async def test_entity_registry_cached_on_second_call():
|
||||||
"""Second call within TTL returns cache, making only one HTTP request."""
|
"""Second call within TTL returns cache, making only one HTTP request."""
|
||||||
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
|
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
|
||||||
session = _mock_session(_mock_resp(payload))
|
with aioresponses() as m:
|
||||||
client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
|
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
|
||||||
r1 = await client.get_entity_registry()
|
async with make_session(TOKEN) as session:
|
||||||
r2 = await client.get_entity_registry() # from cache
|
client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
|
||||||
|
r1 = await client.get_entity_registry()
|
||||||
|
r2 = await client.get_entity_registry() # from cache — no second HTTP call
|
||||||
assert r1 == r2
|
assert r1 == r2
|
||||||
session.get.assert_called_once() # only one HTTP request
|
# aioresponses would raise ConnectionError on the unmocked second request
|
||||||
|
# if caching weren't working; reaching here means it used the cache.
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_entity_registry_cache_bypassed_after_ttl():
|
async def test_entity_registry_cache_bypassed_after_ttl(monkeypatch):
|
||||||
"""After TTL expiry (ttl=0), next call fetches fresh data."""
|
"""After TTL expiry, next call fetches fresh data."""
|
||||||
|
import time
|
||||||
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
|
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": "hallway"}]
|
||||||
session = _mock_session(_mock_resp(payload))
|
with aioresponses() as m:
|
||||||
# TTL=0 means every call is stale → two fetches
|
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
|
||||||
session.get.side_effect = [_mock_resp(payload), _mock_resp(payload)]
|
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
|
||||||
client = HAClient(HA_URL, session, entity_registry_cache_ttl=0.0)
|
async with make_session(TOKEN) as session:
|
||||||
await client.get_entity_registry()
|
client = HAClient(HA_URL, session, entity_registry_cache_ttl=0.0)
|
||||||
await client.get_entity_registry()
|
await client.get_entity_registry() # fetches
|
||||||
assert session.get.call_count == 2
|
await client.get_entity_registry() # TTL=0 → fetches again
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_invalidate_registry_cache_forces_refetch():
|
async def test_invalidate_registry_cache_forces_refetch():
|
||||||
"""invalidate_registry_cache() makes the next call hit the network."""
|
"""invalidate_registry_cache() makes the next call hit the network."""
|
||||||
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": ""}]
|
payload = [{"entity_id": "light.hall", "platform": "zha", "area_id": ""}]
|
||||||
session = _mock_session(_mock_resp(payload))
|
with aioresponses() as m:
|
||||||
session.get.side_effect = [_mock_resp(payload), _mock_resp(payload)]
|
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
|
||||||
client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
|
m.get(f"{HA_URL}/api/config/entity_registry", payload=payload)
|
||||||
await client.get_entity_registry()
|
async with make_session(TOKEN) as session:
|
||||||
client.invalidate_registry_cache()
|
client = HAClient(HA_URL, session, entity_registry_cache_ttl=300.0)
|
||||||
await client.get_entity_registry()
|
await client.get_entity_registry()
|
||||||
assert session.get.call_count == 2
|
client.invalidate_registry_cache()
|
||||||
|
await client.get_entity_registry() # must hit network again
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_entity_registry_cache_default_ttl_is_300():
|
async def test_entity_registry_cache_default_ttl_is_300():
|
||||||
session = _mock_session()
|
async with make_session(TOKEN) as session:
|
||||||
client = HAClient(HA_URL, session)
|
client = HAClient(HA_URL, session)
|
||||||
assert client._registry_cache_ttl == 300.0
|
assert client._registry_cache_ttl == 300.0
|
||||||
|
|
|
||||||