diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py index ab2432f..0945d28 100644 --- a/scripts/observer/observer.py +++ b/scripts/observer/observer.py @@ -7,6 +7,16 @@ import yaml from datetime import datetime, timezone from pathlib import Path + +def _atomic_write_json(path: Path, data) -> None: + """Write JSON atomically: write to a sibling .tmp, fsync, then os.replace.""" + tmp = path.with_suffix(".tmp") + with open(tmp, "w") as f: + json.dump(data, f, indent=2) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp, path) + # Constants and Paths RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") EVENTS_DIR = Path(RUNTIME_PATH) / "events" @@ -124,8 +134,7 @@ class Observer: def _save_checkpoint(self): try: - with open(OBSERVER_STATE_FILE, "w") as f: - json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2) + _atomic_write_json(OBSERVER_STATE_FILE, {"node_checkpoints": self.node_checkpoints}) except Exception as e: logger.error(f"Failed to save checkpoint: {e}") @@ -173,8 +182,30 @@ class Observer: logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}") del self.world_state["services"][k] - # Remove resolved incidents older than 7 days. now = time.time() + + # Auto-resolve active incidents for services that are currently healthy + # and whose last_occurrence is older than 30 minutes. These are phantom + # incidents created by race-condition reads of truncated state files; they + # never receive a service_recovered event because the service was healthy + # all along. + for svc_key, svc in self.world_state["services"].items(): + if svc.get("status") == "healthy": + inc_id = svc.get("incident_id") + if inc_id and inc_id in self.world_state["incidents"]: + inc = self.world_state["incidents"][inc_id] + last_occ = inc.get("last_occurrence") or 0 + if (inc.get("status") == "active" + and (now - last_occ) > 1800): + logger.info( + f"Auto-resolving stale incident {inc_id} for {svc_key}: " + f"service healthy, last_occurrence >{int((now - last_occ) / 60)}min ago" + ) + inc["status"] = "resolved" + inc["resolved_at"] = now + svc["incident_id"] = None + + # Remove resolved incidents older than 7 days. stale_incidents = [ k for k, v in self.world_state["incidents"].items() if v.get("status") == "resolved" @@ -202,13 +233,12 @@ class Observer: "services.json": self.world_state["services"], "deployments.json": self.world_state["deployments"], "incidents.json": self.world_state["incidents"], - "recommendations.json": [], # Placeholder to satisfy requirements + "recommendations.json": [], "runtime-summary.json": self.world_state["summary"] } for filename, data in files.items(): try: - with open(WORLD_DIR / filename, "w") as f: - json.dump(data, f, indent=2) + _atomic_write_json(WORLD_DIR / filename, data) except Exception as e: logger.error(f"Failed to save {filename}: {e}")