fix(observer): robust incident lifecycle + orphan auto-resolve

Two root causes for stale "active" incidents on the dashboard:

1. TypeError bug in _prune_stale_world: last_occurrence / resolved_at
   can be an ISO-8601 string (stability-agent via events.py) or a Unix
   int (node-agent).  The previous session's auto-resolve did plain
   `time.time() - last_occ` which raises TypeError for strings,
   silently preventing _save_world() from being called and leaving
   incidents perpetually "active" on disk.

   Fix: add _parse_ts(ts) -> float that handles int, float, and
   ISO-8601 strings uniformly. All timestamp arithmetic now goes through
   it; returns 0.0 on None / garbage to keep comparisons safe.

2. Orphaned active incidents: _resolve_incident clears service["incident_id"]
   and marks the incident "resolved" in memory, but if incidents.json was
   truncated mid-write (pre-atomic-write era), the observer loaded it at
   next startup with status="active" and no service entry pointing to it.
   No code ever touched these orphans again.

   Fix: _prune_stale_world now runs two cleanup passes each cycle:
   - Case 1 (healthy-linked): service.status=="healthy" AND incident_id
     still set → resolve immediately (service cannot have active incident)
   - Case 2 (orphaned): active incident with no service link AND
     last_occurrence > 5 min ago → resolve (5-min guard for creation race)

   Both cases are wrapped in try/except so a bug here never crashes the
   observer loop or blocks _save_world.

   Also fixes the 7-day stale-incident prune to use _parse_ts so
   ISO-string resolved_at values are handled correctly.

3. Operator UI: current_incidents() now filters to status=="active" only.
   Resolved incidents were previously included in the /incidents endpoint,
   making the dashboard show a wall of historical records as if active.

Nocturnal job investigation: _cleanup_control_plane_fs in node-agent runs
every 60s on VPS (not midnight-specific); it reads observer_checkpoint.json
(now written atomically) and deletes old event files. No non-atomic writes
found. Midnight clustering was likely external (logrotate / OS flush);
the supervisor's resilient loader already handles such transient issues.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Oskar Kapala 2026-06-03 14:29:12 +02:00
parent 98437d46b2
commit f5dcefc752
3 changed files with 413 additions and 22 deletions

View file

@ -17,6 +17,24 @@ def _atomic_write_json(path: Path, data) -> None:
os.fsync(f.fileno())
os.replace(tmp, path)
def _parse_ts(ts) -> float:
"""Return a Unix timestamp float from ts, which may be int/float or an ISO-8601 string.
Events from node-agent use int(time.time()); events from stability-agent / events.py
use ISO format ('2026-06-03T10:30:00Z'). Both appear in incident fields such as
last_occurrence and resolved_at, so any arithmetic on them must go through here.
Returns 0.0 on None or unparseable input so callers can use plain comparisons.
"""
if ts is None:
return 0.0
if isinstance(ts, (int, float)):
return float(ts)
try:
return datetime.fromisoformat(str(ts).replace("Z", "+00:00")).timestamp()
except Exception:
return 0.0
# Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
EVENTS_DIR = Path(RUNTIME_PATH) / "events"
@ -184,32 +202,66 @@ class Observer:
now = time.time()
# Auto-resolve active incidents for services that are currently healthy
# and whose last_occurrence is older than 30 minutes. These are phantom
# incidents created by race-condition reads of truncated state files; they
# never receive a service_recovered event because the service was healthy
# all along.
try:
# Collect incident_ids currently referenced by any service entry.
linked_ids: set = {
svc.get("incident_id")
for svc in self.world_state["services"].values()
if svc.get("incident_id")
}
# Case 1 — service is healthy but still points at an active incident.
# process_event already calls _resolve_incident on service_healthy events,
# but if the observer restarted with on-disk state where the link was
# intact (inconsistency from a pre-atomic-write crash), it may not get
# resolved until the next service_healthy event is processed. Resolve
# immediately — a healthy service cannot have an ongoing incident.
for svc_key, svc in self.world_state["services"].items():
if svc.get("status") == "healthy":
if svc.get("status") != "healthy":
continue
inc_id = svc.get("incident_id")
if inc_id and inc_id in self.world_state["incidents"]:
inc = self.world_state["incidents"][inc_id]
last_occ = inc.get("last_occurrence") or 0
if (inc.get("status") == "active"
and (now - last_occ) > 1800):
if not inc_id:
continue
inc = self.world_state["incidents"].get(inc_id, {})
if inc.get("status") == "active":
logger.info(
f"Auto-resolving stale incident {inc_id} for {svc_key}: "
f"service healthy, last_occurrence >{int((now - last_occ) / 60)}min ago"
f"Auto-resolving incident {inc_id} for {svc_key}: "
f"service is healthy"
)
inc["status"] = "resolved"
inc["resolved_at"] = now
svc["incident_id"] = None
linked_ids.discard(inc_id)
# Case 2 — orphaned active incident: no service entry links to it and
# last_occurrence is older than 5 minutes (guard against creation races).
# These are the stale records left behind when on-disk state was
# inconsistent: the service entry had incident_id cleared but incidents.json
# still had the record as "active".
for inc_id, inc in self.world_state["incidents"].items():
if inc.get("status") != "active":
continue
if inc_id in linked_ids:
continue
age = now - _parse_ts(inc.get("last_occurrence"))
if age > 300: # 5-minute guard
logger.info(
f"Auto-resolving orphaned incident {inc_id} "
f"(service={inc.get('service')}, node={inc.get('node')}): "
f"no service references it, age={int(age)}s"
)
inc["status"] = "resolved"
inc["resolved_at"] = now
except Exception as exc:
logger.error(f"Error during incident auto-resolve in _prune_stale_world: {exc}")
# Remove resolved incidents older than 7 days.
# Use _parse_ts so ISO-string resolved_at values are handled correctly.
stale_incidents = [
k for k, v in self.world_state["incidents"].items()
if v.get("status") == "resolved"
and (now - (v.get("resolved_at") or now)) > 7 * 86400
and now - _parse_ts(v.get("resolved_at")) > 7 * 86400
]
for k in stale_incidents:
del self.world_state["incidents"][k]

