fix(supervisor): atomic writes + skip cycle on unreadable world state

Two independent fixes for the false-alarm storm caused by race-condition
reads of truncated world state files:

1. Atomic writes: _atomic_write_json (write→fsync→os.replace) replaces
   all bare open('w')+json.dump calls in supervisor and executor, so the
   action-file pipeline is never visible in a half-written state.

2. Resilient loader: _load_actual_state now returns False when any world
   state file fails to parse (empty or truncated mid-write). reconcile()
   skips the entire drift check on False instead of treating {} as "all
   services missing". actual_state retains its last-known-good values so
   a single bad cycle does not wipe accumulated context.

   Before: parse error → raw[key]={} → all desired services missing →
     wall of redeploy actions → drift_resolved_auto churn on next cycle.
   After:  parse error → WARNING logged → cycle skipped → no actions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Oskar Kapala 2026-06-03 12:26:59 +02:00
parent ffb0608b9a
commit 5e97b4e448
2 changed files with 40 additions and 18 deletions

View file

@ -5,6 +5,16 @@ import logging
import subprocess import subprocess
from pathlib import Path from pathlib import Path
def _atomic_write_json(path: Path, data) -> None:
    """Write *data* as JSON to *path* so readers never see a partial file.

    The payload is written to a sibling temp file, flushed and fsync'ed,
    and then moved over *path* with ``os.replace``. If any step fails, the
    temp file is removed, the existing *path* is left untouched, and the
    original exception is re-raised.

    Args:
        path: Destination file. Its directory must already exist.
        data: Any object ``json.dump`` can serialize.

    Raises:
        TypeError, ValueError: *data* is not JSON-serializable.
        OSError: The temp file could not be written or moved into place.
    """
    tmp = path.with_suffix(".tmp")
    if tmp == path:
        # The target itself ends in ".tmp": writing to it directly would not
        # be atomic, and the failure cleanup below would delete the target.
        tmp = path.with_name(path.name + ".tmp")
    # NOTE(review): with_suffix() replaces the last suffix, so targets that
    # differ only by extension (a.json / a.yaml) share one temp name.
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            # Push Python's buffer to the OS, then the OS cache to disk,
            # before the rename makes the new content visible.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup so a failed write does not leave a stale,
        # half-written temp file next to the target.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
# Constants and Paths # Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
ACTIONS_DIR = Path(RUNTIME_PATH) / "actions" ACTIONS_DIR = Path(RUNTIME_PATH) / "actions"
@ -57,8 +67,7 @@ class Executor:
data = json.load(f) data = json.load(f)
data["status"] = "running" data["status"] = "running"
data["started_at"] = time.time() data["started_at"] = time.time()
with open(running_path, "w") as f: _atomic_write_json(running_path, data)
json.dump(data, f, indent=2)
action_file.unlink() action_file.unlink()
except Exception as e: except Exception as e:
logger.error(f"Failed to move {action_id} to running: {e}") logger.error(f"Failed to move {action_id} to running: {e}")
@ -121,8 +130,7 @@ class Executor:
data["finished_at"] = time.time() data["finished_at"] = time.time()
if not success: if not success:
data["error"] = error_msg data["error"] = error_msg
with open(target_path, "w") as f: _atomic_write_json(target_path, data)
json.dump(data, f, indent=2)
running_path.unlink() running_path.unlink()
logger.info(f"Action {action_id} {target_status}") logger.info(f"Action {action_id} {target_status}")
except Exception as e: except Exception as e:

View file

@ -5,6 +5,16 @@ import logging
import yaml import yaml
from pathlib import Path from pathlib import Path
def _atomic_write_json(path: Path, data) -> None:
    """Write *data* as JSON to *path* so readers never see a partial file.

    The payload is written to a sibling temp file, flushed and fsync'ed,
    and then moved over *path* with ``os.replace``. If any step fails, the
    temp file is removed, the existing *path* is left untouched, and the
    original exception is re-raised.

    Args:
        path: Destination file. Its directory must already exist.
        data: Any object ``json.dump`` can serialize.

    Raises:
        TypeError, ValueError: *data* is not JSON-serializable.
        OSError: The temp file could not be written or moved into place.
    """
    tmp = path.with_suffix(".tmp")
    if tmp == path:
        # The target itself ends in ".tmp": writing to it directly would not
        # be atomic, and the failure cleanup below would delete the target.
        tmp = path.with_name(path.name + ".tmp")
    # NOTE(review): with_suffix() replaces the last suffix, so targets that
    # differ only by extension (a.json / a.yaml) share one temp name.
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            # Push Python's buffer to the OS, then the OS cache to disk,
            # before the rename makes the new content visible.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup so a failed write does not leave a stale,
        # half-written temp file next to the target.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
