fix(observer): quarantine malformed event files to prevent processing wedge

Recovery from bad merge of task/observer-poison-quarantine (c255a02)
which carried false deletes from a stale branch base. Re-applies only
the genuine observer changes on top of correct master state.

When an event file fails to parse (malformed JSON, truncated, corrupted),
the observer previously kept retrying on every cycle while the node's
checkpoint stayed pinned — all subsequent good events for that node lost.

Now: first parse failure -> atomic os.replace to STATE_DIR/observer_failed_events/<node>/
with collision handling. Checkpoint advances, downstream events flow.
Move failures are logged but don't crash the loop.

Complementary to the atomic_write_json fix on state files; this addresses
the same race-pattern on event files instead.

Regression test asserts: bad event quarantined to failed_events dir,
removed from hot path, subsequent good event processed (node online),
checkpoint moves to good event.
This commit is contained in:
Oskar Kapala 2026-06-12 13:11:15 +02:00
parent 31b5981174
commit c9ee8eb06d
2 changed files with 73 additions and 1 deletions

View file

@ -42,6 +42,7 @@ STATE_DIR = Path(RUNTIME_PATH) / "state"
LOGS_DIR = Path(RUNTIME_PATH) / "logs"
WORLD_DIR = Path(RUNTIME_PATH) / "world"
OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"
FAILED_EVENTS_DIR = STATE_DIR / "observer_failed_events"
REPO_ROOT = Path(__file__).parent.parent.parent
INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"
@ -76,6 +77,27 @@ class Observer:
STATE_DIR.mkdir(parents=True, exist_ok=True)
EVENTS_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)
FAILED_EVENTS_DIR.mkdir(parents=True, exist_ok=True)
def _quarantine_event_file(self, file_path: str, node_dir: str, exc: Exception) -> None:
"""Move an unreadable/unprocessable event out of the hot path."""
src = Path(file_path)
dest_dir = FAILED_EVENTS_DIR / node_dir
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / src.name
if dest.exists():
dest = dest_dir / f"{src.stem}-{int(time.time())}{src.suffix}"
try:
os.replace(src, dest)
logger.error(
"Quarantined bad event for node_dir=%s: %s -> %s (%s: %s)",
node_dir, src, dest, type(exc).__name__, exc,
)
except Exception as move_exc:
logger.error(
"Failed to quarantine bad event for node_dir=%s: %s (%s: %s); move error=%s: %s",
node_dir, src, type(exc).__name__, exc, type(move_exc).__name__, move_exc,
)
def _load_inventory(self):
inventory = {"nodes": {}, "services": {}}
@ -499,7 +521,11 @@ class Observer:
if file_path > self.node_checkpoints.get(node_dir, ""):
self.node_checkpoints[node_dir] = file_path
except Exception as e:
logger.error(f"Error processing {file_path}: {e}")
logger.error(
"Error processing node_dir=%s file=%s (%s: %s)",
node_dir, file_path, type(e).__name__, e,
)
self._quarantine_event_file(file_path, node_dir, e)
self._save_checkpoint()
self._prune_stale_world()

View file

@ -41,6 +41,7 @@ def _make_observer(tmp_path: Path) -> Observer:
original_logs = obs_mod.LOGS_DIR
original_inventory = obs_mod.INVENTORY_TOPOLOGY
original_repo = obs_mod.REPO_ROOT
original_failed_events = obs_mod.FAILED_EVENTS_DIR
obs_mod.WORLD_DIR = world
obs_mod.STATE_DIR = state
@ -48,6 +49,7 @@ def _make_observer(tmp_path: Path) -> Observer:
obs_mod.LOGS_DIR = logs
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
obs_mod.REPO_ROOT = repo
obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events"
obs = Observer()
@ -59,6 +61,7 @@ def _make_observer(tmp_path: Path) -> Observer:
obs_mod.LOGS_DIR = original_logs
obs_mod.INVENTORY_TOPOLOGY = original_inventory
obs_mod.REPO_ROOT = original_repo
obs_mod.FAILED_EVENTS_DIR = original_failed_events
return obs
@ -87,6 +90,7 @@ def _make_observer_simple(tmp_path: Path):
obs_mod.LOGS_DIR = logs
obs_mod.INVENTORY_TOPOLOGY = repo / "inventory" / "topology.yaml"
obs_mod.REPO_ROOT = repo
obs_mod.FAILED_EVENTS_DIR = state / "observer_failed_events"
obs = Observer()
return obs
@ -331,3 +335,45 @@ def test_prune_keeps_recently_resolved_incident(tmp_path):
obs._prune_stale_world()
assert inc_id in obs.world_state["incidents"]
def test_run_once_quarantines_bad_event_and_processes_next_for_same_node(tmp_path):
"""A malformed event file must not wedge a node forever."""
obs = _make_observer_simple(tmp_path)
import observer.observer as obs_mod
topology = obs_mod.INVENTORY_TOPOLOGY
topology.write_text(
"nodes:\n"
" lustro:\n"
" roles: [edge]\n"
" connectivity: {}\n"
)
obs.inventory = obs._load_inventory()
bad_dir = obs_mod.EVENTS_DIR / "lustro"
bad_dir.mkdir(parents=True, exist_ok=True)
bad_event = bad_dir / "evt-lustro-1-bad.json"
bad_event.write_text("{not-json")
good_event = bad_dir / "evt-lustro-2-good.json"
good_event.write_text(json.dumps({
"id": "evt-lustro-2-good",
"timestamp": int(time.time()),
"date": "2026-06-10T00:00:00Z",
"type": "node_health",
"severity": "info",
"node": "lustro",
"service": "",
"message": "ok",
"payload": {"disk_pct": 1, "mem_pct": 2, "cpu_pct": 3},
}))
obs.run_once()
quarantined = obs_mod.FAILED_EVENTS_DIR / "lustro" / bad_event.name
assert quarantined.exists()
assert not bad_event.exists()
assert obs.world_state["nodes"]["lustro"]["status"] == "online"
assert obs.node_checkpoints["lustro"] == str(good_event)