From ffb0608b9a0c036c490f79335cdc9c2d0c8bccde Mon Sep 17 00:00:00 2001
From: Oskar Kapala
Date: Wed, 3 Jun 2026 12:26:49 +0200
Subject: [PATCH] fix(observer): atomic writes for world state files

All JSON state writes (services.json, nodes.json, incidents.json,
deployments.json, runtime-summary.json, observer_checkpoint.json) now
use _atomic_write_json: write to a .tmp sibling, fsync, then os.replace.
This eliminates the truncated-write window that caused supervisors
reading mid-write files to see empty/partial JSON.

Also adds auto-resolution of phantom active incidents: if a service
reports status=healthy and its incident's last_occurrence is >30 min
old, the incident is resolved in _prune_stale_world. This clears false
active incidents accumulated from previous race-condition reads.

Co-Authored-By: Claude Sonnet 4.6
---
 scripts/observer/observer.py | 42 ++++++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py
index ab2432f..0945d28 100644
--- a/scripts/observer/observer.py
+++ b/scripts/observer/observer.py
@@ -7,6 +7,16 @@ import yaml
 from datetime import datetime, timezone
 from pathlib import Path
 
+
+def _atomic_write_json(path: Path, data) -> None:
+    """Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
+    tmp = path.with_suffix(".tmp")
+    with open(tmp, "w") as f:
+        json.dump(data, f, indent=2)
+        f.flush()
+        os.fsync(f.fileno())
+    os.replace(tmp, path)
+
 # Constants and Paths
 RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
 EVENTS_DIR = Path(RUNTIME_PATH) / "events"
@@ -124,8 +134,7 @@ class Observer:
 
     def _save_checkpoint(self):
         try:
-            with open(OBSERVER_STATE_FILE, "w") as f:
-                json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2)
+            _atomic_write_json(OBSERVER_STATE_FILE, {"node_checkpoints": self.node_checkpoints})
         except Exception as e:
             logger.error(f"Failed to save checkpoint: {e}")
 
@@ -173,8 +182,30 @@ class Observer:
                 logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}")
                 del self.world_state["services"][k]
 
-        # Remove resolved incidents older than 7 days.
         now = time.time()
+
+        # Auto-resolve active incidents for services that are currently healthy
+        # and whose last_occurrence is older than 30 minutes. These are phantom
+        # incidents created by race-condition reads of truncated state files; they
+        # never receive a service_recovered event because the service was healthy
+        # all along.
+        for svc_key, svc in self.world_state["services"].items():
+            if svc.get("status") == "healthy":
+                inc_id = svc.get("incident_id")
+                if inc_id and inc_id in self.world_state["incidents"]:
+                    inc = self.world_state["incidents"][inc_id]
+                    last_occ = inc.get("last_occurrence") or 0
+                    if (inc.get("status") == "active"
+                            and (now - last_occ) > 1800):
+                        logger.info(
+                            f"Auto-resolving stale incident {inc_id} for {svc_key}: "
+                            f"service healthy, last_occurrence >{int((now - last_occ) / 60)}min ago"
+                        )
+                        inc["status"] = "resolved"
+                        inc["resolved_at"] = now
+                        svc["incident_id"] = None
+
+        # Remove resolved incidents older than 7 days.
         stale_incidents = [
             k for k, v in self.world_state["incidents"].items()
             if v.get("status") == "resolved"
@@ -202,13 +233,12 @@ class Observer:
             "services.json": self.world_state["services"],
             "deployments.json": self.world_state["deployments"],
             "incidents.json": self.world_state["incidents"],
-            "recommendations.json": [],  # Placeholder to satisfy requirements
+            "recommendations.json": [],
             "runtime-summary.json": self.world_state["summary"]
         }
         for filename, data in files.items():
             try:
-                with open(WORLD_DIR / filename, "w") as f:
-                    json.dump(data, f, indent=2)
+                _atomic_write_json(WORLD_DIR / filename, data)
             except Exception as e:
                 logger.error(f"Failed to save {filename}: {e}")
 