Two root causes for stale "active" incidents on the dashboard, plus a related UI fix:
1. TypeError bug in _prune_stale_world: last_occurrence / resolved_at
can be an ISO-8601 string (stability-agent via events.py) or a Unix
int (node-agent). The previous auto-resolve code computed a plain
`time.time() - last_occ`, which raises TypeError for strings. The
exception aborted the cycle before _save_world() was called, leaving
incidents perpetually "active" on disk.
Fix: add _parse_ts(ts) -> float, which handles int, float, and
ISO-8601 strings uniformly. All timestamp arithmetic now goes through
it; it returns 0.0 for None or unparseable input to keep comparisons safe.
2. Orphaned active incidents: _resolve_incident clears service["incident_id"]
and marks the incident "resolved" in memory, but if writing the world files
was interrupted (pre-atomic-write era), the observer could load incidents.json
at the next startup with status="active" while no service entry pointed to it.
No code ever touched these orphans again.
Fix: _prune_stale_world now runs two cleanup passes each cycle:
- Case 1 (healthy-linked): service.status=="healthy" AND incident_id
still set → resolve immediately (service cannot have active incident)
- Case 2 (orphaned): active incident with no service link AND
last_occurrence > 5 min ago → resolve (5-min guard for creation race)
Both cases are wrapped in try/except so a bug here never crashes the
observer loop or blocks _save_world.
Also fixes the 7-day stale-incident prune to use _parse_ts so
ISO-string resolved_at values are handled correctly.
3. Operator UI: current_incidents() now filters to status=="active" only.
Resolved incidents were previously included in the /incidents endpoint,
making the dashboard show a wall of historical records as if active.
Nocturnal job investigation: _cleanup_control_plane_fs in node-agent runs
every 60s on VPS (not midnight-specific); it reads observer_checkpoint.json
(now written atomically) and deletes old event files. No non-atomic writes
found. Midnight clustering was likely external (logrotate / OS flush);
the supervisor's resilient loader already handles such transient issues.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
521 lines
23 KiB
Python
521 lines
23 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import glob
|
|
import logging
|
|
import yaml
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
|
|
def _atomic_write_json(path: Path, data) -> None:
|
|
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
|
|
tmp = path.with_suffix(".tmp")
|
|
with open(tmp, "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
os.replace(tmp, path)
|
|
|
|
|
|
def _parse_ts(ts) -> float:
|
|
"""Return a Unix timestamp float from ts, which may be int/float or an ISO-8601 string.
|
|
|
|
Events from node-agent use int(time.time()); events from stability-agent / events.py
|
|
use ISO format ('2026-06-03T10:30:00Z'). Both appear in incident fields such as
|
|
last_occurrence and resolved_at, so any arithmetic on them must go through here.
|
|
Returns 0.0 on None or unparseable input so callers can use plain comparisons.
|
|
"""
|
|
if ts is None:
|
|
return 0.0
|
|
if isinstance(ts, (int, float)):
|
|
return float(ts)
|
|
try:
|
|
return datetime.fromisoformat(str(ts).replace("Z", "+00:00")).timestamp()
|
|
except Exception:
|
|
return 0.0
|
|
|
|
# ---------------------------------------------------------------------------
# Runtime layout. Everything the observer reads or writes at runtime lives
# under RUNTIME_PATH, which can be overridden through the environment.
# ---------------------------------------------------------------------------
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")

EVENTS_DIR = Path(RUNTIME_PATH, "events")  # incoming event files, one subdirectory per node
STATE_DIR = Path(RUNTIME_PATH, "state")    # checkpoint and heartbeat
LOGS_DIR = Path(RUNTIME_PATH, "logs")
WORLD_DIR = Path(RUNTIME_PATH, "world")    # published world-state JSON files

OBSERVER_STATE_FILE = STATE_DIR.joinpath("observer_checkpoint.json")

# Static topology comes from the repository checkout three levels above this file.
REPO_ROOT = Path(__file__).parent.parent.parent
INVENTORY_TOPOLOGY = REPO_ROOT.joinpath("inventory", "topology.yaml")

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("observer")
|
|
|
|
class Observer:
    """Build and persist the homelab "world state" from agent event files.

    Each cycle (:meth:`run_once`) does the following:

    1. Picks up event JSON files under ``EVENTS_DIR`` that are newer than the
       per-node-directory checkpoint.
    2. Applies them to ``self.world_state`` (nodes, services, deployments,
       incidents, summary).
    3. Prunes stale entries and writes everything back as JSON files under
       ``WORLD_DIR``.
    """

    # node_health heartbeats reporting disk usage below this percentage clear a
    # previously recorded disk_pressure flag (disk_pressure events start at 75 %).
    DISK_PRESSURE_CLEAR_PCT = 75
    # Orphaned active incidents must be at least this old (seconds) before they
    # are auto-resolved, to avoid racing with incident creation.
    ORPHAN_INCIDENT_GRACE_S = 300
    # Resolved incidents older than this (seconds) are dropped from world state.
    RESOLVED_INCIDENT_TTL_S = 7 * 86400
    # Characters of the 12-char Docker container-ID prefix in ghost service keys.
    _HEX_DIGITS = frozenset("0123456789abcdef")

    def __init__(self):
        # Per-node-directory checkpoint: {"vps": "last/file/path", "piha": "last/file/path"}
        # Replaces the old single last_processed_file which silently skipped event dirs
        # that sort alphabetically before the checkpoint (e.g. piha/ < vps/).
        self.node_checkpoints: dict = {}
        self.world_state = {
            "nodes": {},
            "services": {},
            "deployments": {},
            "incidents": {},
            "summary": {
                "last_update": datetime.now(timezone.utc).isoformat(),
                "status": "initializing",
                "active_incidents_count": 0
            }
        }
        self.inventory = self._load_inventory()
        self._ensure_dirs()
        self._load_checkpoint()

    def _ensure_dirs(self):
        """Create the runtime directories the observer reads from and writes to."""
        WORLD_DIR.mkdir(parents=True, exist_ok=True)
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)
        LOGS_DIR.mkdir(parents=True, exist_ok=True)

    def _load_inventory(self):
        """Load the static topology: node roles/connectivity and per-host services.

        Nodes come from ``INVENTORY_TOPOLOGY``; services come from
        ``REPO_ROOT/hosts/<dir>/services.yaml``. A missing topology file or an
        empty YAML document yields no nodes. Errors are logged, never raised.

        Returns:
            ``{"nodes": {name: {"roles", "connectivity"}},
            "services": {host: {svc: {"role", "exposure"}}}}``
        """
        inventory = {"nodes": {}, "services": {}}
        try:
            if INVENTORY_TOPOLOGY.exists():
                with open(INVENTORY_TOPOLOGY, "r") as f:
                    # safe_load returns None for an empty document.
                    topo = yaml.safe_load(f) or {}
                for node_name, node_info in (topo.get("nodes") or {}).items():
                    node_info = node_info or {}
                    inventory["nodes"][node_name] = {
                        "roles": node_info.get("roles", []),
                        "connectivity": node_info.get("connectivity", {})
                    }
        except Exception as e:
            logger.error(f"Failed to load inventory: {e}")

        self._load_host_services(inventory["services"])
        return inventory

    def _load_host_services(self, services):
        """Fill ``services`` ({host: {svc: {...}}}) from every hosts/*/services.yaml.

        Each file is loaded independently, so one malformed file is logged and
        skipped instead of aborting every host after it.
        """
        hosts_dir = REPO_ROOT / "hosts"
        try:
            host_dirs = sorted(p for p in hosts_dir.iterdir() if p.is_dir())
        except OSError as e:
            logger.error(f"Failed to load inventory: {e}")
            return
        for host_dir in host_dirs:
            svc_file = host_dir / "services.yaml"
            if not svc_file.exists():
                continue
            try:
                with open(svc_file, "r") as f:
                    svc_data = yaml.safe_load(f) or {}
                host_name = svc_data.get("host")
                for svc_name, svc_info in (svc_data.get("services") or {}).items():
                    svc_info = svc_info or {}
                    services.setdefault(host_name, {})[svc_name] = {
                        "role": svc_info.get("role"),
                        "exposure": svc_info.get("exposure")
                    }
            except Exception as e:
                logger.error(f"Failed to load inventory from {svc_file}: {e}")

    def _load_checkpoint(self):
        """Restore per-node checkpoints (migrating the old single-file format) and world state.

        World state is only reloaded from disk when a checkpoint file exists, so
        the restored state and the event positions stay paired.
        """
        if OBSERVER_STATE_FILE.exists():
            try:
                with open(OBSERVER_STATE_FILE, "r") as f:
                    checkpoint = json.load(f)

                if "node_checkpoints" in checkpoint:
                    # New format: per-directory checkpoints.
                    self.node_checkpoints = checkpoint["node_checkpoints"]
                elif "last_processed_file" in checkpoint:
                    # Migrate old single-file checkpoint: extract node dir from path.
                    old = checkpoint["last_processed_file"]
                    if old:
                        try:
                            node_dir = Path(old).relative_to(EVENTS_DIR).parts[0]
                            self.node_checkpoints = {node_dir: old}
                            logger.info(f"Migrated old checkpoint → node_checkpoints: {self.node_checkpoints}")
                        except Exception:
                            pass  # Bad path — start fresh

                self._load_world_from_disk()
            except Exception as e:
                logger.error(f"Failed to load checkpoint: {e}")

    def _load_world_from_disk(self):
        """Replace each world-state section with its JSON file from WORLD_DIR, if present.

        Every section is a JSON object. A file that parses but holds something
        else (e.g. ``[]`` or ``null``) is rejected, because later code calls
        dict methods on each section every cycle.
        """
        # Optional: Load existing state to resume faster
        files = {
            "nodes": WORLD_DIR / "nodes.json",
            "services": WORLD_DIR / "services.json",
            "deployments": WORLD_DIR / "deployments.json",
            "incidents": WORLD_DIR / "incidents.json",
            "summary": WORLD_DIR / "runtime-summary.json"
        }
        for key, path in files.items():
            if not path.exists():
                continue
            try:
                with open(path, "r") as f:
                    loaded = json.load(f)
            except Exception as e:
                logger.error(f"Failed to load {key} state: {e}")
                continue
            if not isinstance(loaded, dict):
                logger.error(
                    f"Failed to load {key} state: expected a JSON object, "
                    f"got {type(loaded).__name__}"
                )
                continue
            self.world_state[key] = loaded

    def _save_checkpoint(self):
        """Persist the per-node checkpoints atomically; errors are logged only."""
        try:
            _atomic_write_json(OBSERVER_STATE_FILE, {"node_checkpoints": self.node_checkpoints})
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")

    def _prune_stale_world(self):
        """Prune stale world-state entries and clean up incidents that are no longer live.

        Root cause the node pruning guards against: when NODE_NAME env var is unset,
        node_agent.py falls back to socket.gethostname(). Inside a Docker container
        that returns the 12-char hex container ID (e.g. 'be17cb6eb0f6') instead of
        the canonical host name ('vps'). The observer ingests those events and
        creates ghost entries that never expire on their own.

        Steps, in order:

        1. Drop nodes absent from the topology inventory, plus their services.
           This step is skipped when the inventory is empty.
        2. Drop hash-prefixed ghost service keys.
        3. Auto-resolve active incidents that belong to a healthy service or are
           orphaned. Errors here are logged, never raised.
        4. Age out resolved incidents older than RESOLVED_INCIDENT_TTL_S (7 days).

        Only step 1 depends on the inventory. The incident cleanup must still run
        when topology.yaml could not be loaded; otherwise stale "active" incidents
        are never resolved.
        """
        self._prune_unknown_nodes()
        self._prune_ghost_services()

        now = time.time()
        try:
            self._auto_resolve_incidents(now)
        except Exception as exc:
            logger.error(f"Error during incident auto-resolve in _prune_stale_world: {exc}")

        self._prune_old_incidents(now)

    def _prune_unknown_nodes(self):
        """Remove nodes missing from the inventory, and the services keyed under them."""
        known_nodes = set(self.inventory["nodes"])
        if not known_nodes:
            # Inventory failed to load — don't prune to avoid wiping valid state.
            return

        stale_nodes = [n for n in list(self.world_state["nodes"]) if n not in known_nodes]
        for n in stale_nodes:
            logger.info(f"Pruning stale node from world state: {n}")
            del self.world_state["nodes"][n]

        stale_node_set = set(stale_nodes)
        stale_svcs = [k for k in list(self.world_state["services"])
                      if k.split("/")[0] in stale_node_set]
        for k in stale_svcs:
            logger.info(f"Pruning stale service from world state: {k}")
            del self.world_state["services"][k]

    def _prune_ghost_services(self):
        """Remove service keys shaped like ``<node>/<12 hex chars>_<real-name>``.

        These keys are a Docker stale-state artifact (e.g.
        "9e36297651e7_control-plane-observer"). They are created when node-agent
        incorrectly uses c.name instead of the compose label, and they accumulate
        on every container rebuild.
        """
        def is_ghost(key):
            _, sep, name = key.partition("/")
            return (bool(sep)
                    and len(name) > 13
                    and name[12] == "_"
                    and all(ch in self._HEX_DIGITS for ch in name[:12]))

        for k in [k for k in list(self.world_state["services"]) if is_ghost(k)]:
            logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}")
            del self.world_state["services"][k]

    def _auto_resolve_incidents(self, now):
        """Resolve active incidents that cannot still be ongoing; ``now`` is a Unix time."""
        services = self.world_state["services"]
        incidents = self.world_state["incidents"]

        # Collect incident_ids currently referenced by any service entry.
        linked_ids: set = {
            svc.get("incident_id")
            for svc in services.values()
            if svc.get("incident_id")
        }

        # Case 1 — service is healthy but still points at an incident.
        # process_event already calls _resolve_incident on service_healthy events.
        # However, if the observer restarted with on-disk state where the link was
        # intact (inconsistency from a pre-atomic-write crash), it may not get
        # resolved until the next service_healthy event. A healthy service cannot
        # have an ongoing incident, so resolve it and drop the link now. The link is
        # dropped even when it points at a resolved or already-pruned record.
        for svc_key, svc in services.items():
            inc_id = svc.get("incident_id")
            if svc.get("status") != "healthy" or not inc_id:
                continue
            inc = incidents.get(inc_id)
            if inc and inc.get("status") == "active":
                logger.info(
                    f"Auto-resolving incident {inc_id} for {svc_key}: "
                    f"service is healthy"
                )
                inc["status"] = "resolved"
                inc["resolved_at"] = now
            svc["incident_id"] = None
            linked_ids.discard(inc_id)

        # Case 2 — orphaned active incident: no service entry links to it, and
        # last_occurrence is older than the grace period (guards against creation races).
        # These are stale records left behind when on-disk state was inconsistent:
        # the service entry had incident_id cleared, but incidents.json still had
        # the record as "active".
        for inc_id, inc in incidents.items():
            if inc.get("status") != "active" or inc_id in linked_ids:
                continue
            age = now - _parse_ts(inc.get("last_occurrence"))
            if age > self.ORPHAN_INCIDENT_GRACE_S:
                logger.info(
                    f"Auto-resolving orphaned incident {inc_id} "
                    f"(service={inc.get('service')}, node={inc.get('node')}): "
                    f"no service references it, age={int(age)}s"
                )
                inc["status"] = "resolved"
                inc["resolved_at"] = now

    def _prune_old_incidents(self, now):
        """Drop resolved incidents whose resolved_at is older than RESOLVED_INCIDENT_TTL_S.

        resolved_at may be an int, float or ISO string, hence _parse_ts. A missing
        or unparseable value parses as 0.0 and is therefore pruned.
        """
        stale_incidents = [
            k for k, v in self.world_state["incidents"].items()
            if v.get("status") == "resolved"
            and now - _parse_ts(v.get("resolved_at")) > self.RESOLVED_INCIDENT_TTL_S
        ]
        for k in stale_incidents:
            del self.world_state["incidents"][k]

    def _save_world(self):
        """Refresh the summary section and write every world-state file atomically.

        Per-file write errors are logged and do not stop the remaining files.
        """
        summary = self.world_state["summary"]
        summary["last_update"] = datetime.now(timezone.utc).isoformat()
        active_incidents = [
            k for k, v in self.world_state["incidents"].items() if v.get("status") == "active"
        ]
        summary["active_incidents_count"] = len(active_incidents)
        summary["node_count"] = len(self.world_state["nodes"])
        summary["service_count"] = len(self.world_state["services"])
        summary["status"] = "degraded" if active_incidents else "nominal"

        files = {
            "nodes.json": self.world_state["nodes"],
            "services.json": self.world_state["services"],
            "deployments.json": self.world_state["deployments"],
            "incidents.json": self.world_state["incidents"],
            "recommendations.json": [],
            "runtime-summary.json": summary
        }
        for filename, data in files.items():
            try:
                _atomic_write_json(WORLD_DIR / filename, data)
            except Exception as e:
                logger.error(f"Failed to save {filename}: {e}")

    def process_event(self, event):
        """Apply a single event dict to the in-memory world state.

        Reads the optional keys ``type``, ``node``, ``service``, ``severity``,
        ``timestamp``, ``correlation_id`` and ``payload``. Updates happen in this
        order:

        1. The node entry.
        2. The ``node/service`` entry, unless service is missing or ``"all"``.
        3. For ``deployment_*`` events carrying a correlation id, the deployment
           record.

        A missing ``type`` or a ``null`` payload is tolerated rather than raising.
        """
        etype = event.get("type") or ""
        node = event.get("node")
        service = event.get("service")
        severity = event.get("severity")
        timestamp = event.get("timestamp")
        cid = event.get("correlation_id")
        # An explicit "payload": null must behave like an absent payload.
        payload = event.get("payload") or {}

        # 1. Update Node State
        nodes = self.world_state["nodes"]
        if node not in nodes:
            nodes[node] = {
                "status": "unknown",
                "last_seen": None,
                "roles": self.inventory["nodes"].get(node, {}).get("roles", [])
            }
        node_state = nodes[node]
        node_state["last_seen"] = timestamp

        if etype == "node_online":
            node_state["status"] = "online"
        elif etype == "node_offline":
            node_state["status"] = "offline"

        elif etype == "node_health":
            # Regular heartbeat from node-agent; updates resource metrics.
            # Clears disk_pressure if disk is now healthy (< warn threshold).
            node_state["status"] = "online"
            node_state.update({
                "disk_usage_pct": payload.get("disk_pct"),
                "mem_usage_pct": payload.get("mem_pct"),
                "cpu_usage_pct": payload.get("cpu_pct"),
            })
            if (payload.get("disk_pct") or 0) < self.DISK_PRESSURE_CLEAR_PCT:
                node_state.pop("disk_pressure", None)

        elif etype == "disk_pressure":
            # Emitted when disk usage crosses 75 % (medium) or 85 % (high).
            # The supervisor reads disk_pressure to generate disk_cleanup actions.
            node_state["disk_pressure"] = severity
            node_state["disk_usage_pct"] = payload.get("usage_pct")

        elif etype == "high_memory":
            # Memory pressure observation; recorded on the node for correlation.
            # No automated action — operator decides if a container restart helps.
            node_state["memory_pressure"] = severity
            node_state["mem_usage_pct"] = payload.get("usage_pct")

        elif etype == "high_cpu":
            # CPU pressure observation; recorded for visibility.
            node_state["cpu_pressure"] = severity
            node_state["cpu_usage_pct"] = payload.get("usage_pct")

        # 2. Update Service State
        if service and service != "all":
            svc_key = f"{node}/{service}"
            services = self.world_state["services"]
            if svc_key not in services:
                services[svc_key] = {
                    "node": node,
                    "service": service,
                    "status": "unknown",
                    "last_check": None,
                    "incident_id": None
                }
            services[svc_key]["last_check"] = timestamp

            if etype in ("service_recovered", "service_healthy"):
                # service_healthy is node-agent's positive confirmation that a
                # managed container is running. It keeps services.json populated so
                # the supervisor can detect drift correctly (absent entry = never
                # reported = unknown, which is not the same as confirmed missing).
                # A confirmed-healthy service also ends any active incident.
                services[svc_key]["status"] = "healthy"
                self._resolve_incident(svc_key, timestamp)
            elif etype in ("service_unhealthy", "healthcheck_failed"):
                services[svc_key]["status"] = "unhealthy"
                self._handle_incident(svc_key, event)

        # 3. Update Deployment State
        if etype.startswith("deployment_") and cid:
            deployments = self.world_state["deployments"]
            if cid not in deployments:
                deployments[cid] = {
                    "node": node,
                    "service": service,
                    "status": "unknown",
                    "started_at": None,
                    "finished_at": None,
                    "events": []
                }
            deployment = deployments[cid]
            deployment.setdefault("events", []).append({
                "type": etype,
                "timestamp": timestamp,
                "payload": payload
            })
            if etype == "deployment_started":
                deployment["status"] = "in_progress"
                deployment["started_at"] = timestamp
            elif etype == "deployment_completed":
                deployment["status"] = "completed"
                deployment["finished_at"] = timestamp
            elif etype == "deployment_failed":
                deployment["status"] = "failed"
                deployment["finished_at"] = timestamp
                # Deployment failure often creates an incident
                self._handle_deployment_failure(event)

    def _handle_incident(self, svc_key, event):
        """Record a failure for ``svc_key``, which must already exist in services.

        Repeated failures while the linked incident is active are collapsed into
        it (bumping occurrence_count). Otherwise a new active incident is created
        and linked to the service.
        """
        timestamp = event.get("timestamp")
        incidents = self.world_state["incidents"]
        svc = self.world_state["services"][svc_key]

        # Correlation: collapse repeated failures for the same service on the same node
        active_id = svc.get("incident_id")
        incident = incidents.get(active_id) if active_id else None
        if incident and incident.get("status") == "active":
            incident["last_occurrence"] = timestamp
            incident["occurrence_count"] = incident.get("occurrence_count", 1) + 1
            incident.setdefault("events", []).append(timestamp)
            return

        # Create new incident. The id embeds wall-clock seconds, so a resolve and
        # re-fail within the same second (common when replaying a backlog) would
        # collide; add a numeric suffix instead of overwriting the earlier record.
        base_id = f"inc-{int(time.time())}-{event.get('node')}-{event.get('service')}"
        incident_id = base_id
        suffix = 1
        while incident_id in incidents:
            incident_id = f"{base_id}-{suffix}"
            suffix += 1

        incidents[incident_id] = {
            "id": incident_id,
            "node": event.get("node"),
            "service": event.get("service"),
            "status": "active",
            "severity": event.get("severity"),
            # trigger_type records the event type that opened this incident so that
            # the supervisor can choose the appropriate remediation action
            # (e.g. container_restart for containers_not_running / mqtt_unreachable
            # vs. a full redeploy for other causes).
            "trigger_type": event.get("type"),
            "started_at": timestamp,
            "last_occurrence": timestamp,
            "occurrence_count": 1,
            "events": [timestamp],
            "correlation_id": event.get("correlation_id")
        }
        svc["incident_id"] = incident_id

    def _resolve_incident(self, svc_key, timestamp):
        """Mark the incident linked to ``svc_key`` resolved (if active) and clear the link.

        The link is cleared even when it points at a missing or already-resolved
        incident, so a recovered service never keeps a dangling incident_id.
        """
        svc = self.world_state["services"][svc_key]
        incident_id = svc.get("incident_id")
        if not incident_id:
            return
        incident = self.world_state["incidents"].get(incident_id)
        if incident and incident.get("status") == "active":
            incident["status"] = "resolved"
            # Without a timestamp, resolved_at would parse as epoch 0 and the
            # 7-day prune would delete the record on the very next cycle.
            incident["resolved_at"] = timestamp if timestamp is not None else time.time()
        svc["incident_id"] = None

    def _handle_deployment_failure(self, event):
        """Open or extend the service's incident and attach diagnostics or the error."""
        svc_key = f"{event.get('node')}/{event.get('service')}"
        if svc_key not in self.world_state["services"]:
            # process_event only tracks concrete services (not None / "all"), so
            # there is nothing to link an incident to. Without this guard,
            # _handle_incident raises KeyError after the deployment record was
            # already updated.
            logger.warning(f"Deployment failure for untracked service {svc_key}; no incident opened")
            return

        self._handle_incident(svc_key, event)

        # Link diagnostics if available in payload
        incident_id = self.world_state["services"][svc_key].get("incident_id")
        incident = self.world_state["incidents"].get(incident_id) if incident_id else None
        if incident is not None:
            payload = event.get("payload") or {}
            if "diagnostics_file" in payload:
                incident["diagnostics_ref"] = payload["diagnostics_file"]
            elif "error" in payload:
                incident["last_error"] = payload["error"]

    def run_once(self):
        """Run one cycle: heartbeat, apply new events, prune, then persist state and checkpoint."""
        # Update heartbeat
        heartbeat_file = STATE_DIR / "observer.heartbeat"
        try:
            heartbeat_file.touch()
        except Exception as e:
            logger.error(f"Failed to touch heartbeat file: {e}")

        # Collect all event files grouped by node directory.
        # Per-node checkpoints are compared within each directory independently,
        # so late-arriving events from remote nodes (sorted earlier in the path)
        # are never skipped just because another node's checkpoint is further ahead.
        all_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True))

        new_files = []
        for file_path in all_files:
            try:
                node_dir = str(Path(file_path).relative_to(EVENTS_DIR).parts[0])
            except (IndexError, ValueError):
                node_dir = "__unknown__"
            if file_path > self.node_checkpoints.get(node_dir, ""):
                new_files.append((node_dir, file_path))

        if not new_files:
            # Even if no new events, prune stale entries and refresh summary freshness.
            self._prune_stale_world()
            self._save_world()
            return

        logger.info(f"Processing {len(new_files)} new events across "
                    f"{len({n for n, _ in new_files})} node(s)")
        for node_dir, file_path in new_files:
            try:
                with open(file_path, "r") as f:
                    event = json.load(f)
                self.process_event(event)
                # Advance per-node checkpoint (only forward — no regression).
                if file_path > self.node_checkpoints.get(node_dir, ""):
                    self.node_checkpoints[node_dir] = file_path
            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")

        self._prune_stale_world()
        # Persist the world before advancing the on-disk checkpoint. If the process
        # dies in between, this batch is re-applied on restart instead of being lost.
        self._save_world()
        self._save_checkpoint()

    def loop(self, interval=5):
        """Run :meth:`run_once` forever, sleeping ``interval`` seconds between cycles."""
        logger.info("Starting observer loop")
        while True:
            self.run_once()
            time.sleep(interval)
|
|
|
|
if __name__ == "__main__":
    import sys

    # Script entry point: a single pass with --run-once, otherwise poll forever.
    observer = Observer()
    if "--run-once" not in sys.argv:
        observer.loop()
    else:
        observer.run_once()
|