From f4a8db93e44f6d284cdb9676bd2e08cd7c4a73ad Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Wed, 27 May 2026 14:16:58 +0200 Subject: [PATCH] fix(observer): per-node-directory checkpoints replace single global checkpoint The old mechanism tracked a single 'last_processed_file' and used sorted filename order to find new events. Remote nodes ship events into subdirectories (events/piha/, events/chelsty-infra/) that sort alphabetically BEFORE the VPS directory (events/vps/). Once the checkpoint pointed to a vps/ file, all piha/ and chelsty-infra/ events were silently skipped forever. New mechanism: - node_checkpoints: {node_dir: last_processed_path} - Each node directory has its own independent cursor - New events = files whose path > that node's checkpoint - Backward-compatible: old 'last_processed_file' is migrated by extracting the node dir from the path on first load Co-Authored-By: Claude Sonnet 4.6 --- scripts/observer/observer.py | 61 +++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py index f4bcbe3..e9e8a61 100644 --- a/scripts/observer/observer.py +++ b/scripts/observer/observer.py @@ -24,7 +24,10 @@ logger = logging.getLogger("observer") class Observer: def __init__(self): - self.last_processed_file = None + # Per-node-directory checkpoint: {"vps": "last/file/path", "piha": "last/file/path"} + # Replaces the old single last_processed_file which silently skipped event dirs + # that sort alphabetically before the checkpoint (e.g. piha/ < vps/). + self.node_checkpoints: dict = {} self.world_state = { "nodes": {}, "services": {}, @@ -83,11 +86,22 @@ class Observer: try: with open(OBSERVER_STATE_FILE, "r") as f: checkpoint = json.load(f) - self.last_processed_file = checkpoint.get("last_processed_file") - # We might want to persist partial world state, - # but for now we rebuild from events (idempotent) - # or we can load existing world state files. - self._load_world_from_disk() + + if "node_checkpoints" in checkpoint: + # New format: per-directory checkpoints. + self.node_checkpoints = checkpoint["node_checkpoints"] + elif "last_processed_file" in checkpoint: + # Migrate old single-file checkpoint: extract node dir from path. + old = checkpoint["last_processed_file"] + if old: + try: + node_dir = Path(old).relative_to(EVENTS_DIR).parts[0] + self.node_checkpoints = {node_dir: old} + logger.info(f"Migrated old checkpoint → node_checkpoints: {self.node_checkpoints}") + except Exception: + pass # Bad path — start fresh + + self._load_world_from_disk() except Exception as e: logger.error(f"Failed to load checkpoint: {e}") @@ -111,7 +125,7 @@ class Observer: def _save_checkpoint(self): try: with open(OBSERVER_STATE_FILE, "w") as f: - json.dump({"last_processed_file": self.last_processed_file}, f) + json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2) except Exception as e: logger.error(f"Failed to save checkpoint: {e}") @@ -345,19 +359,21 @@ class Observer: except Exception as e: logger.error(f"Failed to touch heartbeat file: {e}") - # Find all event files - event_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True)) - + # Collect all event files grouped by node directory. + # Per-node checkpoints are compared within each directory independently, + # so late-arriving events from remote nodes (sorted earlier in the path) + # are never skipped just because another node's checkpoint is further ahead. + all_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True)) + new_files = [] - if self.last_processed_file: + for file_path in all_files: try: - idx = event_files.index(self.last_processed_file) - new_files = event_files[idx+1:] - except ValueError: - # If last_processed_file is gone or not in list, process all - new_files = event_files - else: - new_files = event_files + node_dir = str(Path(file_path).relative_to(EVENTS_DIR).parts[0]) + except (IndexError, ValueError): + node_dir = "__unknown__" + last_for_node = self.node_checkpoints.get(node_dir, "") + if file_path > last_for_node: + new_files.append((node_dir, file_path)) if not new_files: # Even if no new events, prune stale entries and refresh summary freshness. @@ -365,13 +381,16 @@ class Observer: self._save_world() return - logger.info(f"Processing {len(new_files)} new events") - for file_path in new_files: + logger.info(f"Processing {len(new_files)} new events across " + f"{len({n for n, _ in new_files})} node(s)") + for node_dir, file_path in new_files: try: with open(file_path, "r") as f: event = json.load(f) self.process_event(event) - self.last_processed_file = file_path + # Advance per-node checkpoint (only forward — no regression). + if file_path > self.node_checkpoints.get(node_dir, ""): + self.node_checkpoints[node_dir] = file_path except Exception as e: logger.error(f"Error processing {file_path}: {e}")