From c9ee8eb06d484d7be4f0ee4d8b292c630445115d Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Fri, 12 Jun 2026 13:11:15 +0200 Subject: [PATCH] fix(observer): quarantine malformed event files to prevent processing wedge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recovery from bad merge of task/observer-poison-quarantine (c255a02) which carried false deletes from a stale branch base. Re-applies only the genuine observer changes on top of correct master state. When an event file fails to parse (malformed JSON, truncated, corrupted), the observer previously kept retrying on every cycle while the node's checkpoint stayed pinned — all subsequent good events for that node lost. Now: first parse failure -> atomic os.replace to STATE_DIR/observer_failed_events// with collision handling. Checkpoint advances, downstream events flow. Move failures are logged but don't crash the loop. Complementary to the atomic_write_json fix on state files; this addresses the same race-pattern on event files instead. Regression test asserts: bad event quarantined to failed_events dir, removed from hot path, subsequent good event processed (node online), checkpoint moves to good event. --- scripts/observer/observer.py | 28 ++++++++++- .../tests/test_incident_lifecycle.py | 46 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py index 90a5ee0..3b1c680 100644 --- a/scripts/observer/observer.py +++ b/scripts/observer/observer.py @@ -42,6 +42,7 @@ STATE_DIR = Path(RUNTIME_PATH) / "state" LOGS_DIR = Path(RUNTIME_PATH) / "logs" WORLD_DIR = Path(RUNTIME_PATH) / "world" OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json" +FAILED_EVENTS_DIR = STATE_DIR / "observer_failed_events" REPO_ROOT = Path(__file__).parent.parent.parent INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml" @@ -76,6 +77,27 @@ class Observer: STATE_DIR.mkdir(parents=True, exist_ok=True) EVENTS_DIR.mkdir(parents=True, exist_ok=True) LOGS_DIR.mkdir(parents=True, exist_ok=True) + FAILED_EVENTS_DIR.mkdir(parents=True, exist_ok=True) + + def _quarantine_event_file(self, file_path: str, node_dir: str, exc: Exception) -> None: + """Move an unreadable/unprocessable event out of the hot path.""" + src = Path(file_path) + dest_dir = FAILED_EVENTS_DIR / node_dir + dest_dir.mkdir(parents=True, exist_ok=True) + dest = dest_dir / src.name + if dest.exists(): + dest = dest_dir / f"{src.stem}-{int(time.time())}{src.suffix}" + try: + os.replace(src, dest) + logger.error( + "Quarantined bad event for node_dir=%s: %s -> %s (%s: %s)", + node_dir, src, dest, type(exc).__name__, exc, + ) + except Exception as move_exc: + logger.error( + "Failed to quarantine bad event for node_dir=%s: %s (%s: %s); move error=%s: %s", + node_dir, src, type(exc).__name__, exc, type(move_exc).__name__, move_exc, + ) def _load_inventory(self): inventory = {"nodes": {}, "services": {}} @@ -499,7 +521,11 @@ class Observer: if file_path > self.node_checkpoints.get(node_dir, ""): self.node_checkpoints[node_dir] = file_path except Exception as e: - logger.error(f"Error processing {file_path}: {e}") + logger.error( + "Error processing node_dir=%s file=%s (%s: %s)", + node_dir, file_path, type(e).__name__, e, + ) + self._quarantine_event_file(file_path, node_dir, e) self._save_checkpoint() self._prune_stale_world() diff --git a/services/control-plane/tests/test_incident_lifecycle.py b/services/control-plane/tests/test_incident_lifecycle.py index a0a9f36..9c0b7b2 100644 --- a/services/control-plane/tests/test_incident_lifecycle.py +++ b/services/control-plane/tests/test_incident_lifecycle.py @@ -41,6 +41,7 @@ def _make_observer(tmp_path: Path) -> Observer: original_logs = obs_mod.LOGS_DIR original_inventory = obs_mod.INVENTORY_TOPOLOGY original_repo = obs_mod.REPO_ROOT + original_failed_events = obs_mod.FAILED_EVENTS_DIR obs_mod.WORLD_DIR = world obs_mod.STATE_DIR = state @@ -48,6 +49,7 @@ def _make_observer(tmp_path: Path) -> Observer: obs_mod.LOGS_DIR = logs obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml" obs_mod.REPO_ROOT = repo + obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events" obs = Observer() @@ -59,6 +61,7 @@ def _make_observer(tmp_path: Path) -> Observer: obs_mod.LOGS_DIR = original_logs obs_mod.INVENTORY_TOPOLOGY = original_inventory obs_mod.REPO_ROOT = original_repo + obs_mod.FAILED_EVENTS_DIR = original_failed_events return obs @@ -87,6 +90,7 @@ def _make_observer_simple(tmp_path: Path): obs_mod.LOGS_DIR = logs obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml" obs_mod.REPO_ROOT = repo + obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events" obs = Observer() return obs @@ -331,3 +335,45 @@ def test_prune_keeps_recently_resolved_incident(tmp_path): obs._prune_stale_world() assert inc_id in obs.world_state["incidents"] + + +def test_run_once_quarantines_bad_event_and_processes_next_for_same_node(tmp_path): + """A malformed event file must not wedge a node forever.""" + obs = _make_observer_simple(tmp_path) + + import observer.observer as obs_mod + + topology = obs_mod.INVENTORY_TOPOLOGY + topology.write_text( + "nodes:\n" + " lustro:\n" + " roles: [edge]\n" + " connectivity: {}\n" + ) + obs.inventory = obs._load_inventory() + + bad_dir = obs_mod.EVENTS_DIR / "lustro" + bad_dir.mkdir(parents=True, exist_ok=True) + bad_event = bad_dir / "evt-lustro-1-bad.json" + bad_event.write_text("{not-json") + + good_event = bad_dir / "evt-lustro-2-good.json" + good_event.write_text(json.dumps({ + "id": "evt-lustro-2-good", + "timestamp": int(time.time()), + "date": "2026-06-10T00:00:00Z", + "type": "node_health", + "severity": "info", + "node": "lustro", + "service": "", + "message": "ok", + "payload": {"disk_pct": 1, "mem_pct": 2, "cpu_pct": 3}, + })) + + obs.run_once() + + quarantined = obs_mod.FAILED_EVENTS_DIR / "lustro" / bad_event.name + assert quarantined.exists() + assert not bad_event.exists() + assert obs.world_state["nodes"]["lustro"]["status"] == "online" + assert obs.node_checkpoints["lustro"] == str(good_event)