# Constants and Paths # Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
WORLD_DIR = Path(RUNTIME_PATH) / "world" WORLD_DIR = Path(RUNTIME_PATH) / "world"
@ -175,7 +185,11 @@ class Supervisor:
logger.error(f"Failed to load {svc_file}: {e}") logger.error(f"Failed to load {svc_file}: {e}")
self.desired_state["services"] = services self.desired_state["services"] = services
def _load_actual_state(self): def _load_actual_state(self) -> bool:
"""Load world state from disk. Returns False if any file is unreadable
(empty / mid-write truncation), in which case actual_state is NOT updated
so the caller can skip this reconcile cycle rather than treating missing
data as a real drift signal."""
files = { files = {
"services": WORLD_DIR / "services.json", "services": WORLD_DIR / "services.json",
"nodes": WORLD_DIR / "nodes.json", "nodes": WORLD_DIR / "nodes.json",
@ -188,8 +202,11 @@ class Supervisor:
with open(path, "r") as f: with open(path, "r") as f:
raw[key] = json.load(f) raw[key] = json.load(f)
except Exception as e: except Exception as e:
logger.error(f"Failed to load {key} actual state: {e}") logger.warning(
raw[key] = {} f"World state {path.name} unreadable (truncated write?): {e} "
f"— skipping reconcile cycle, keeping last known state"
)
return False
else: else:
raw[key] = {} raw[key] = {}
@ -219,6 +236,7 @@ class Supervisor:
self.actual_state["services"] = normalized_services self.actual_state["services"] = normalized_services
self.actual_state["nodes"] = raw.get("nodes", {}) self.actual_state["nodes"] = raw.get("nodes", {})
self.actual_state["incidents"] = normalized_incidents self.actual_state["incidents"] = normalized_incidents
return True
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Incident helpers # Incident helpers
@ -252,7 +270,8 @@ class Supervisor:
logger.error(f"Failed to touch heartbeat file: {e}") logger.error(f"Failed to touch heartbeat file: {e}")
self._load_desired_state() self._load_desired_state()
self._load_actual_state() if not self._load_actual_state():
return # world state unreadable this cycle — skip to avoid false drift
drifts = [] drifts = []
@ -375,8 +394,7 @@ class Supervisor:
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
try: try:
with open(action_path, "w") as f: _atomic_write_json(action_path, action)
json.dump(action, f, indent=2)
logger.info( logger.info(
f"Generated recommendation: {action_id} " f"Generated recommendation: {action_id} "
f"(type={action['type']}, risk={action['risk_level']})" f"(type={action['type']}, risk={action['risk_level']})"
@ -428,8 +446,7 @@ class Supervisor:
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
try: try:
with open(action_path, "w") as f: _atomic_write_json(action_path, action)
json.dump(action, f, indent=2)
logger.info( logger.info(
f"Generated disk cleanup recommendation: {action_id} " f"Generated disk cleanup recommendation: {action_id} "
f"(node={node}, risk=guarded)" f"(node={node}, risk=guarded)"
@ -494,8 +511,7 @@ class Supervisor:
action["status"] = "cancelled" action["status"] = "cancelled"
action["cancelled_reason"] = cancel_reason action["cancelled_reason"] = cancel_reason
action["cancelled_at"] = time.time() action["cancelled_at"] = time.time()
with open(dest, "w") as f: _atomic_write_json(dest, action)
json.dump(action, f, indent=2)
action_file.unlink() action_file.unlink()
logger.info( logger.info(
f"Auto-cancelled {action_file.name}: " f"Auto-cancelled {action_file.name}: "
@ -725,8 +741,7 @@ class Supervisor:
action["status"] = "cancelled" action["status"] = "cancelled"
action["cancelled_reason"] = "ha_websocket_recovered" action["cancelled_reason"] = "ha_websocket_recovered"
action["cancelled_at"] = time.time() action["cancelled_at"] = time.time()
with open(dest, "w") as f: _atomic_write_json(dest, action)
json.dump(action, f, indent=2)
pending_path.unlink() pending_path.unlink()
logger.info(f"Cancelled {action_id}: ha_websocket_recovered on {node}") logger.info(f"Cancelled {action_id}: ha_websocket_recovered on {node}")
except Exception as e: except Exception as e:
@ -736,8 +751,7 @@ class Supervisor:
action_id = action["action_id"] action_id = action["action_id"]
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
try: try:
with open(action_path, "w") as f: _atomic_write_json(action_path, action)
json.dump(action, f, indent=2)
logger.info( logger.info(
f"Generated HA action: {action_id} " f"Generated HA action: {action_id} "
f"(type={action['type']}, risk={action['risk_level']})" f"(type={action['type']}, risk={action['risk_level']})"