diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py
index 90a5ee0..3b1c680 100644
--- a/scripts/observer/observer.py
+++ b/scripts/observer/observer.py
@@ -42,6 +42,7 @@ STATE_DIR = Path(RUNTIME_PATH) / "state"
 LOGS_DIR = Path(RUNTIME_PATH) / "logs"
 WORLD_DIR = Path(RUNTIME_PATH) / "world"
 OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"
+FAILED_EVENTS_DIR = STATE_DIR / "observer_failed_events"
 
 REPO_ROOT = Path(__file__).parent.parent.parent
 INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"
@@ -76,6 +77,53 @@ class Observer:
         STATE_DIR.mkdir(parents=True, exist_ok=True)
         EVENTS_DIR.mkdir(parents=True, exist_ok=True)
         LOGS_DIR.mkdir(parents=True, exist_ok=True)
+        FAILED_EVENTS_DIR.mkdir(parents=True, exist_ok=True)
+
+    def _quarantine_event_file(self, file_path: str, node_dir: str, exc: Exception) -> None:
+        """Move an unreadable/unprocessable event out of the hot path.
+
+        The file is moved to ``FAILED_EVENTS_DIR / node_dir`` under its own
+        name. If that name is already taken there, a timestamp (plus a
+        counter when needed) is appended to the stem so that an earlier
+        quarantined file is not overwritten.
+
+        Failures to create the destination directory or to move the file
+        are logged rather than raised, because this runs inside the
+        caller's ``except`` block.
+
+        Args:
+            file_path: Path of the event file that could not be processed.
+            node_dir: Node directory name; used as the sub-directory of
+                ``FAILED_EVENTS_DIR`` and in log messages.
+            exc: The exception that made the event unprocessable; it is
+                only logged.
+        """
+        src = Path(file_path)
+        dest_dir = FAILED_EVENTS_DIR / node_dir
+        try:
+            # mkdir can fail too (permissions, full disk); keep it inside the
+            # try so a quarantine failure cannot escape the caller's handler.
+            dest_dir.mkdir(parents=True, exist_ok=True)
+            dest = dest_dir / src.name
+            if dest.exists():
+                # os.replace() overwrites silently, so pick an unused name.
+                stamp = int(time.time())
+                dest = dest_dir / f"{src.stem}-{stamp}{src.suffix}"
+                attempt = 1
+                while dest.exists():
+                    dest = dest_dir / f"{src.stem}-{stamp}-{attempt}{src.suffix}"
+                    attempt += 1
+            os.replace(src, dest)
+        except Exception as move_exc:
+            logger.error(
+                "Failed to quarantine bad event for node_dir=%s: %s (%s: %s); move error=%s: %s",
+                node_dir, src, type(exc).__name__, exc, type(move_exc).__name__, move_exc,
+            )
+        else:
+            logger.error(
+                "Quarantined bad event for node_dir=%s: %s -> %s (%s: %s)",
+                node_dir, src, dest, type(exc).__name__, exc,
+            )
 
     def _load_inventory(self):
         inventory = {"nodes": {}, "services": {}}
@@ -499,7 +547,11 @@ class Observer:
                     if file_path > self.node_checkpoints.get(node_dir, ""):
                         self.node_checkpoints[node_dir] = file_path
                 except Exception as e:
-                    logger.error(f"Error processing {file_path}: {e}")
+                    logger.error(
+                        "Error processing node_dir=%s file=%s (%s: %s)",
+                        node_dir, file_path, type(e).__name__, e,
+                    )
+                    self._quarantine_event_file(file_path, node_dir, e)
 
self._save_checkpoint() self._prune_stale_world() diff --git a/services/control-plane/tests/test_incident_lifecycle.py b/services/control-plane/tests/test_incident_lifecycle.py index a0a9f36..9c0b7b2 100644 --- a/services/control-plane/tests/test_incident_lifecycle.py +++ b/services/control-plane/tests/test_incident_lifecycle.py @@ -41,6 +41,7 @@ def _make_observer(tmp_path: Path) -> Observer: original_logs = obs_mod.LOGS_DIR original_inventory = obs_mod.INVENTORY_TOPOLOGY original_repo = obs_mod.REPO_ROOT + original_failed_events = obs_mod.FAILED_EVENTS_DIR obs_mod.WORLD_DIR = world obs_mod.STATE_DIR = state @@ -48,6 +49,7 @@ def _make_observer(tmp_path: Path) -> Observer: obs_mod.LOGS_DIR = logs obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml" obs_mod.REPO_ROOT = repo + obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events" obs = Observer() @@ -59,6 +61,7 @@ def _make_observer(tmp_path: Path) -> Observer: obs_mod.LOGS_DIR = original_logs obs_mod.INVENTORY_TOPOLOGY = original_inventory obs_mod.REPO_ROOT = original_repo + obs_mod.FAILED_EVENTS_DIR = original_failed_events return obs @@ -87,6 +90,7 @@ def _make_observer_simple(tmp_path: Path): obs_mod.LOGS_DIR = logs obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml" obs_mod.REPO_ROOT = repo + obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events" obs = Observer() return obs @@ -331,3 +335,45 @@ def test_prune_keeps_recently_resolved_incident(tmp_path): obs._prune_stale_world() assert inc_id in obs.world_state["incidents"] + + +def test_run_once_quarantines_bad_event_and_processes_next_for_same_node(tmp_path): + """A malformed event file must not wedge a node forever.""" + obs = _make_observer_simple(tmp_path) + + import observer.observer as obs_mod + + topology = obs_mod.INVENTORY_TOPOLOGY + topology.write_text( + "nodes:\n" + " lustro:\n" + " roles: [edge]\n" + " connectivity: {}\n" + ) + obs.inventory = obs._load_inventory() + + bad_dir = obs_mod.EVENTS_DIR / "lustro" + 
bad_dir.mkdir(parents=True, exist_ok=True) + bad_event = bad_dir / "evt-lustro-1-bad.json" + bad_event.write_text("{not-json") + + good_event = bad_dir / "evt-lustro-2-good.json" + good_event.write_text(json.dumps({ + "id": "evt-lustro-2-good", + "timestamp": int(time.time()), + "date": "2026-06-10T00:00:00Z", + "type": "node_health", + "severity": "info", + "node": "lustro", + "service": "", + "message": "ok", + "payload": {"disk_pct": 1, "mem_pct": 2, "cpu_pct": 3}, + })) + + obs.run_once() + + quarantined = obs_mod.FAILED_EVENTS_DIR / "lustro" / bad_event.name + assert quarantined.exists() + assert not bad_event.exists() + assert obs.world_state["nodes"]["lustro"]["status"] == "online" + assert obs.node_checkpoints["lustro"] == str(good_event)