Recovery from bad merge of task/observer-poison-quarantine (c255a02)
which carried false deletes from a stale branch base. Re-applies only
the genuine observer changes on top of correct master state.
When an event file failed to parse (malformed JSON, truncated, corrupted),
the observer previously retried it on every cycle while the node's
checkpoint stayed pinned, so all subsequent good events for that node were lost.
Now the first parse failure moves the file with an atomic os.replace into
STATE_DIR/observer_failed_events/<node>/, with collision handling. The checkpoint
then advances and downstream events flow.
Move failures are logged but do not crash the loop.
Complementary to the atomic_write_json fix on state files; this addresses
the same race-pattern on event files instead.
Regression test asserts: bad event quarantined to failed_events dir,
removed from hot path, subsequent good event processed (node online),
checkpoint moves to good event.
547 lines
24 KiB
Python
547 lines
24 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import glob
|
|
import logging
|
|
import yaml
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
|
|
def _atomic_write_json(path: Path, data) -> None:
|
|
"""Write JSON atomically: write to a sibling .tmp, fsync, then os.replace."""
|
|
tmp = path.with_suffix(".tmp")
|
|
with open(tmp, "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
os.replace(tmp, path)
|
|
|
|
|
|
def _parse_ts(ts) -> float:
|
|
"""Return a Unix timestamp float from ts, which may be int/float or an ISO-8601 string.
|
|
|
|
Events from node-agent use int(time.time()); events from stability-agent / events.py
|
|
use ISO format ('2026-06-03T10:30:00Z'). Both appear in incident fields such as
|
|
last_occurrence and resolved_at, so any arithmetic on them must go through here.
|
|
Returns 0.0 on None or unparseable input so callers can use plain comparisons.
|
|
"""
|
|
if ts is None:
|
|
return 0.0
|
|
if isinstance(ts, (int, float)):
|
|
return float(ts)
|
|
try:
|
|
return datetime.fromisoformat(str(ts).replace("Z", "+00:00")).timestamp()
|
|
except Exception:
|
|
return 0.0
|
|
|
|
# Runtime layout. Everything hangs off RUNTIME_PATH, which can be overridden
# through the environment and defaults to /opt/homelab.
RUNTIME_PATH = os.environ.get("RUNTIME_PATH", "/opt/homelab")
EVENTS_DIR = Path(RUNTIME_PATH, "events")
STATE_DIR = Path(RUNTIME_PATH, "state")
LOGS_DIR = Path(RUNTIME_PATH, "logs")
WORLD_DIR = Path(RUNTIME_PATH, "world")
# Per-node checkpoint file and the quarantine area for unprocessable events.
OBSERVER_STATE_FILE = STATE_DIR.joinpath("observer_checkpoint.json")
FAILED_EVENTS_DIR = STATE_DIR.joinpath("observer_failed_events")

# Repository checkout: three directory levels above this file.
REPO_ROOT = Path(__file__).parent.parent.parent
INVENTORY_TOPOLOGY = REPO_ROOT.joinpath("inventory", "topology.yaml")

# Logging: root logger at INFO, module logger named "observer".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("observer")
|
|
|
|
class Observer:
    """Fold event files into an in-memory world state and persist it as JSON.

    Each cycle (``run_once``) scans ``EVENTS_DIR`` recursively for ``*.json``
    event files, applies every file newer than its node directory's
    checkpoint via ``process_event``, quarantines files that cannot be read
    or processed, then saves the checkpoint, prunes stale entries and writes
    the world snapshot files under ``WORLD_DIR``.
    """

    def __init__(self):
        # Per-node-directory checkpoint: {"vps": "last/file/path", "piha": "last/file/path"}
        # Replaces the old single last_processed_file which silently skipped event dirs
        # that sort alphabetically before the checkpoint (e.g. piha/ < vps/).
        self.node_checkpoints: dict = {}
        self.world_state = {
            "nodes": {},
            "services": {},
            "deployments": {},
            "incidents": {},
            "summary": {
                "last_update": datetime.now(timezone.utc).isoformat(),
                "status": "initializing",
                "active_incidents_count": 0,
            },
        }
        self.inventory = self._load_inventory()
        self._ensure_dirs()
        self._load_checkpoint()

    def _ensure_dirs(self):
        """Create every runtime directory the observer reads or writes."""
        for directory in (WORLD_DIR, STATE_DIR, EVENTS_DIR, LOGS_DIR, FAILED_EVENTS_DIR):
            directory.mkdir(parents=True, exist_ok=True)

    def _quarantine_event_file(self, file_path: str, node_dir: str, exc: Exception) -> None:
        """Move an unreadable/unprocessable event out of the hot path.

        The file is moved with ``os.replace`` into
        ``FAILED_EVENTS_DIR/<node_dir>/``. If a file of the same name is
        already quarantined, a timestamp (and, if needed, a counter) is added
        to the name so nothing already quarantined is overwritten.

        Args:
            file_path: Path of the offending event file.
            node_dir: Node directory the file belongs to; used as the
                sub-directory name inside the quarantine area.
            exc: The exception that made the event unprocessable (logged).

        Never raises: any failure while preparing the destination or moving
        the file is logged, so the caller's processing loop keeps running.
        """
        src = Path(file_path)
        dest_dir = FAILED_EVENTS_DIR / node_dir
        try:
            # mkdir lives inside the try so a full disk or a permission
            # problem is logged instead of escaping into run_once.
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest = dest_dir / src.name
            if dest.exists():
                stamp = int(time.time())
                dest = dest_dir / f"{src.stem}-{stamp}{src.suffix}"
                attempt = 0
                # Two collisions within the same second would otherwise
                # make os.replace overwrite the earlier quarantined file.
                while dest.exists():
                    attempt += 1
                    dest = dest_dir / f"{src.stem}-{stamp}-{attempt}{src.suffix}"
            os.replace(src, dest)
        except Exception as move_exc:
            logger.error(
                "Failed to quarantine bad event for node_dir=%s: %s (%s: %s); move error=%s: %s",
                node_dir, src, type(exc).__name__, exc, type(move_exc).__name__, move_exc,
            )
            return
        logger.error(
            "Quarantined bad event for node_dir=%s: %s -> %s (%s: %s)",
            node_dir, src, dest, type(exc).__name__, exc,
        )

    def _load_inventory(self):
        """Load node and service inventory from the repository checkout.

        Nodes come from ``INVENTORY_TOPOLOGY`` (``nodes`` mapping); services
        come from every ``hosts/<dir>/services.yaml`` under ``REPO_ROOT``.

        Returns:
            ``{"nodes": {name: {"roles", "connectivity"}},
               "services": {host: {service: {"role", "exposure"}}}}``.
            Sections that cannot be loaded are left empty; errors are logged,
            never raised.
        """
        inventory = {"nodes": {}, "services": {}}

        try:
            if INVENTORY_TOPOLOGY.exists():
                with open(INVENTORY_TOPOLOGY, "r") as f:
                    # safe_load returns None for an empty document.
                    topo = yaml.safe_load(f) or {}
                nodes = {}
                for node_name, node_info in (topo.get("nodes") or {}).items():
                    node_info = node_info or {}
                    nodes[node_name] = {
                        "roles": node_info.get("roles", []),
                        "connectivity": node_info.get("connectivity", {}),
                    }
                # Commit only a fully parsed node set: _prune_stale_world
                # deletes every node missing from it, so a partial set would
                # wipe valid state.
                inventory["nodes"] = nodes
        except Exception as e:
            logger.error(f"Failed to load inventory: {e}")

        # Load service assignments from hosts files
        try:
            hosts_dir = REPO_ROOT / "hosts"
            host_dirs = [p for p in hosts_dir.iterdir() if p.is_dir()] if hosts_dir.is_dir() else []
        except Exception as e:
            logger.error(f"Failed to load inventory: {e}")
            host_dirs = []

        for host_dir in host_dirs:
            svc_file = host_dir / "services.yaml"
            try:
                if not svc_file.exists():
                    continue
                with open(svc_file, "r") as f:
                    svc_data = yaml.safe_load(f) or {}
                host_name = svc_data.get("host")
                for svc_name, svc_info in (svc_data.get("services") or {}).items():
                    svc_info = svc_info or {}
                    inventory["services"].setdefault(host_name, {})[svc_name] = {
                        "role": svc_info.get("role"),
                        "exposure": svc_info.get("exposure"),
                    }
            except Exception as e:
                # One broken services.yaml must not hide the other hosts.
                logger.error(f"Failed to load inventory from {svc_file}: {e}")
        return inventory

    def _load_checkpoint(self):
        """Restore per-node checkpoints (and the saved world) from disk.

        Understands the current ``{"node_checkpoints": {...}}`` format and
        migrates the legacy ``{"last_processed_file": path}`` format. Does
        nothing when the checkpoint file is absent; on a read or parse error
        the error is logged and the observer starts without saved state.
        """
        if not OBSERVER_STATE_FILE.exists():
            return
        try:
            with open(OBSERVER_STATE_FILE, "r") as f:
                checkpoint = json.load(f)
            if not isinstance(checkpoint, dict):
                logger.warning("Ignoring checkpoint file: root is not a JSON object")
                checkpoint = {}

            if "node_checkpoints" in checkpoint:
                # New format: per-directory checkpoints.
                saved = checkpoint["node_checkpoints"]
                if isinstance(saved, dict):
                    # run_once compares these against path strings, so keep
                    # only string values.
                    self.node_checkpoints = {
                        k: v for k, v in saved.items() if isinstance(v, str)
                    }
                else:
                    logger.warning("Ignoring node_checkpoints: not a JSON object")
            elif "last_processed_file" in checkpoint:
                # Migrate old single-file checkpoint: extract node dir from path.
                old = checkpoint["last_processed_file"]
                if old:
                    try:
                        node_dir = Path(old).relative_to(EVENTS_DIR).parts[0]
                        self.node_checkpoints = {node_dir: old}
                        logger.info(f"Migrated old checkpoint → node_checkpoints: {self.node_checkpoints}")
                    except Exception:
                        pass  # Bad path — start fresh

            self._load_world_from_disk()
        except Exception as e:
            logger.error(f"Failed to load checkpoint: {e}")

    def _load_world_from_disk(self):
        """Reload previously saved world sections so a restart resumes faster.

        Each section is read independently; a missing file is skipped and a
        file that fails to load, or does not contain a JSON object, is logged
        and leaves the in-memory default for that section in place.
        """
        files = {
            "nodes": WORLD_DIR / "nodes.json",
            "services": WORLD_DIR / "services.json",
            "deployments": WORLD_DIR / "deployments.json",
            "incidents": WORLD_DIR / "incidents.json",
            "summary": WORLD_DIR / "runtime-summary.json",
        }
        for key, path in files.items():
            if not path.exists():
                continue
            try:
                with open(path, "r") as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    # Every section is used as a dict; anything else (e.g. a
                    # bare null) would break each later cycle.
                    raise ValueError(f"expected a JSON object, got {type(loaded).__name__}")
                self.world_state[key] = loaded
            except Exception as e:
                logger.error(f"Failed to load {key} state: {e}")

    def _save_checkpoint(self):
        """Persist the per-node checkpoints atomically; failures are logged."""
        try:
            _atomic_write_json(OBSERVER_STATE_FILE, {"node_checkpoints": self.node_checkpoints})
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")

    @staticmethod
    def _is_ghost_service_key(svc_key) -> bool:
        """Return True for keys shaped ``<node>/<12 hex chars>_<real-name>``."""
        parts = svc_key.split("/", 1)
        if len(parts) != 2:
            return False
        name = parts[1]
        return (
            len(name) > 13
            and name[12] == "_"
            and all(ch in "0123456789abcdef" for ch in name[:12])
        )

    def _prune_stale_world(self):
        """Remove world-state entries for nodes absent from the topology inventory.

        Root cause this guards against: when NODE_NAME env var is unset, node_agent.py
        falls back to socket.gethostname(), which inside a Docker container returns the
        12-char hex container ID (e.g. 'be17cb6eb0f6') instead of the canonical host name
        ('vps'). The observer ingests those events and creates ghost entries that never
        expire on their own.

        Also removes hash-prefixed ghost service keys, auto-resolves active
        incidents that a healthy service still points at or that nothing
        references any more, and ages out resolved incidents older than
        7 days to keep world state lean.

        Does nothing when the inventory has no nodes.
        """
        known_nodes = set(self.inventory["nodes"].keys())
        if not known_nodes:
            # Inventory failed to load — don't prune to avoid wiping valid state.
            return

        nodes = self.world_state["nodes"]
        services = self.world_state["services"]
        incidents = self.world_state["incidents"]

        for n in [n for n in nodes if n not in known_nodes]:
            logger.info(f"Pruning stale node from world state: {n}")
            del nodes[n]

        # Compare against the inventory itself rather than only the nodes
        # removed in this pass, so a service whose node entry is already gone
        # is pruned as well.
        for k in [k for k in services if k.split("/")[0] not in known_nodes]:
            logger.info(f"Pruning stale service from world state: {k}")
            del services[k]

        # Prune ghost service keys whose service-name portion is a hash-prefixed
        # Docker stale-state artifact (e.g. "9e36297651e7_control-plane-observer").
        # These are created when node-agent incorrectly uses c.name instead of the
        # compose label, and accumulate on every container rebuild.
        # Pattern: <node>/<12hexchars>_<real-name>
        for k in [k for k in services if self._is_ghost_service_key(k)]:
            logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}")
            del services[k]

        now = time.time()

        try:
            # Collect incident_ids currently referenced by any service entry.
            linked_ids: set = {
                svc.get("incident_id")
                for svc in services.values()
                if svc.get("incident_id")
            }

            # Case 1 — service is healthy but still points at an active incident.
            # process_event already calls _resolve_incident on service_healthy events,
            # but if the observer restarted with on-disk state where the link was
            # intact (inconsistency from a pre-atomic-write crash), it may not get
            # resolved until the next service_healthy event is processed. Resolve
            # immediately — a healthy service cannot have an ongoing incident.
            for svc_key, svc in services.items():
                if svc.get("status") != "healthy":
                    continue
                inc_id = svc.get("incident_id")
                if not inc_id:
                    continue
                inc = incidents.get(inc_id, {})
                if inc.get("status") == "active":
                    logger.info(
                        f"Auto-resolving incident {inc_id} for {svc_key}: "
                        f"service is healthy"
                    )
                    inc["status"] = "resolved"
                    inc["resolved_at"] = now
                    svc["incident_id"] = None
                    linked_ids.discard(inc_id)

            # Case 2 — orphaned active incident: no service entry links to it and
            # last_occurrence is older than 5 minutes (guard against creation races).
            # These are the stale records left behind when on-disk state was
            # inconsistent: the service entry had incident_id cleared but incidents.json
            # still had the record as "active".
            for inc_id, inc in incidents.items():
                if inc.get("status") != "active":
                    continue
                if inc_id in linked_ids:
                    continue
                age = now - _parse_ts(inc.get("last_occurrence"))
                if age > 300:  # 5-minute guard
                    logger.info(
                        f"Auto-resolving orphaned incident {inc_id} "
                        f"(service={inc.get('service')}, node={inc.get('node')}): "
                        f"no service references it, age={int(age)}s"
                    )
                    inc["status"] = "resolved"
                    inc["resolved_at"] = now

        except Exception as exc:
            logger.error(f"Error during incident auto-resolve in _prune_stale_world: {exc}")

        # Remove resolved incidents older than 7 days.
        # Use _parse_ts so ISO-string resolved_at values are handled correctly.
        stale_incidents = [
            k for k, v in incidents.items()
            if v.get("status") == "resolved"
            and now - _parse_ts(v.get("resolved_at")) > 7 * 86400
        ]
        for k in stale_incidents:
            del incidents[k]

    def _save_world(self):
        """Refresh the summary and write every world section to ``WORLD_DIR``.

        The summary status is "degraded" while any incident is active and
        "nominal" otherwise. Each file is written atomically and
        independently; a failure on one file is logged and does not stop the
        others. ``recommendations.json`` is always written as an empty list.
        """
        summary = self.world_state["summary"]
        summary["last_update"] = datetime.now(timezone.utc).isoformat()
        active_incidents = [
            k for k, v in self.world_state["incidents"].items() if v.get("status") == "active"
        ]
        summary["active_incidents_count"] = len(active_incidents)
        summary["node_count"] = len(self.world_state["nodes"])
        summary["service_count"] = len(self.world_state["services"])
        summary["status"] = "degraded" if active_incidents else "nominal"

        files = {
            "nodes.json": self.world_state["nodes"],
            "services.json": self.world_state["services"],
            "deployments.json": self.world_state["deployments"],
            "incidents.json": self.world_state["incidents"],
            "recommendations.json": [],
            "runtime-summary.json": summary,
        }
        for filename, data in files.items():
            try:
                _atomic_write_json(WORLD_DIR / filename, data)
            except Exception as e:
                logger.error(f"Failed to save {filename}: {e}")

    def process_event(self, event):
        """Apply one decoded event to the in-memory world state.

        Updates, in order: the node entry (``last_seen`` plus type-specific
        fields for node_online / node_offline / node_health / disk_pressure /
        high_memory / high_cpu), the service entry when ``service`` is set
        and is not "all" (service_recovered / service_healthy /
        service_unhealthy / healthcheck_failed), and the deployment record
        for ``deployment_*`` events that carry a ``correlation_id``.

        Args:
            event: Decoded JSON object with at least a string ``type``.

        Raises:
            ValueError: ``event`` is not a dict or has no string ``type``.
                Raised before anything is modified, so a rejected event
                never leaves a half-applied update behind.
        """
        if not isinstance(event, dict):
            raise ValueError(f"event must be a JSON object, got {type(event).__name__}")
        etype = event.get("type")
        if not isinstance(etype, str):
            raise ValueError(f"event has no usable 'type' field: {etype!r}")

        node = event.get("node")
        service = event.get("service")
        severity = event.get("severity")
        timestamp = event.get("timestamp")
        cid = event.get("correlation_id")
        # An explicit null payload is treated like a missing one.
        payload = event.get("payload") or {}

        # 1. Update Node State
        nodes = self.world_state["nodes"]
        if node not in nodes:
            nodes[node] = {
                "status": "unknown",
                "last_seen": None,
                "roles": self.inventory["nodes"].get(node, {}).get("roles", []),
            }
        node_state = nodes[node]
        node_state["last_seen"] = timestamp

        if etype == "node_online":
            node_state["status"] = "online"
        elif etype == "node_offline":
            node_state["status"] = "offline"
        elif etype == "node_health":
            # Regular heartbeat from node-agent; updates resource metrics.
            # Clears disk_pressure if disk is now healthy (< warn threshold).
            node_state["status"] = "online"
            node_state.update({
                "disk_usage_pct": payload.get("disk_pct"),
                "mem_usage_pct": payload.get("mem_pct"),
                "cpu_usage_pct": payload.get("cpu_pct"),
            })
            if (payload.get("disk_pct") or 0) < 75:
                node_state.pop("disk_pressure", None)
        elif etype == "disk_pressure":
            # Emitted when disk usage crosses 75 % (medium) or 85 % (high).
            # The supervisor reads disk_pressure to generate disk_cleanup actions.
            node_state["disk_pressure"] = severity
            node_state["disk_usage_pct"] = payload.get("usage_pct")
        elif etype == "high_memory":
            # Memory pressure observation; recorded on the node for correlation.
            # No automated action — operator decides if a container restart helps.
            node_state["memory_pressure"] = severity
            node_state["mem_usage_pct"] = payload.get("usage_pct")
        elif etype == "high_cpu":
            # CPU pressure observation; recorded for visibility.
            node_state["cpu_pressure"] = severity
            node_state["cpu_usage_pct"] = payload.get("usage_pct")

        # 2. Update Service State
        if service and service != "all":
            svc_key = f"{node}/{service}"
            services = self.world_state["services"]
            if svc_key not in services:
                services[svc_key] = {
                    "node": node,
                    "service": service,
                    "status": "unknown",
                    "last_check": None,
                    "incident_id": None,
                }
            services[svc_key]["last_check"] = timestamp

            if etype == "service_recovered":
                services[svc_key]["status"] = "healthy"
                self._resolve_incident(svc_key, timestamp)
            elif etype == "service_healthy":
                # Positive confirmation from node-agent that a managed container
                # is running. This keeps services.json populated so the supervisor
                # can correctly detect drift (absent entry = never reported = unknown,
                # not the same as confirmed missing).
                # Also resolve any active incident — if a service that had been
                # unhealthy/crashing is now confirmed healthy, the incident is over.
                services[svc_key]["status"] = "healthy"
                self._resolve_incident(svc_key, timestamp)
            elif etype in ["service_unhealthy", "healthcheck_failed"]:
                services[svc_key]["status"] = "unhealthy"
                self._handle_incident(svc_key, event)

        # 3. Update Deployment State
        if etype.startswith("deployment_") and cid:
            deployments = self.world_state["deployments"]
            if cid not in deployments:
                deployments[cid] = {
                    "node": node,
                    "service": service,
                    "status": "unknown",
                    "started_at": None,
                    "finished_at": None,
                    "events": [],
                }
            deployment = deployments[cid]
            deployment["events"].append({
                "type": etype,
                "timestamp": timestamp,
                "payload": payload,
            })
            if etype == "deployment_started":
                deployment["status"] = "in_progress"
                deployment["started_at"] = timestamp
            elif etype == "deployment_completed":
                deployment["status"] = "completed"
                deployment["finished_at"] = timestamp
            elif etype == "deployment_failed":
                deployment["status"] = "failed"
                deployment["finished_at"] = timestamp
                # Deployment failure often creates an incident
                self._handle_deployment_failure(event)

    def _handle_incident(self, svc_key, event):
        """Record a failure for ``svc_key``: extend the active incident or open one.

        If the service already links to an incident that is still active,
        its ``last_occurrence``, ``occurrence_count`` and ``events`` are
        updated. Otherwise a new incident is created and linked to the
        service entry. ``svc_key`` must already exist in the services map.
        """
        # Correlation: collapse repeated failures for the same service on the same node
        active_incident = self.world_state["services"][svc_key].get("incident_id")
        occurred_at = event.get("timestamp")

        if active_incident and active_incident in self.world_state["incidents"]:
            incident = self.world_state["incidents"][active_incident]
            if incident.get("status") == "active":
                incident["last_occurrence"] = occurred_at
                incident["occurrence_count"] = incident.get("occurrence_count", 1) + 1
                incident.setdefault("events", []).append(occurred_at)
                return

        # Create new incident
        incident_id = f"inc-{int(time.time())}-{event.get('node')}-{event.get('service')}"
        self.world_state["incidents"][incident_id] = {
            "id": incident_id,
            "node": event.get("node"),
            "service": event.get("service"),
            "status": "active",
            "severity": event.get("severity"),
            # trigger_type records the event type that opened this incident so that
            # the supervisor can choose the appropriate remediation action
            # (e.g. container_restart for containers_not_running / mqtt_unreachable
            # vs. a full redeploy for other causes).
            "trigger_type": event.get("type"),
            "started_at": occurred_at,
            "last_occurrence": occurred_at,
            "occurrence_count": 1,
            "events": [occurred_at],
            "correlation_id": event.get("correlation_id"),
        }
        self.world_state["services"][svc_key]["incident_id"] = incident_id

    def _resolve_incident(self, svc_key, timestamp):
        """Mark the active incident linked to ``svc_key`` resolved and unlink it.

        No-op when the service has no linked incident, the incident record
        is missing, or it is not active.
        """
        svc = self.world_state["services"][svc_key]
        incident_id = svc.get("incident_id")
        if not incident_id or incident_id not in self.world_state["incidents"]:
            return
        incident = self.world_state["incidents"][incident_id]
        if incident["status"] == "active":
            incident["status"] = "resolved"
            incident["resolved_at"] = timestamp
            svc["incident_id"] = None

    def _handle_deployment_failure(self, event):
        """Open or extend an incident for a failed deployment and attach diagnostics.

        The payload's ``diagnostics_file`` is stored as ``diagnostics_ref``
        on the incident; if absent, ``error`` is stored as ``last_error``.

        When the event names no concrete service (missing or "all") there is
        no service entry to link an incident to; the failure is then only
        logged, and the deployment record keeps the failed status.
        """
        svc_key = f"{event.get('node')}/{event.get('service')}"
        if svc_key not in self.world_state["services"]:
            logger.warning(
                "Deployment failure for %s has no tracked service entry; not opening an incident",
                svc_key,
            )
            return
        self._handle_incident(svc_key, event)

        # Link diagnostics if available in payload
        incident_id = self.world_state["services"][svc_key].get("incident_id")
        if incident_id and incident_id in self.world_state["incidents"]:
            payload = event.get("payload") or {}
            if "diagnostics_file" in payload:
                self.world_state["incidents"][incident_id]["diagnostics_ref"] = payload["diagnostics_file"]
            elif "error" in payload:
                self.world_state["incidents"][incident_id]["last_error"] = payload["error"]

    def run_once(self):
        """Run one observation cycle.

        Touches the heartbeat file, collects the event files that sort after
        their node directory's checkpoint, processes them in sorted order,
        quarantines any file that fails to load or process, then saves the
        checkpoint, prunes stale state and writes the world files. With no
        new files only the prune and world save are performed.
        """
        # Update heartbeat
        heartbeat_file = STATE_DIR / "observer.heartbeat"
        try:
            heartbeat_file.touch()
        except Exception as e:
            logger.error(f"Failed to touch heartbeat file: {e}")

        # Collect all event files grouped by node directory.
        # Per-node checkpoints are compared within each directory independently,
        # so late-arriving events from remote nodes (sorted earlier in the path)
        # are never skipped just because another node's checkpoint is further ahead.
        all_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True))

        new_files = []
        for file_path in all_files:
            try:
                node_dir = str(Path(file_path).relative_to(EVENTS_DIR).parts[0])
            except (IndexError, ValueError):
                node_dir = "__unknown__"
            if file_path > self.node_checkpoints.get(node_dir, ""):
                new_files.append((node_dir, file_path))

        if not new_files:
            # Even if no new events, prune stale entries and refresh summary freshness.
            self._prune_stale_world()
            self._save_world()
            return

        logger.info(f"Processing {len(new_files)} new events across "
                    f"{len({n for n, _ in new_files})} node(s)")
        for node_dir, file_path in new_files:
            try:
                with open(file_path, "r") as f:
                    event = json.load(f)
                self.process_event(event)
                # Advance per-node checkpoint (only forward — no regression).
                if file_path > self.node_checkpoints.get(node_dir, ""):
                    self.node_checkpoints[node_dir] = file_path
            except Exception as e:
                logger.error(
                    "Error processing node_dir=%s file=%s (%s: %s)",
                    node_dir, file_path, type(e).__name__, e,
                )
                # The checkpoint is only advanced by files that processed
                # cleanly; the bad file is moved aside so later events for
                # this node are not held up behind it.
                self._quarantine_event_file(file_path, node_dir, e)

        self._save_checkpoint()
        self._prune_stale_world()
        self._save_world()

    def loop(self, interval=5):
        """Call ``run_once`` forever, sleeping ``interval`` seconds between cycles."""
        logger.info("Starting observer loop")
        while True:
            self.run_once()
            time.sleep(interval)
|
|
|
|
if __name__ == "__main__":
    import sys

    # Script entry: a single cycle when --run-once is given, otherwise the
    # endless polling loop.
    observer = Observer()
    single_pass = "--run-once" in sys.argv
    entry_point = observer.run_once if single_pass else observer.loop
    entry_point()
|