fix(observer): per-node-directory checkpoints replace single global checkpoint
The old mechanism tracked a single 'last_processed_file' and used sorted
filename order to find new events. Remote nodes ship events into
subdirectories (events/piha/, events/chelsty-infra/) that sort
alphabetically BEFORE the VPS directory (events/vps/). Once the
checkpoint pointed to a vps/ file, all piha/ and chelsty-infra/ events
were silently skipped forever.
New mechanism:
- node_checkpoints: {node_dir: last_processed_path}
- Each node directory has its own independent cursor
- New events = files whose path > that node's checkpoint
- Backward-compatible: old 'last_processed_file' is migrated by extracting
the node dir from the path on first load
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
a5a3e223dc
commit
f4a8db93e4
|
|
@ -24,7 +24,10 @@ logger = logging.getLogger("observer")
|
|||
|
||||
class Observer:
|
||||
def __init__(self):
|
||||
self.last_processed_file = None
|
||||
# Per-node-directory checkpoint: {"vps": "last/file/path", "piha": "last/file/path"}
|
||||
# Replaces the old single last_processed_file which silently skipped event dirs
|
||||
# that sort alphabetically before the checkpoint (e.g. piha/ < vps/).
|
||||
self.node_checkpoints: dict = {}
|
||||
self.world_state = {
|
||||
"nodes": {},
|
||||
"services": {},
|
||||
|
|
@ -83,10 +86,21 @@ class Observer:
|
|||
try:
|
||||
with open(OBSERVER_STATE_FILE, "r") as f:
|
||||
checkpoint = json.load(f)
|
||||
self.last_processed_file = checkpoint.get("last_processed_file")
|
||||
# We might want to persist partial world state,
|
||||
# but for now we rebuild from events (idempotent)
|
||||
# or we can load existing world state files.
|
||||
|
||||
if "node_checkpoints" in checkpoint:
|
||||
# New format: per-directory checkpoints.
|
||||
self.node_checkpoints = checkpoint["node_checkpoints"]
|
||||
elif "last_processed_file" in checkpoint:
|
||||
# Migrate old single-file checkpoint: extract node dir from path.
|
||||
old = checkpoint["last_processed_file"]
|
||||
if old:
|
||||
try:
|
||||
node_dir = Path(old).relative_to(EVENTS_DIR).parts[0]
|
||||
self.node_checkpoints = {node_dir: old}
|
||||
logger.info(f"Migrated old checkpoint → node_checkpoints: {self.node_checkpoints}")
|
||||
except Exception:
|
||||
pass # Bad path — start fresh
|
||||
|
||||
self._load_world_from_disk()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load checkpoint: {e}")
|
||||
|
|
@ -111,7 +125,7 @@ class Observer:
|
|||
def _save_checkpoint(self):
|
||||
try:
|
||||
with open(OBSERVER_STATE_FILE, "w") as f:
|
||||
json.dump({"last_processed_file": self.last_processed_file}, f)
|
||||
json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save checkpoint: {e}")
|
||||
|
||||
|
|
@ -345,19 +359,21 @@ class Observer:
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to touch heartbeat file: {e}")
|
||||
|
||||
# Find all event files
|
||||
event_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True))
|
||||
# Collect all event files grouped by node directory.
|
||||
# Per-node checkpoints are compared within each directory independently,
|
||||
# so late-arriving events from remote nodes (sorted earlier in the path)
|
||||
# are never skipped just because another node's checkpoint is further ahead.
|
||||
all_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True))
|
||||
|
||||
new_files = []
|
||||
if self.last_processed_file:
|
||||
for file_path in all_files:
|
||||
try:
|
||||
idx = event_files.index(self.last_processed_file)
|
||||
new_files = event_files[idx+1:]
|
||||
except ValueError:
|
||||
# If last_processed_file is gone or not in list, process all
|
||||
new_files = event_files
|
||||
else:
|
||||
new_files = event_files
|
||||
node_dir = str(Path(file_path).relative_to(EVENTS_DIR).parts[0])
|
||||
except (IndexError, ValueError):
|
||||
node_dir = "__unknown__"
|
||||
last_for_node = self.node_checkpoints.get(node_dir, "")
|
||||
if file_path > last_for_node:
|
||||
new_files.append((node_dir, file_path))
|
||||
|
||||
if not new_files:
|
||||
# Even if no new events, prune stale entries and refresh summary freshness.
|
||||
|
|
@ -365,13 +381,16 @@ class Observer:
|
|||
self._save_world()
|
||||
return
|
||||
|
||||
logger.info(f"Processing {len(new_files)} new events")
|
||||
for file_path in new_files:
|
||||
logger.info(f"Processing {len(new_files)} new events across "
|
||||
f"{len({n for n, _ in new_files})} node(s)")
|
||||
for node_dir, file_path in new_files:
|
||||
try:
|
||||
with open(file_path, "r") as f:
|
||||
event = json.load(f)
|
||||
self.process_event(event)
|
||||
self.last_processed_file = file_path
|
||||
# Advance per-node checkpoint (only forward — no regression).
|
||||
if file_path > self.node_checkpoints.get(node_dir, ""):
|
||||
self.node_checkpoints[node_dir] = file_path
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing {file_path}: {e}")
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue