"""Tests for atomic writes and resilient world-state loading in the supervisor.""" from __future__ import annotations import json import sys import time from pathlib import Path import pytest sys.path.insert(0, str(Path(__file__).parent.parent / "src")) import supervisor as supervisor_module from supervisor import Supervisor, _atomic_write_json # --------------------------------------------------------------------------- # Helpers (reused from test_supervisor_ha) # --------------------------------------------------------------------------- def _setup_supervisor(tmp_path: Path, monkeypatch) -> Supervisor: actions = tmp_path / "actions" events = tmp_path / "events" world = tmp_path / "world" repo = tmp_path / "repo" for d in (actions, events, world, repo / "hosts"): d.mkdir(parents=True, exist_ok=True) monkeypatch.setattr(supervisor_module, "ACTIONS_DIR", actions) monkeypatch.setattr(supervisor_module, "EVENTS_DIR", events) monkeypatch.setattr(supervisor_module, "WORLD_DIR", world) monkeypatch.setattr(supervisor_module, "REPO_ROOT", repo) sup = Supervisor() sup.desired_state = {"services": {}} sup.actual_state = {"services": {}, "nodes": {}, "incidents": {}} return sup # --------------------------------------------------------------------------- # 1. 
# atomic_write_json correctness
# ---------------------------------------------------------------------------


def test_atomic_write_json_produces_valid_json(tmp_path):
    """The target file exists after the write and round-trips to the payload."""
    path = tmp_path / "out.json"
    data = {"services": {"vps/outline": {"status": "healthy"}}, "count": 42}
    _atomic_write_json(path, data)
    assert path.exists(), "output file must exist after atomic write"
    loaded = json.loads(path.read_text())
    assert loaded == data


def test_atomic_write_json_no_tmp_left_behind(tmp_path):
    """No temporary artefact remains next to the target after a write."""
    path = tmp_path / "world.json"
    _atomic_write_json(path, {"ok": True})

    tmp = path.with_suffix(".tmp")
    assert not tmp.exists(), ".tmp must be cleaned up by os.replace"

    # The check above only covers one temp-file name (``world.tmp``). pytest
    # hands each test a fresh, empty ``tmp_path``, so anything other than the
    # target is a leftover regardless of how the helper names its temp file.
    # NOTE(review): assumes the helper stages its temp file in the target's
    # directory — confirm against the implementation.
    leftovers = sorted(p.name for p in tmp_path.iterdir() if p != path)
    assert leftovers == [], f"unexpected files left behind: {leftovers}"


def test_atomic_write_json_overwrites_existing(tmp_path):
    """Writing to an existing path replaces its previous contents."""
    path = tmp_path / "state.json"
    path.write_text('{"old": true}')
    _atomic_write_json(path, {"new": True})
    assert json.loads(path.read_text()) == {"new": True}


def test_atomic_write_json_nested_structure(tmp_path):
    """Nested dicts and lists survive the write/read round trip unchanged."""
    path = tmp_path / "complex.json"
    data = {
        "nodes": {"vps": {"status": "online", "disk_usage_pct": 42}},
        "incidents": {},
        "list": [1, 2, 3],
    }
    _atomic_write_json(path, data)
    assert json.loads(path.read_text()) == data


# ---------------------------------------------------------------------------
# 2.
# Resilient loader: empty / truncated file → skip cycle, no drift
# ---------------------------------------------------------------------------


def _populate_desired(sup: Supervisor, svc_key: str = "vps/outline"):
    """Register *svc_key* (``"<node>/<service>"``) as desired-running on *sup*."""
    node_name, service_name = svc_key.split("/", 1)
    entry = {"node": node_name, "service": service_name, "desired": "running"}
    sup.desired_state["services"][svc_key] = entry


def _write_world_state(
    tmp_path: Path,
    services: str,
    nodes: str = "{}",
    incidents: str = "{}",
) -> None:
    """Write the raw text of the three world-state files under ``tmp_path/world``."""
    world_dir = tmp_path / "world"
    (world_dir / "services.json").write_text(services)
    (world_dir / "nodes.json").write_text(nodes)
    (world_dir / "incidents.json").write_text(incidents)


def _pending_actions(tmp_path: Path) -> list[Path]:
    """Return the ``*.json`` files found in ``tmp_path/actions/pending``."""
    return list((tmp_path / "actions" / "pending").glob("*.json"))


def test_empty_services_json_skips_reconcile(tmp_path, monkeypatch):
    """A zero-byte services.json must not lead to any pending action."""
    sup = _setup_supervisor(tmp_path, monkeypatch)
    _populate_desired(sup)
    # An empty services.json stands in for a write that was cut off mid-way.
    _write_world_state(tmp_path, services="")

    sup.reconcile()

    pending = _pending_actions(tmp_path)
    assert pending == [], (
        f"No actions should be generated on empty state file, got: {[p.name for p in pending]}"
    )


def test_truncated_services_json_skips_reconcile(tmp_path, monkeypatch):
    """A services.json cut off mid-document must not lead to any pending action."""
    sup = _setup_supervisor(tmp_path, monkeypatch)
    _populate_desired(sup)
    _write_world_state(tmp_path, services='{"vps/outline": {"status": "hea')

    sup.reconcile()

    pending = _pending_actions(tmp_path)
    assert pending == [], (
        f"No actions expected on truncated state, got: {[p.name for p in pending]}"
    )


def test_empty_incidents_json_skips_reconcile(tmp_path, monkeypatch):
    """An unreadable incidents.json alone is enough to yield no pending action."""
    sup = _setup_supervisor(tmp_path, monkeypatch)
    _populate_desired(sup)
    _write_world_state(tmp_path, services="{}", incidents="")

    sup.reconcile()

    pending = _pending_actions(tmp_path)
    assert pending == [], (
        f"No actions expected when any state file is unreadable, got: {[p.name for p in pending]}"
    )


def test_load_actual_state_returns_false_on_empty_file(tmp_path, monkeypatch):
    """``_load_actual_state`` reports False, rather than raising, on an empty file."""
    sup = _setup_supervisor(tmp_path, monkeypatch)
    _write_world_state(tmp_path, services="")

    assert sup._load_actual_state() is False


def test_load_actual_state_returns_true_on_valid_files(tmp_path, monkeypatch):
    """With three valid files the loader reports True and fills ``actual_state``."""
    sup = _setup_supervisor(tmp_path, monkeypatch)
    services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
    _write_world_state(
        tmp_path,
        services=json.dumps(services),
        nodes='{"vps": {"status": "online"}}',
    )

    assert sup._load_actual_state() is True
    assert "vps/outline" in sup.actual_state["services"]


def test_parse_failure_preserves_last_known_good_state(tmp_path, monkeypatch):
    """A failed reload leaves the previously loaded services in ``actual_state``."""
    sup = _setup_supervisor(tmp_path, monkeypatch)

    # Step 1: a clean load establishes the known-good state.
    services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
    _write_world_state(tmp_path, services=json.dumps(services))
    assert sup._load_actual_state() is True
    assert "vps/outline" in sup.actual_state["services"]

    # Step 2: services.json goes empty, as it would during a racing write.
    (tmp_path / "world" / "services.json").write_text("")
    assert sup._load_actual_state() is False

    # Step 3: the earlier good values must still be in place.
    assert "vps/outline" in sup.actual_state["services"], (
        "Last-known-good state must be preserved on parse failure"
    )


def test_healthy_service_does_not_generate_action(tmp_path, monkeypatch):
    """A desired service reported healthy in world state yields no pending action."""
    sup = _setup_supervisor(tmp_path, monkeypatch)
    _populate_desired(sup)
    services = {"vps/outline": {"node": "vps", "service": "outline", "status": "healthy"}}
    _write_world_state(tmp_path, services=json.dumps(services))

    sup.reconcile()

    pending = _pending_actions(tmp_path)
    assert pending == [], "Healthy service must not generate any action"