diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py index f4bcbe3..e9e8a61 100644 --- a/scripts/observer/observer.py +++ b/scripts/observer/observer.py @@ -24,7 +24,10 @@ logger = logging.getLogger("observer") class Observer: def __init__(self): - self.last_processed_file = None + # Per-node-directory checkpoint: {"vps": "last/file/path", "piha": "last/file/path"} + # Replaces the old single last_processed_file which silently skipped event dirs + # that sort alphabetically before the checkpoint (e.g. piha/ < vps/). + self.node_checkpoints: dict = {} self.world_state = { "nodes": {}, "services": {}, @@ -83,11 +86,22 @@ class Observer: try: with open(OBSERVER_STATE_FILE, "r") as f: checkpoint = json.load(f) - self.last_processed_file = checkpoint.get("last_processed_file") - # We might want to persist partial world state, - # but for now we rebuild from events (idempotent) - # or we can load existing world state files. - self._load_world_from_disk() + + if "node_checkpoints" in checkpoint: + # New format: per-directory checkpoints. + self.node_checkpoints = checkpoint["node_checkpoints"] + elif "last_processed_file" in checkpoint: + # Migrate old single-file checkpoint: extract node dir from path. + old = checkpoint["last_processed_file"] + if old: + try: + node_dir = Path(old).relative_to(EVENTS_DIR).parts[0] + self.node_checkpoints = {node_dir: old} + logger.info(f"Migrated old checkpoint → node_checkpoints: {self.node_checkpoints}") + except Exception: + pass # Bad path — start fresh + + self._load_world_from_disk() except Exception as e: logger.error(f"Failed to load checkpoint: {e}") @@ -111,7 +125,7 @@ class Observer: def _save_checkpoint(self): try: with open(OBSERVER_STATE_FILE, "w") as f: - json.dump({"last_processed_file": self.last_processed_file}, f) + json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2) except Exception as e: logger.error(f"Failed to save checkpoint: {e}") @@ -345,19 +359,21 @@ class Observer: except Exception as e: logger.error(f"Failed to touch heartbeat file: {e}") - # Find all event files - event_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True)) - + # Collect all event files grouped by node directory. + # Per-node checkpoints are compared within each directory independently, + # so late-arriving events from remote nodes (sorted earlier in the path) + # are never skipped just because another node's checkpoint is further ahead. + all_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True)) + new_files = [] - if self.last_processed_file: + for file_path in all_files: try: - idx = event_files.index(self.last_processed_file) - new_files = event_files[idx+1:] - except ValueError: - # If last_processed_file is gone or not in list, process all - new_files = event_files - else: - new_files = event_files + node_dir = str(Path(file_path).relative_to(EVENTS_DIR).parts[0]) + except (IndexError, ValueError): + node_dir = "__unknown__" + last_for_node = self.node_checkpoints.get(node_dir, "") + if file_path > last_for_node: + new_files.append((node_dir, file_path)) if not new_files: # Even if no new events, prune stale entries and refresh summary freshness. @@ -365,13 +381,16 @@ class Observer: self._save_world() return - logger.info(f"Processing {len(new_files)} new events") - for file_path in new_files: + logger.info(f"Processing {len(new_files)} new events across " + f"{len({n for n, _ in new_files})} node(s)") + for node_dir, file_path in new_files: try: with open(file_path, "r") as f: event = json.load(f) self.process_event(event) - self.last_processed_file = file_path + # Advance per-node checkpoint (only forward — no regression). + if file_path > self.node_checkpoints.get(node_dir, ""): + self.node_checkpoints[node_dir] = file_path except Exception as e: logger.error(f"Error processing {file_path}: {e}")