Compare commits

..

3 commits

Author SHA1 Message Date
Oskar Kapala 98437d46b2 test(control-plane): atomic write and resilient loader coverage
11 new test cases in test_state_reliability.py covering:
- atomic_write_json: produces valid JSON, no .tmp left behind, overwrites,
  works with nested structures
- _load_actual_state: returns False on empty / truncated file, returns True
  on valid files, preserves last-known-good state across a parse failure
- reconcile: empty/truncated services.json or incidents.json generates zero
  actions (skip-cycle semantics proven end-to-end)
- healthy service with valid world state generates no spurious action

All 32 tests (11 new + 21 existing) pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 12:27:05 +02:00
Oskar Kapala 5e97b4e448 fix(supervisor): atomic writes + skip cycle on unreadable world state
Two independent fixes for the false-alarm storm caused by race-condition
reads of truncated world state files:

1. Atomic writes: _atomic_write_json (write→fsync→os.replace) replaces
   all bare open('w')+json.dump calls in supervisor and executor, so the
   action-file pipeline is never visible in a half-written state.

2. Resilient loader: _load_actual_state now returns False when any world
   state file fails to parse (empty or truncated mid-write). reconcile()
   skips the entire drift check on False instead of treating {} as "all
   services missing". actual_state retains its last-known-good values so
   a single bad cycle does not wipe accumulated context.

   Before: parse error → raw[key]={} → all desired services missing →
     wall of redeploy actions → drift_resolved_auto churn on next cycle.
   After:  parse error → WARNING logged → cycle skipped → no actions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 12:26:59 +02:00
Oskar Kapala ffb0608b9a fix(observer): atomic writes for world state files
All JSON state writes (services.json, nodes.json, incidents.json,
deployments.json, runtime-summary.json, observer_checkpoint.json) now use
_atomic_write_json: write to a .tmp sibling, fsync, then os.replace.
This eliminates the truncated-write window that caused supervisors
reading mid-write files to see empty/partial JSON.

Also adds auto-resolution of phantom active incidents: if a service
reports status=healthy and its incident's last_occurrence is >30 min old,
the incident is resolved in _prune_stale_world. This clears false active
incidents accumulated from previous race-condition reads.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 12:26:49 +02:00
4 changed files with 275 additions and 24 deletions

View file

@ -7,6 +7,16 @@ import yaml
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
def _atomic_write_json(path: Path, data) -> None:
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
tmp = path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(data, f, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
# Constants and Paths # Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
EVENTS_DIR = Path(RUNTIME_PATH) / "events" EVENTS_DIR = Path(RUNTIME_PATH) / "events"
@ -124,8 +134,7 @@ class Observer:
def _save_checkpoint(self): def _save_checkpoint(self):
try: try:
with open(OBSERVER_STATE_FILE, "w") as f: _atomic_write_json(OBSERVER_STATE_FILE, {"node_checkpoints": self.node_checkpoints})
json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2)
except Exception as e: except Exception as e:
logger.error(f"Failed to save checkpoint: {e}") logger.error(f"Failed to save checkpoint: {e}")
@ -173,8 +182,30 @@ class Observer:
logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}") logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}")
del self.world_state["services"][k] del self.world_state["services"][k]
# Remove resolved incidents older than 7 days.
now = time.time() now = time.time()
# Auto-resolve active incidents for services that are currently healthy
# and whose last_occurrence is older than 30 minutes. These are phantom
# incidents created by race-condition reads of truncated state files; they
# never receive a service_recovered event because the service was healthy
# all along.
for svc_key, svc in self.world_state["services"].items():
if svc.get("status") == "healthy":
inc_id = svc.get("incident_id")
if inc_id and inc_id in self.world_state["incidents"]:
inc = self.world_state["incidents"][inc_id]
last_occ = inc.get("last_occurrence") or 0
if (inc.get("status") == "active"
and (now - last_occ) > 1800):
logger.info(
f"Auto-resolving stale incident {inc_id} for {svc_key}: "
f"service healthy, last_occurrence >{int((now - last_occ) / 60)}min ago"
)
inc["status"] = "resolved"
inc["resolved_at"] = now
svc["incident_id"] = None
# Remove resolved incidents older than 7 days.
stale_incidents = [ stale_incidents = [
k for k, v in self.world_state["incidents"].items() k for k, v in self.world_state["incidents"].items()
if v.get("status") == "resolved" if v.get("status") == "resolved"
@ -202,13 +233,12 @@ class Observer:
"services.json": self.world_state["services"], "services.json": self.world_state["services"],
"deployments.json": self.world_state["deployments"], "deployments.json": self.world_state["deployments"],
"incidents.json": self.world_state["incidents"], "incidents.json": self.world_state["incidents"],
"recommendations.json": [], # Placeholder to satisfy requirements "recommendations.json": [],
"runtime-summary.json": self.world_state["summary"] "runtime-summary.json": self.world_state["summary"]
} }
for filename, data in files.items(): for filename, data in files.items():
try: try:
with open(WORLD_DIR / filename, "w") as f: _atomic_write_json(WORLD_DIR / filename, data)
json.dump(data, f, indent=2)
except Exception as e: except Exception as e:
logger.error(f"Failed to save {filename}: {e}") logger.error(f"Failed to save {filename}: {e}")