View file

@ -147,12 +147,18 @@ def current_deployments():
def current_incidents():
"""Return incidents as a list sorted most-recent-first."""
"""Return active incidents as a list sorted most-recent-first.
Only incidents with status='active' are returned; resolved and cancelled
records are excluded so the dashboard reflects the current operational state.
"""
raw = read_json_file(WORLD_DIR / "incidents.json", default={})
if isinstance(raw, list):
return raw
return [i for i in raw if i.get("status") == "active"]
result = []
for inc in raw.values():
if inc.get("status") != "active":
continue
# Synthesise a human-readable message if not stored (observer doesn't set one).
if "message" not in inc:
inc = dict(inc)

View file

@ -0,0 +1,333 @@
"""Tests for incident lifecycle: auto-resolve, orphan detection, timestamp parsing."""
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
import pytest
# Observer lives outside the control-plane package; add scripts/ to path.
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "scripts"))
from observer.observer import Observer, _parse_ts, _atomic_write_json
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_observer(tmp_path: Path) -> Observer:
"""Return an Observer with all runtime paths redirected to tmp_path."""
import observer.observer as obs_mod
world = tmp_path / "world"
state = tmp_path / "state"
events = tmp_path / "events"
logs = tmp_path / "logs"
repo = tmp_path / "repo"
for d in (world, state, events, logs, repo / "inventory", repo / "hosts"):
d.mkdir(parents=True, exist_ok=True)
# Minimal topology so inventory isn't empty (avoids prune-guard early-return)
(repo / "inventory" / "topology.yaml").write_text(
"nodes:\n vps:\n roles: [control-plane]\n connectivity: {}\n"
)
original_world = obs_mod.WORLD_DIR
original_state = obs_mod.STATE_DIR
original_events = obs_mod.EVENTS_DIR
original_logs = obs_mod.LOGS_DIR
original_inventory = obs_mod.INVENTORY_TOPOLOGY
original_repo = obs_mod.REPO_ROOT
obs_mod.WORLD_DIR = world
obs_mod.STATE_DIR = state
obs_mod.EVENTS_DIR = events
obs_mod.LOGS_DIR = logs
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
obs_mod.REPO_ROOT = repo
obs = Observer()
# Restore module-level constants (monkeypatching at module level is sufficient
# for the Observer instance which captures paths at construction time via globals)
obs_mod.WORLD_DIR = original_world
obs_mod.STATE_DIR = original_state
obs_mod.EVENTS_DIR = original_events
obs_mod.LOGS_DIR = original_logs
obs_mod.INVENTORY_TOPOLOGY = original_inventory
obs_mod.REPO_ROOT = original_repo
return obs
def _make_observer_simple(tmp_path: Path):
"""Return an Observer instance and patch its world_state in-place."""
import observer.observer as obs_mod
world = tmp_path / "world"
state = tmp_path / "state"
events = tmp_path / "events"
logs = tmp_path / "logs"
repo = tmp_path / "repo"
for d in (world, state, events, logs, repo / "inventory", repo / "hosts"):
d.mkdir(parents=True, exist_ok=True)
(repo / "inventory" / "topology.yaml").write_text(
"nodes:\n vps:\n roles: [control-plane]\n connectivity: {}\n"
)
# Patch before construction
obs_mod.WORLD_DIR = world
obs_mod.STATE_DIR = state
obs_mod.EVENTS_DIR = events
obs_mod.LOGS_DIR = logs
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
obs_mod.REPO_ROOT = repo
obs = Observer()
return obs
# ---------------------------------------------------------------------------
# 1. _parse_ts — timestamp normalisation
# ---------------------------------------------------------------------------
def test_parse_ts_int():
ts = int(time.time()) - 3600
assert abs(_parse_ts(ts) - ts) < 1
def test_parse_ts_float():
ts = time.time() - 100.5
assert abs(_parse_ts(ts) - ts) < 0.01
def test_parse_ts_iso_string():
# ISO format as emitted by events.py / stability-agent
from datetime import datetime, timezone
iso = "2026-06-01T00:03:22Z"
expected = datetime(2026, 6, 1, 0, 3, 22, tzinfo=timezone.utc).timestamp()
result = _parse_ts(iso)
assert result > 0
assert isinstance(result, float)
assert abs(result - expected) < 1
def test_parse_ts_none_returns_zero():
assert _parse_ts(None) == 0.0
def test_parse_ts_garbage_returns_zero():
assert _parse_ts("not-a-date") == 0.0
def test_parse_ts_zero_int():
assert _parse_ts(0) == 0.0
# ---------------------------------------------------------------------------
# 2. Lifecycle: service_healthy event resolves linked incident
# ---------------------------------------------------------------------------
def test_service_healthy_resolves_active_incident(tmp_path):
obs = _make_observer_simple(tmp_path)
inc_id = "inc-111-vps-outline"
obs.world_state["services"]["vps/outline"] = {
"node": "vps", "service": "outline",
"status": "unhealthy", "last_check": None,
"incident_id": inc_id,
}
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "node": "vps", "service": "outline",
"status": "active", "trigger_type": "service_unhealthy",
"started_at": int(time.time()) - 600,
"last_occurrence": int(time.time()) - 600,
"occurrence_count": 1, "events": [],
}
obs.process_event({
"type": "service_healthy",
"node": "vps",
"service": "outline",
"severity": "info",
"timestamp": int(time.time()),
"payload": {},
})
assert obs.world_state["services"]["vps/outline"]["status"] == "healthy"
assert obs.world_state["services"]["vps/outline"]["incident_id"] is None
assert obs.world_state["incidents"][inc_id]["status"] == "resolved"
def test_service_healthy_does_not_resolve_other_incidents(tmp_path):
"""service_healthy for service A must not touch incident for service B."""
obs = _make_observer_simple(tmp_path)
inc_b = "inc-222-vps-supervisor"
obs.world_state["services"]["vps/supervisor"] = {
"node": "vps", "service": "supervisor",
"status": "unhealthy", "last_check": None,
"incident_id": inc_b,
}
obs.world_state["incidents"][inc_b] = {
"id": inc_b, "status": "active",
"last_occurrence": int(time.time()) - 300,
}
obs.process_event({
"type": "service_healthy",
"node": "vps",
"service": "outline", # different service
"severity": "info",
"timestamp": int(time.time()),
"payload": {},
})
assert obs.world_state["incidents"][inc_b]["status"] == "active"
# ---------------------------------------------------------------------------
# 3. _prune_stale_world: healthy-service-linked incident → immediate resolve
# ---------------------------------------------------------------------------
def test_prune_resolves_healthy_linked_incident(tmp_path):
"""If a service is healthy but still points at an active incident, resolve it."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-333-vps-outline"
obs.world_state["services"]["vps/outline"] = {
"node": "vps", "service": "outline",
"status": "healthy", # <-- healthy but incident_id still set
"last_check": None,
"incident_id": inc_id,
}
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "active",
"started_at": int(time.time()) - 7200,
"last_occurrence": int(time.time()) - 7200,
}
obs._prune_stale_world()
assert obs.world_state["services"]["vps/outline"]["incident_id"] is None
assert obs.world_state["incidents"][inc_id]["status"] == "resolved"
def test_prune_resolves_healthy_linked_incident_iso_timestamp(tmp_path):
"""Healthy-linked incident with ISO-string last_occurrence must still resolve."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-444-vps-outline"
obs.world_state["services"]["vps/outline"] = {
"node": "vps", "service": "outline",
"status": "healthy", "last_check": None, "incident_id": inc_id,
}
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "active",
"last_occurrence": "2026-06-01T00:03:22Z", # ISO string from events.py
}
obs._prune_stale_world() # must not raise TypeError
assert obs.world_state["incidents"][inc_id]["status"] == "resolved"
# ---------------------------------------------------------------------------
# 4. _prune_stale_world: orphaned incident (no service link) → resolve after 5 min
# ---------------------------------------------------------------------------
def test_prune_resolves_orphaned_incident_old_enough(tmp_path):
"""Orphaned active incident older than 5 min must be auto-resolved."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-555-vps-supervisor"
# No service entry links to this incident
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "active", "node": "vps", "service": "supervisor",
"last_occurrence": int(time.time()) - 400, # 6.7 min ago
}
obs._prune_stale_world()
assert obs.world_state["incidents"][inc_id]["status"] == "resolved"
def test_prune_does_not_resolve_orphaned_incident_too_recent(tmp_path):
"""Orphaned incident younger than 5 min must stay active (guard against race)."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-666-vps-supervisor"
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "active",
"last_occurrence": int(time.time()) - 60, # 1 min ago — within guard
}
obs._prune_stale_world()
assert obs.world_state["incidents"][inc_id]["status"] == "active"
def test_prune_resolves_orphaned_incident_iso_timestamp(tmp_path):
"""Orphaned incident with ISO-string last_occurrence must resolve correctly."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-777-vps-outline"
# ISO timestamp well in the past (2026-06-01)
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "active",
"last_occurrence": "2026-06-01T00:03:22Z",
}
obs._prune_stale_world() # must not raise TypeError
assert obs.world_state["incidents"][inc_id]["status"] == "resolved"
def test_prune_does_not_touch_linked_incident(tmp_path):
"""An active incident still linked from a non-healthy service must stay active."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-888-vps-outline"
obs.world_state["services"]["vps/outline"] = {
"node": "vps", "service": "outline",
"status": "unhealthy", # <-- still unhealthy
"last_check": None,
"incident_id": inc_id,
}
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "active",
"last_occurrence": int(time.time()) - 3600,
}
obs._prune_stale_world()
assert obs.world_state["incidents"][inc_id]["status"] == "active"
# ---------------------------------------------------------------------------
# 5. 7-day stale incident prune with ISO resolved_at
# ---------------------------------------------------------------------------
def test_prune_removes_old_resolved_incident_iso_resolved_at(tmp_path):
"""Resolved incidents with ISO-string resolved_at older than 7 days must be pruned."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-old-resolved"
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "resolved",
"resolved_at": "2026-05-01T00:00:00Z", # >7 days before 2026-06-03
}
obs._prune_stale_world()
assert inc_id not in obs.world_state["incidents"]
def test_prune_keeps_recently_resolved_incident(tmp_path):
"""Resolved incidents within 7 days must be kept."""
obs = _make_observer_simple(tmp_path)
inc_id = "inc-recent-resolved"
obs.world_state["incidents"][inc_id] = {
"id": inc_id, "status": "resolved",
"resolved_at": time.time() - 86400, # 1 day ago
}
obs._prune_stale_world()
assert inc_id in obs.world_state["incidents"]