From 5e97b4e4488d93c28185a0f51f9dfe48a4543141 Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Wed, 3 Jun 2026 12:26:59 +0200 Subject: [PATCH] fix(supervisor): atomic writes + skip cycle on unreadable world state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two independent fixes for the false-alarm storm caused by race-condition reads of truncated world state files: 1. Atomic writes: _atomic_write_json (write→fsync→os.replace) replaces all bare open('w')+json.dump calls in supervisor and executor, so the action-file pipeline is never visible in a half-written state. 2. Resilient loader: _load_actual_state now returns False when any world state file fails to parse (empty or truncated mid-write). reconcile() skips the entire drift check on False instead of treating {} as "all services missing". actual_state retains its last-known-good values so a single bad cycle does not wipe accumulated context. Before: parse error → raw[key]={} → all desired services missing → wall of redeploy actions → drift_resolved_auto churn on next cycle. After: parse error → WARNING logged → cycle skipped → no actions. Co-Authored-By: Claude Sonnet 4.6 --- services/control-plane/src/executor.py | 16 ++++++--- services/control-plane/src/supervisor.py | 42 ++++++++++++++++-------- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/services/control-plane/src/executor.py b/services/control-plane/src/executor.py index 162e9ad..642f721 100644 --- a/services/control-plane/src/executor.py +++ b/services/control-plane/src/executor.py @@ -5,6 +5,16 @@ import logging import subprocess from pathlib import Path + +def _atomic_write_json(path: Path, data) -> None: + """Write JSON atomically: write to a sibling .tmp, fsync, then os.replace.""" + tmp = path.with_suffix(".tmp") + with open(tmp, "w") as f: + json.dump(data, f, indent=2) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp, path) + # Constants and Paths RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") ACTIONS_DIR = Path(RUNTIME_PATH) / "actions" @@ -57,8 +67,7 @@ class Executor: data = json.load(f) data["status"] = "running" data["started_at"] = time.time() - with open(running_path, "w") as f: - json.dump(data, f, indent=2) + _atomic_write_json(running_path, data) action_file.unlink() except Exception as e: logger.error(f"Failed to move {action_id} to running: {e}") @@ -121,8 +130,7 @@ class Executor: data["finished_at"] = time.time() if not success: data["error"] = error_msg - with open(target_path, "w") as f: - json.dump(data, f, indent=2) + _atomic_write_json(target_path, data) running_path.unlink() logger.info(f"Action {action_id} {target_status}") except Exception as e: diff --git a/services/control-plane/src/supervisor.py b/services/control-plane/src/supervisor.py index a0f8c97..a7db0b4 100644 --- a/services/control-plane/src/supervisor.py +++ b/services/control-plane/src/supervisor.py @@ -5,6 +5,16 @@ import logging import yaml from pathlib import Path + +def _atomic_write_json(path: Path, data) -> None: + """Write JSON atomically: write to a sibling .tmp, fsync, then os.replace.""" + tmp = path.with_suffix(".tmp") + with open(tmp, "w") as f: + json.dump(data, f, indent=2) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp, path) + # Constants and Paths RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") WORLD_DIR = Path(RUNTIME_PATH) / "world" @@ -175,7 +185,11 @@ class Supervisor: logger.error(f"Failed to load {svc_file}: {e}") self.desired_state["services"] = services - def _load_actual_state(self): + def _load_actual_state(self) -> bool: + """Load world state from disk. Returns False if any file is unreadable + (empty / mid-write truncation), in which case actual_state is NOT updated + so the caller can skip this reconcile cycle rather than treating missing + data as a real drift signal.""" files = { "services": WORLD_DIR / "services.json", "nodes": WORLD_DIR / "nodes.json", @@ -188,8 +202,11 @@ class Supervisor: with open(path, "r") as f: raw[key] = json.load(f) except Exception as e: - logger.error(f"Failed to load {key} actual state: {e}") - raw[key] = {} + logger.warning( + f"World state {path.name} unreadable (truncated write?): {e} " + f"— skipping reconcile cycle, keeping last known state" + ) + return False else: raw[key] = {} @@ -219,6 +236,7 @@ class Supervisor: self.actual_state["services"] = normalized_services self.actual_state["nodes"] = raw.get("nodes", {}) self.actual_state["incidents"] = normalized_incidents + return True # ------------------------------------------------------------------ # Incident helpers @@ -252,7 +270,8 @@ class Supervisor: logger.error(f"Failed to touch heartbeat file: {e}") self._load_desired_state() - self._load_actual_state() + if not self._load_actual_state(): + return # world state unreadable this cycle — skip to avoid false drift drifts = [] @@ -375,8 +394,7 @@ class Supervisor: action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" try: - with open(action_path, "w") as f: - json.dump(action, f, indent=2) + _atomic_write_json(action_path, action) logger.info( f"Generated recommendation: {action_id} " f"(type={action['type']}, risk={action['risk_level']})" @@ -428,8 +446,7 @@ class Supervisor: action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" try: - with open(action_path, "w") as f: - json.dump(action, f, indent=2) + _atomic_write_json(action_path, action) logger.info( f"Generated disk cleanup recommendation: {action_id} " f"(node={node}, risk=guarded)" @@ -494,8 +511,7 @@ class Supervisor: action["status"] = "cancelled" action["cancelled_reason"] = cancel_reason action["cancelled_at"] = time.time() - with open(dest, "w") as f: - json.dump(action, f, indent=2) + _atomic_write_json(dest, action) action_file.unlink() logger.info( f"Auto-cancelled {action_file.name}: " @@ -725,8 +741,7 @@ class Supervisor: action["status"] = "cancelled" action["cancelled_reason"] = "ha_websocket_recovered" action["cancelled_at"] = time.time() - with open(dest, "w") as f: - json.dump(action, f, indent=2) + _atomic_write_json(dest, action) pending_path.unlink() logger.info(f"Cancelled {action_id}: ha_websocket_recovered on {node}") except Exception as e: @@ -736,8 +751,7 @@ class Supervisor: action_id = action["action_id"] action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" try: - with open(action_path, "w") as f: - json.dump(action, f, indent=2) + _atomic_write_json(action_path, action) logger.info( f"Generated HA action: {action_id} " f"(type={action['type']}, risk={action['risk_level']})"