View file

@ -5,6 +5,16 @@ import logging
import subprocess import subprocess
from pathlib import Path from pathlib import Path
def _atomic_write_json(path: Path, data) -> None:
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
tmp = path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(data, f, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
# Constants and Paths # Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
ACTIONS_DIR = Path(RUNTIME_PATH) / "actions" ACTIONS_DIR = Path(RUNTIME_PATH) / "actions"
@ -57,8 +67,7 @@ class Executor:
data = json.load(f) data = json.load(f)
data["status"] = "running" data["status"] = "running"
data["started_at"] = time.time() data["started_at"] = time.time()
with open(running_path, "w") as f: _atomic_write_json(running_path, data)
json.dump(data, f, indent=2)
action_file.unlink() action_file.unlink()
except Exception as e: except Exception as e:
logger.error(f"Failed to move {action_id} to running: {e}") logger.error(f"Failed to move {action_id} to running: {e}")
@ -121,8 +130,7 @@ class Executor:
data["finished_at"] = time.time() data["finished_at"] = time.time()
if not success: if not success:
data["error"] = error_msg data["error"] = error_msg
with open(target_path, "w") as f: _atomic_write_json(target_path, data)
json.dump(data, f, indent=2)
running_path.unlink() running_path.unlink()
logger.info(f"Action {action_id} {target_status}") logger.info(f"Action {action_id} {target_status}")
except Exception as e: except Exception as e:

View file

@ -5,6 +5,16 @@ import logging
import yaml import yaml
from pathlib import Path from pathlib import Path
def _atomic_write_json(path: Path, data) -> None:
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
tmp = path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(data, f, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
# Constants and Paths # Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
WORLD_DIR = Path(RUNTIME_PATH) / "world" WORLD_DIR = Path(RUNTIME_PATH) / "world"
@ -175,7 +185,11 @@ class Supervisor:
logger.error(f"Failed to load {svc_file}: {e}") logger.error(f"Failed to load {svc_file}: {e}")
self.desired_state["services"] = services self.desired_state["services"] = services
def _load_actual_state(self): def _load_actual_state(self) -> bool:
"""Load world state from disk. Returns False if any file is unreadable
(empty / mid-write truncation), in which case actual_state is NOT updated
so the caller can skip this reconcile cycle rather than treating missing
data as a real drift signal."""
files = { files = {
"services": WORLD_DIR / "services.json", "services": WORLD_DIR / "services.json",
"nodes": WORLD_DIR / "nodes.json", "nodes": WORLD_DIR / "nodes.json",
@ -188,8 +202,11 @@ class Supervisor:
with open(path, "r") as f: with open(path, "r") as f:
raw[key] = json.load(f) raw[key] = json.load(f)
except Exception as e: except Exception as e:
logger.error(f"Failed to load {key} actual state: {e}") logger.warning(
raw[key] = {} f"World state {path.name} unreadable (truncated write?): {e} "
f"— skipping reconcile cycle, keeping last known state"
)
return False
else: else:
raw[key] = {} raw[key] = {}
@ -219,6 +236,7 @@ class Supervisor:
self.actual_state["services"] = normalized_services self.actual_state["services"] = normalized_services
self.actual_state["nodes"] = raw.get("nodes", {}) self.actual_state["nodes"] = raw.get("nodes", {})
self.actual_state["incidents"] = normalized_incidents self.actual_state["incidents"] = normalized_incidents
return True
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Incident helpers # Incident helpers
@ -252,7 +270,8 @@ class Supervisor:
logger.error(f"Failed to touch heartbeat file: {e}") logger.error(f"Failed to touch heartbeat file: {e}")
self._load_desired_state() self._load_desired_state()
self._load_actual_state() if not self._load_actual_state():
return # world state unreadable this cycle — skip to avoid false drift
drifts = [] drifts = []
@ -375,8 +394,7 @@ class Supervisor:
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
try: try:
with open(action_path, "w") as f: _atomic_write_json(action_path, action)
json.dump(action, f, indent=2)
logger.info( logger.info(
f"Generated recommendation: {action_id} " f"Generated recommendation: {action_id} "
f"(type={action['type']}, risk={action['risk_level']})" f"(type={action['type']}, risk={action['risk_level']})"
@ -428,8 +446,7 @@ class Supervisor:
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
try: try:
with open(action_path, "w") as f: _atomic_write_json(action_path, action)
json.dump(action, f, indent=2)
logger.info( logger.info(
f"Generated disk cleanup recommendation: {action_id} " f"Generated disk cleanup recommendation: {action_id} "
f"(node={node}, risk=guarded)" f"(node={node}, risk=guarded)"
@ -494,8 +511,7 @@ class Supervisor:
action["status"] = "cancelled" action["status"] = "cancelled"
action["cancelled_reason"] = cancel_reason action["cancelled_reason"] = cancel_reason
action["cancelled_at"] = time.time() action["cancelled_at"] = time.time()
with open(dest, "w") as f: _atomic_write_json(dest, action)
json.dump(action, f, indent=2)
action_file.unlink() action_file.unlink()
logger.info( logger.info(
f"Auto-cancelled {action_file.name}: " f"Auto-cancelled {action_file.name}: "
@ -725,8 +741,7 @@ class Supervisor:
action["status"] = "cancelled" action["status"] = "cancelled"
action["cancelled_reason"] = "ha_websocket_recovered" action["cancelled_reason"] = "ha_websocket_recovered"
action["cancelled_at"] = time.time() action["cancelled_at"] = time.time()
with open(dest, "w") as f: _atomic_write_json(dest, action)
json.dump(action, f, indent=2)
pending_path.unlink() pending_path.unlink()
logger.info(f"Cancelled {action_id}: ha_websocket_recovered on {node}") logger.info(f"Cancelled {action_id}: ha_websocket_recovered on {node}")
except Exception as e: except Exception as e:
@ -736,8 +751,7 @@ class Supervisor:
action_id = action["action_id"] action_id = action["action_id"]
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
try: try:
with open(action_path, "w") as f: _atomic_write_json(action_path, action)
json.dump(action, f, indent=2)
logger.info( logger.info(
f"Generated HA action: {action_id} " f"Generated HA action: {action_id} "
f"(type={action['type']}, risk={action['risk_level']})" f"(type={action['type']}, risk={action['risk_level']})"

View file

@ -0,0 +1,199 @@
"""Tests for atomic writes and resilient world-state loading in the supervisor."""
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
import supervisor as supervisor_module
from supervisor import Supervisor, _atomic_write_json
# ---------------------------------------------------------------------------
# Helpers (reused from test_supervisor_ha)
# ---------------------------------------------------------------------------
def _setup_supervisor(tmp_path: Path, monkeypatch) -> Supervisor:
actions = tmp_path / "actions"
events = tmp_path / "events"
world = tmp_path / "world"
repo = tmp_path / "repo"
for d in (actions, events, world, repo / "hosts"):
d.mkdir(parents=True, exist_ok=True)
monkeypatch.setattr(supervisor_module, "ACTIONS_DIR", actions)
monkeypatch.setattr(supervisor_module, "EVENTS_DIR", events)
monkeypatch.setattr(supervisor_module, "WORLD_DIR", world)
monkeypatch.setattr(supervisor_module, "REPO_ROOT", repo)
sup = Supervisor()
sup.desired_state = {"services": {}}
sup.actual_state = {"services": {}, "nodes": {}, "incidents": {}}
return sup
# ---------------------------------------------------------------------------
# 1. atomic_write_json correctness
# ---------------------------------------------------------------------------
def test_atomic_write_json_produces_valid_json(tmp_path):
path = tmp_path / "out.json"
data = {"services": {"vps/outline": {"status": "healthy"}}, "count": 42}
_atomic_write_json(path, data)
assert path.exists(), "output file must exist after atomic write"
loaded = json.loads(path.read_text())
assert loaded == data
def test_atomic_write_json_no_tmp_left_behind(tmp_path):
path = tmp_path / "world.json"
_atomic_write_json(path, {"ok": True})
tmp = path.with_suffix(".tmp")
assert not tmp.exists(), ".tmp must be cleaned up by os.replace"
def test_atomic_write_json_overwrites_existing(tmp_path):
path = tmp_path / "state.json"
path.write_text('{"old": true}')
_atomic_write_json(path, {"new": True})
assert json.loads(path.read_text()) == {"new": True}
def test_atomic_write_json_nested_structure(tmp_path):
path = tmp_path / "complex.json"
data = {
"nodes": {"vps": {"status": "online", "disk_usage_pct": 42}},
"incidents": {},
"list": [1, 2, 3],
}
_atomic_write_json(path, data)
assert json.loads(path.read_text()) == data
# ---------------------------------------------------------------------------
# 2. Resilient loader: empty / truncated file → skip cycle, no drift
# ---------------------------------------------------------------------------
def _populate_desired(sup: Supervisor, svc_key: str = "vps/outline"):
node, service = svc_key.split("/", 1)
sup.desired_state["services"][svc_key] = {
"node": node,
"service": service,
"desired": "running",
}
def test_empty_services_json_skips_reconcile(tmp_path, monkeypatch):
"""Empty services.json (truncated write) must not generate any redeploy action."""
sup = _setup_supervisor(tmp_path, monkeypatch)
_populate_desired(sup)
# Write empty services.json — simulates a mid-write truncation
(tmp_path / "world" / "services.json").write_text("")
(tmp_path / "world" / "nodes.json").write_text("{}")
(tmp_path / "world" / "incidents.json").write_text("{}")
sup.reconcile()
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
assert pending == [], f"No actions should be generated on empty state file, got: {[p.name for p in pending]}"
def test_truncated_services_json_skips_reconcile(tmp_path, monkeypatch):
"""Partially-written (truncated mid-write) JSON must not generate any action."""
sup = _setup_supervisor(tmp_path, monkeypatch)
_populate_desired(sup)
(tmp_path / "world" / "services.json").write_text('{"vps/outline": {"status": "hea')
(tmp_path / "world" / "nodes.json").write_text("{}")
(tmp_path / "world" / "incidents.json").write_text("{}")
sup.reconcile()
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
assert pending == [], f"No actions expected on truncated state, got: {[p.name for p in pending]}"
def test_empty_incidents_json_skips_reconcile(tmp_path, monkeypatch):
"""Empty incidents.json (any world-state file failing) skips full cycle."""
sup = _setup_supervisor(tmp_path, monkeypatch)
_populate_desired(sup)
(tmp_path / "world" / "services.json").write_text("{}")
(tmp_path / "world" / "nodes.json").write_text("{}")
(tmp_path / "world" / "incidents.json").write_text("")
sup.reconcile()
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
assert pending == [], f"No actions expected when any state file is unreadable, got: {[p.name for p in pending]}"
def test_load_actual_state_returns_false_on_empty_file(tmp_path, monkeypatch):
"""_load_actual_state must return False (not raise) when a file is empty."""
sup = _setup_supervisor(tmp_path, monkeypatch)
(tmp_path / "world" / "services.json").write_text("")
(tmp_path / "world" / "nodes.json").write_text("{}")
(tmp_path / "world" / "incidents.json").write_text("{}")
result = sup._load_actual_state()
assert result is False
def test_load_actual_state_returns_true_on_valid_files(tmp_path, monkeypatch):
"""_load_actual_state returns True and populates actual_state on valid files."""
sup = _setup_supervisor(tmp_path, monkeypatch)
services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
(tmp_path / "world" / "services.json").write_text(json.dumps(services))
(tmp_path / "world" / "nodes.json").write_text('{"vps": {"status": "online"}}')
(tmp_path / "world" / "incidents.json").write_text("{}")
result = sup._load_actual_state()
assert result is True
assert "vps/outline" in sup.actual_state["services"]
def test_parse_failure_preserves_last_known_good_state(tmp_path, monkeypatch):
"""When a file becomes unreadable, actual_state retains the previous good values."""
sup = _setup_supervisor(tmp_path, monkeypatch)
# First successful load
services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
(tmp_path / "world" / "services.json").write_text(json.dumps(services))
(tmp_path / "world" / "nodes.json").write_text("{}")
(tmp_path / "world" / "incidents.json").write_text("{}")
assert sup._load_actual_state() is True
assert "vps/outline" in sup.actual_state["services"]
# File becomes empty (race condition)
(tmp_path / "world" / "services.json").write_text("")
assert sup._load_actual_state() is False
# State must be unchanged from the previous good load
assert "vps/outline" in sup.actual_state["services"], \
"Last-known-good state must be preserved on parse failure"
def test_healthy_service_does_not_generate_action(tmp_path, monkeypatch):
"""A desired service that appears healthy in world state generates no action."""
sup = _setup_supervisor(tmp_path, monkeypatch)
_populate_desired(sup)
services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
(tmp_path / "world" / "services.json").write_text(json.dumps(services))
(tmp_path / "world" / "nodes.json").write_text("{}")
(tmp_path / "world" / "incidents.json").write_text("{}")
sup.reconcile()
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
assert pending == [], "Healthy service must not generate any action"