Compare commits
3 commits
f381023206
...
98437d46b2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
98437d46b2 | ||
|
|
5e97b4e448 | ||
|
|
ffb0608b9a |
|
|
@ -7,6 +7,16 @@ import yaml
|
|||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _atomic_write_json(path: Path, data) -> None:
|
||||
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
|
||||
tmp = path.with_suffix(".tmp")
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp, path)
|
||||
|
||||
# Constants and Paths
|
||||
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
|
||||
EVENTS_DIR = Path(RUNTIME_PATH) / "events"
|
||||
|
|
@ -124,8 +134,7 @@ class Observer:
|
|||
|
||||
def _save_checkpoint(self):
|
||||
try:
|
||||
with open(OBSERVER_STATE_FILE, "w") as f:
|
||||
json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2)
|
||||
_atomic_write_json(OBSERVER_STATE_FILE, {"node_checkpoints": self.node_checkpoints})
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save checkpoint: {e}")
|
||||
|
||||
|
|
@ -173,8 +182,30 @@ class Observer:
|
|||
logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}")
|
||||
del self.world_state["services"][k]
|
||||
|
||||
# Remove resolved incidents older than 7 days.
|
||||
now = time.time()
|
||||
|
||||
# Auto-resolve active incidents for services that are currently healthy
|
||||
# and whose last_occurrence is older than 30 minutes. These are phantom
|
||||
# incidents created by race-condition reads of truncated state files; they
|
||||
# never receive a service_recovered event because the service was healthy
|
||||
# all along.
|
||||
for svc_key, svc in self.world_state["services"].items():
|
||||
if svc.get("status") == "healthy":
|
||||
inc_id = svc.get("incident_id")
|
||||
if inc_id and inc_id in self.world_state["incidents"]:
|
||||
inc = self.world_state["incidents"][inc_id]
|
||||
last_occ = inc.get("last_occurrence") or 0
|
||||
if (inc.get("status") == "active"
|
||||
and (now - last_occ) > 1800):
|
||||
logger.info(
|
||||
f"Auto-resolving stale incident {inc_id} for {svc_key}: "
|
||||
f"service healthy, last_occurrence >{int((now - last_occ) / 60)}min ago"
|
||||
)
|
||||
inc["status"] = "resolved"
|
||||
inc["resolved_at"] = now
|
||||
svc["incident_id"] = None
|
||||
|
||||
# Remove resolved incidents older than 7 days.
|
||||
stale_incidents = [
|
||||
k for k, v in self.world_state["incidents"].items()
|
||||
if v.get("status") == "resolved"
|
||||
|
|
@ -202,13 +233,12 @@ class Observer:
|
|||
"services.json": self.world_state["services"],
|
||||
"deployments.json": self.world_state["deployments"],
|
||||
"incidents.json": self.world_state["incidents"],
|
||||
"recommendations.json": [], # Placeholder to satisfy requirements
|
||||
"recommendations.json": [],
|
||||
"runtime-summary.json": self.world_state["summary"]
|
||||
}
|
||||
for filename, data in files.items():
|
||||
try:
|
||||
with open(WORLD_DIR / filename, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
_atomic_write_json(WORLD_DIR / filename, data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save {filename}: {e}")
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,16 @@ import logging
|
|||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _atomic_write_json(path: Path, data) -> None:
|
||||
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
|
||||
tmp = path.with_suffix(".tmp")
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp, path)
|
||||
|
||||
# Constants and Paths
|
||||
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
|
||||
ACTIONS_DIR = Path(RUNTIME_PATH) / "actions"
|
||||
|
|
@ -57,8 +67,7 @@ class Executor:
|
|||
data = json.load(f)
|
||||
data["status"] = "running"
|
||||
data["started_at"] = time.time()
|
||||
with open(running_path, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
_atomic_write_json(running_path, data)
|
||||
action_file.unlink()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to move {action_id} to running: {e}")
|
||||
|
|
@ -121,8 +130,7 @@ class Executor:
|
|||
data["finished_at"] = time.time()
|
||||
if not success:
|
||||
data["error"] = error_msg
|
||||
with open(target_path, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
_atomic_write_json(target_path, data)
|
||||
running_path.unlink()
|
||||
logger.info(f"Action {action_id} {target_status}")
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -5,6 +5,16 @@ import logging
|
|||
import yaml
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _atomic_write_json(path: Path, data) -> None:
|
||||
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
|
||||
tmp = path.with_suffix(".tmp")
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp, path)
|
||||
|
||||
# Constants and Paths
|
||||
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
|
||||
WORLD_DIR = Path(RUNTIME_PATH) / "world"
|
||||
|
|
@ -175,7 +185,11 @@ class Supervisor:
|
|||
logger.error(f"Failed to load {svc_file}: {e}")
|
||||
self.desired_state["services"] = services
|
||||
|
||||
def _load_actual_state(self):
|
||||
def _load_actual_state(self) -> bool:
|
||||
"""Load world state from disk. Returns False if any file is unreadable
|
||||
(empty / mid-write truncation), in which case actual_state is NOT updated
|
||||
so the caller can skip this reconcile cycle rather than treating missing
|
||||
data as a real drift signal."""
|
||||
files = {
|
||||
"services": WORLD_DIR / "services.json",
|
||||
"nodes": WORLD_DIR / "nodes.json",
|
||||
|
|
@ -188,8 +202,11 @@ class Supervisor:
|
|||
with open(path, "r") as f:
|
||||
raw[key] = json.load(f)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load {key} actual state: {e}")
|
||||
raw[key] = {}
|
||||
logger.warning(
|
||||
f"World state {path.name} unreadable (truncated write?): {e} "
|
||||
f"— skipping reconcile cycle, keeping last known state"
|
||||
)
|
||||
return False
|
||||
else:
|
||||
raw[key] = {}
|
||||
|
||||
|
|
@ -219,6 +236,7 @@ class Supervisor:
|
|||
self.actual_state["services"] = normalized_services
|
||||
self.actual_state["nodes"] = raw.get("nodes", {})
|
||||
self.actual_state["incidents"] = normalized_incidents
|
||||
return True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Incident helpers
|
||||
|
|
@ -252,7 +270,8 @@ class Supervisor:
|
|||
logger.error(f"Failed to touch heartbeat file: {e}")
|
||||
|
||||
self._load_desired_state()
|
||||
self._load_actual_state()
|
||||
if not self._load_actual_state():
|
||||
return # world state unreadable this cycle — skip to avoid false drift
|
||||
|
||||
drifts = []
|
||||
|
||||
|
|
@ -375,8 +394,7 @@ class Supervisor:
|
|||
|
||||
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
|
||||
try:
|
||||
with open(action_path, "w") as f:
|
||||
json.dump(action, f, indent=2)
|
||||
_atomic_write_json(action_path, action)
|
||||
logger.info(
|
||||
f"Generated recommendation: {action_id} "
|
||||
f"(type={action['type']}, risk={action['risk_level']})"
|
||||
|
|
@ -428,8 +446,7 @@ class Supervisor:
|
|||
|
||||
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
|
||||
try:
|
||||
with open(action_path, "w") as f:
|
||||
json.dump(action, f, indent=2)
|
||||
_atomic_write_json(action_path, action)
|
||||
logger.info(
|
||||
f"Generated disk cleanup recommendation: {action_id} "
|
||||
f"(node={node}, risk=guarded)"
|
||||
|
|
@ -494,8 +511,7 @@ class Supervisor:
|
|||
action["status"] = "cancelled"
|
||||
action["cancelled_reason"] = cancel_reason
|
||||
action["cancelled_at"] = time.time()
|
||||
with open(dest, "w") as f:
|
||||
json.dump(action, f, indent=2)
|
||||
_atomic_write_json(dest, action)
|
||||
action_file.unlink()
|
||||
logger.info(
|
||||
f"Auto-cancelled {action_file.name}: "
|
||||
|
|
@ -725,8 +741,7 @@ class Supervisor:
|
|||
action["status"] = "cancelled"
|
||||
action["cancelled_reason"] = "ha_websocket_recovered"
|
||||
action["cancelled_at"] = time.time()
|
||||
with open(dest, "w") as f:
|
||||
json.dump(action, f, indent=2)
|
||||
_atomic_write_json(dest, action)
|
||||
pending_path.unlink()
|
||||
logger.info(f"Cancelled {action_id}: ha_websocket_recovered on {node}")
|
||||
except Exception as e:
|
||||
|
|
@ -736,8 +751,7 @@ class Supervisor:
|
|||
action_id = action["action_id"]
|
||||
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
|
||||
try:
|
||||
with open(action_path, "w") as f:
|
||||
json.dump(action, f, indent=2)
|
||||
_atomic_write_json(action_path, action)
|
||||
logger.info(
|
||||
f"Generated HA action: {action_id} "
|
||||
f"(type={action['type']}, risk={action['risk_level']})"
|
||||
|
|
|
|||
199
services/control-plane/tests/test_state_reliability.py
Normal file
199
services/control-plane/tests/test_state_reliability.py
Normal file
|
|
@ -0,0 +1,199 @@
|
|||
"""Tests for atomic writes and resilient world-state loading in the supervisor."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
import supervisor as supervisor_module
|
||||
from supervisor import Supervisor, _atomic_write_json
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers (reused from test_supervisor_ha)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _setup_supervisor(tmp_path: Path, monkeypatch) -> Supervisor:
|
||||
actions = tmp_path / "actions"
|
||||
events = tmp_path / "events"
|
||||
world = tmp_path / "world"
|
||||
repo = tmp_path / "repo"
|
||||
|
||||
for d in (actions, events, world, repo / "hosts"):
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
monkeypatch.setattr(supervisor_module, "ACTIONS_DIR", actions)
|
||||
monkeypatch.setattr(supervisor_module, "EVENTS_DIR", events)
|
||||
monkeypatch.setattr(supervisor_module, "WORLD_DIR", world)
|
||||
monkeypatch.setattr(supervisor_module, "REPO_ROOT", repo)
|
||||
|
||||
sup = Supervisor()
|
||||
sup.desired_state = {"services": {}}
|
||||
sup.actual_state = {"services": {}, "nodes": {}, "incidents": {}}
|
||||
return sup
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. atomic_write_json correctness
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_atomic_write_json_produces_valid_json(tmp_path):
|
||||
path = tmp_path / "out.json"
|
||||
data = {"services": {"vps/outline": {"status": "healthy"}}, "count": 42}
|
||||
_atomic_write_json(path, data)
|
||||
|
||||
assert path.exists(), "output file must exist after atomic write"
|
||||
loaded = json.loads(path.read_text())
|
||||
assert loaded == data
|
||||
|
||||
|
||||
def test_atomic_write_json_no_tmp_left_behind(tmp_path):
|
||||
path = tmp_path / "world.json"
|
||||
_atomic_write_json(path, {"ok": True})
|
||||
|
||||
tmp = path.with_suffix(".tmp")
|
||||
assert not tmp.exists(), ".tmp must be cleaned up by os.replace"
|
||||
|
||||
|
||||
def test_atomic_write_json_overwrites_existing(tmp_path):
|
||||
path = tmp_path / "state.json"
|
||||
path.write_text('{"old": true}')
|
||||
_atomic_write_json(path, {"new": True})
|
||||
assert json.loads(path.read_text()) == {"new": True}
|
||||
|
||||
|
||||
def test_atomic_write_json_nested_structure(tmp_path):
|
||||
path = tmp_path / "complex.json"
|
||||
data = {
|
||||
"nodes": {"vps": {"status": "online", "disk_usage_pct": 42}},
|
||||
"incidents": {},
|
||||
"list": [1, 2, 3],
|
||||
}
|
||||
_atomic_write_json(path, data)
|
||||
assert json.loads(path.read_text()) == data
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Resilient loader: empty / truncated file → skip cycle, no drift
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _populate_desired(sup: Supervisor, svc_key: str = "vps/outline"):
|
||||
node, service = svc_key.split("/", 1)
|
||||
sup.desired_state["services"][svc_key] = {
|
||||
"node": node,
|
||||
"service": service,
|
||||
"desired": "running",
|
||||
}
|
||||
|
||||
|
||||
def test_empty_services_json_skips_reconcile(tmp_path, monkeypatch):
|
||||
"""Empty services.json (truncated write) must not generate any redeploy action."""
|
||||
sup = _setup_supervisor(tmp_path, monkeypatch)
|
||||
_populate_desired(sup)
|
||||
|
||||
# Write empty services.json — simulates a mid-write truncation
|
||||
(tmp_path / "world" / "services.json").write_text("")
|
||||
(tmp_path / "world" / "nodes.json").write_text("{}")
|
||||
(tmp_path / "world" / "incidents.json").write_text("{}")
|
||||
|
||||
sup.reconcile()
|
||||
|
||||
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
|
||||
assert pending == [], f"No actions should be generated on empty state file, got: {[p.name for p in pending]}"
|
||||
|
||||
|
||||
def test_truncated_services_json_skips_reconcile(tmp_path, monkeypatch):
|
||||
"""Partially-written (truncated mid-write) JSON must not generate any action."""
|
||||
sup = _setup_supervisor(tmp_path, monkeypatch)
|
||||
_populate_desired(sup)
|
||||
|
||||
(tmp_path / "world" / "services.json").write_text('{"vps/outline": {"status": "hea')
|
||||
(tmp_path / "world" / "nodes.json").write_text("{}")
|
||||
(tmp_path / "world" / "incidents.json").write_text("{}")
|
||||
|
||||
sup.reconcile()
|
||||
|
||||
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
|
||||
assert pending == [], f"No actions expected on truncated state, got: {[p.name for p in pending]}"
|
||||
|
||||
|
||||
def test_empty_incidents_json_skips_reconcile(tmp_path, monkeypatch):
|
||||
"""Empty incidents.json (any world-state file failing) skips full cycle."""
|
||||
sup = _setup_supervisor(tmp_path, monkeypatch)
|
||||
_populate_desired(sup)
|
||||
|
||||
(tmp_path / "world" / "services.json").write_text("{}")
|
||||
(tmp_path / "world" / "nodes.json").write_text("{}")
|
||||
(tmp_path / "world" / "incidents.json").write_text("")
|
||||
|
||||
sup.reconcile()
|
||||
|
||||
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
|
||||
assert pending == [], f"No actions expected when any state file is unreadable, got: {[p.name for p in pending]}"
|
||||
|
||||
|
||||
def test_load_actual_state_returns_false_on_empty_file(tmp_path, monkeypatch):
|
||||
"""_load_actual_state must return False (not raise) when a file is empty."""
|
||||
sup = _setup_supervisor(tmp_path, monkeypatch)
|
||||
|
||||
(tmp_path / "world" / "services.json").write_text("")
|
||||
(tmp_path / "world" / "nodes.json").write_text("{}")
|
||||
(tmp_path / "world" / "incidents.json").write_text("{}")
|
||||
|
||||
result = sup._load_actual_state()
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_load_actual_state_returns_true_on_valid_files(tmp_path, monkeypatch):
|
||||
"""_load_actual_state returns True and populates actual_state on valid files."""
|
||||
sup = _setup_supervisor(tmp_path, monkeypatch)
|
||||
|
||||
services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
|
||||
(tmp_path / "world" / "services.json").write_text(json.dumps(services))
|
||||
(tmp_path / "world" / "nodes.json").write_text('{"vps": {"status": "online"}}')
|
||||
(tmp_path / "world" / "incidents.json").write_text("{}")
|
||||
|
||||
result = sup._load_actual_state()
|
||||
assert result is True
|
||||
assert "vps/outline" in sup.actual_state["services"]
|
||||
|
||||
|
||||
def test_parse_failure_preserves_last_known_good_state(tmp_path, monkeypatch):
|
||||
"""When a file becomes unreadable, actual_state retains the previous good values."""
|
||||
sup = _setup_supervisor(tmp_path, monkeypatch)
|
||||
|
||||
# First successful load
|
||||
services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
|
||||
(tmp_path / "world" / "services.json").write_text(json.dumps(services))
|
||||
(tmp_path / "world" / "nodes.json").write_text("{}")
|
||||
(tmp_path / "world" / "incidents.json").write_text("{}")
|
||||
assert sup._load_actual_state() is True
|
||||
assert "vps/outline" in sup.actual_state["services"]
|
||||
|
||||
# File becomes empty (race condition)
|
||||
(tmp_path / "world" / "services.json").write_text("")
|
||||
assert sup._load_actual_state() is False
|
||||
|
||||
# State must be unchanged from the previous good load
|
||||
assert "vps/outline" in sup.actual_state["services"], \
|
||||
"Last-known-good state must be preserved on parse failure"
|
||||
|
||||
|
||||
def test_healthy_service_does_not_generate_action(tmp_path, monkeypatch):
|
||||
"""A desired service that appears healthy in world state generates no action."""
|
||||
sup = _setup_supervisor(tmp_path, monkeypatch)
|
||||
_populate_desired(sup)
|
||||
|
||||
services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
|
||||
(tmp_path / "world" / "services.json").write_text(json.dumps(services))
|
||||
(tmp_path / "world" / "nodes.json").write_text("{}")
|
||||
(tmp_path / "world" / "incidents.json").write_text("{}")
|
||||
|
||||
sup.reconcile()
|
||||
|
||||
pending = list((tmp_path / "actions" / "pending").glob("*.json"))
|
||||
assert pending == [], "Healthy service must not generate any action"
|
||||
Loading…
Reference in a new issue