From 8f5b90501577b92f1e46e52fb003ab3ecd3f7f04 Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Tue, 12 May 2026 14:07:03 +0200 Subject: [PATCH] Implement observer runtime world synthesis engine --- docs/observer-runtime.md | 63 +++++++ scripts/observer/observer.py | 308 +++++++++++++++++++++++++++++++++ scripts/observer/test_setup.sh | 83 +++++++++ 3 files changed, 454 insertions(+) create mode 100644 docs/observer-runtime.md create mode 100644 scripts/observer/observer.py create mode 100644 scripts/observer/test_setup.sh diff --git a/docs/observer-runtime.md b/docs/observer-runtime.md new file mode 100644 index 0000000..dc6b1e6 --- /dev/null +++ b/docs/observer-runtime.md @@ -0,0 +1,63 @@ +# Observer Runtime + +The Observer Runtime is a lightweight agent responsible for synthesizing the operational world state of the homelab from raw events, logs, and state files. + +## Architecture + +The observer follows a filesystem-first approach, consuming append-only events and generating a normalized world model. It is designed to be idempotent, resumable, and resilient to intermittent node connectivity. + +### Inputs +- `/opt/homelab/events/`: Normalized JSON events. +- `/opt/homelab/state/`: Deployment stage markers and internal observer checkpoint. +- `/opt/homelab/logs/`: Detailed execution logs and diagnostics. +- Repository Inventory: `inventory/topology.yaml` and `hosts/*/services.yaml`. + +### World Model Output +Generated under `/opt/homelab/world/`: +- `nodes.json`: Current node availability, roles, and last seen timestamps. +- `services.json`: Service health status and links to active incidents. +- `deployments.json`: Tracking of active and historical deployment runs by `correlation_id`. +- `incidents.json`: Correlated operational issues, including repeat failures and resolution status. +- `runtime-summary.json`: High-level overview for dashboards and planner agents. + +## Incident Lifecycle + +The observer implements lightweight incident correlation: + +1. 
**Detection**: When a `service_unhealthy` or `healthcheck_failed` event is consumed, a new incident is created or an existing active incident for that service is updated. +2. **Correlation**: Multiple failure events for the same service on the same node are collapsed into a single incident, tracking the `occurrence_count`. +3. **Diagnostics**: Deployment failures (`deployment_failed`) automatically attach references to diagnostic files if present in the event payload. +4. **Resolution**: A `service_recovered` event for a service will transition any active incidents for that service to a `resolved` state. + +### Example Incident JSON +```json +{ + "inc-1715518800-saturn-mosquitto": { + "id": "inc-1715518800-saturn-mosquitto", + "node": "saturn", + "service": "mosquitto", + "status": "resolved", + "severity": "error", + "started_at": "2026-05-12T12:05:00Z", + "last_occurrence": "2026-05-12T12:06:00Z", + "occurrence_count": 2, + "events": [ + "2026-05-12T12:05:00Z", + "2026-05-12T12:06:00Z" + ], + "correlation_id": "hc-1", + "resolved_at": "2026-05-12T12:10:00Z" + } +} +``` + +## Runtime Behavior + +### Idempotency +The observer processes events in order. If the world state is lost, deleting the checkpoint file (`/opt/homelab/state/observer_checkpoint.json`) will cause the observer to re-process all events and rebuild the world state. + +### Resumability +The observer tracks the last processed event file in its checkpoint. Upon restart, it continues from the next available event. + +### Deployment Tracking +Deployments are tracked via `correlation_id`. The observer synthesizes the start, end, and status of each deployment run, providing a clear history of changes to the environment. 
diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py
new file mode 100644
index 0000000..31695c6
--- /dev/null
+++ b/scripts/observer/observer.py
@@ -0,0 +1,433 @@
+import os
+import json
+import time
+import glob
+import logging
+import yaml
+from datetime import datetime, timezone
+from pathlib import Path
+
+# Constants and Paths
+RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
+EVENTS_DIR = Path(RUNTIME_PATH) / "events"
+STATE_DIR = Path(RUNTIME_PATH) / "state"
+LOGS_DIR = Path(RUNTIME_PATH) / "logs"
+WORLD_DIR = Path(RUNTIME_PATH) / "world"
+OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"
+
+# resolve() keeps the repo root correct when __file__ is a relative path.
+REPO_ROOT = Path(__file__).resolve().parents[2]
+INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"
+
+# Logging setup
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger("observer")
+
+
+class Observer:
+    """Fold append-only event files into the persisted homelab world model.
+
+    Events are read from ``EVENTS_DIR``, applied to ``self.world_state`` and
+    written to ``WORLD_DIR``; progress is checkpointed in
+    ``OBSERVER_STATE_FILE`` so a restart resumes after the last event.
+    """
+
+    def __init__(self):
+        self.last_processed_file = None
+        self.world_state = {
+            "nodes": {},
+            "services": {},
+            "deployments": {},
+            "incidents": {},
+            "summary": {
+                "last_update": None,
+                "status": "initializing",
+                "active_incidents_count": 0,
+            },
+        }
+        self.inventory = self._load_inventory()
+        self._ensure_dirs()
+        self._load_checkpoint()
+
+    @staticmethod
+    def _read_json(path):
+        """Return the parsed JSON document stored at *path*."""
+        with open(path, "r", encoding="utf-8") as f:
+            return json.load(f)
+
+    @staticmethod
+    def _read_yaml(path):
+        """Return the mapping stored at *path* ({} if it holds no mapping)."""
+        with open(path, "r", encoding="utf-8") as f:
+            data = yaml.safe_load(f)
+        return data if isinstance(data, dict) else {}
+
+    @staticmethod
+    def _write_json_atomic(path, data, **dump_kwargs):
+        """Write *data* as JSON to *path* through a temp file and os.replace().
+
+        Consumers of the world files never read a truncated document, and a
+        crash mid-write keeps the previous version intact.
+        """
+        tmp_path = f"{path}.tmp"
+        with open(tmp_path, "w", encoding="utf-8") as f:
+            json.dump(data, f, **dump_kwargs)
+        os.replace(tmp_path, path)
+
+    @staticmethod
+    def _event_key(path):
+        """Return the last three path components as the event's sort key."""
+        return Path(path).parts[-3:]
+
+    def _ensure_dirs(self):
+        """Create the runtime directories when they are missing."""
+        for directory in (WORLD_DIR, STATE_DIR, EVENTS_DIR, LOGS_DIR):
+            directory.mkdir(parents=True, exist_ok=True)
+
+    def _load_inventory(self):
+        """Load node roles and per-host service assignments from the repo.
+
+        Best effort: an unreadable file is logged and skipped, so one broken
+        host file does not hide the rest of the inventory.
+        """
+        inventory = {"nodes": {}, "services": {}}
+        load_errors = (OSError, yaml.YAMLError, AttributeError, TypeError)
+        try:
+            if INVENTORY_TOPOLOGY.exists():
+                topo = self._read_yaml(INVENTORY_TOPOLOGY)
+                for node_name, node_info in (topo.get("nodes") or {}).items():
+                    node_info = node_info or {}
+                    inventory["nodes"][node_name] = {
+                        "roles": node_info.get("roles", []),
+                        "connectivity": node_info.get("connectivity", {}),
+                    }
+        except load_errors as e:
+            logger.error("Failed to load inventory: %s", e)
+
+        # Load service assignments from hosts files
+        hosts_dir = REPO_ROOT / "hosts"
+        if not hosts_dir.is_dir():
+            return inventory
+        for svc_file in sorted(hosts_dir.glob("*/services.yaml")):
+            try:
+                svc_data = self._read_yaml(svc_file)
+                # Fall back to the directory name when the file omits "host".
+                host_name = svc_data.get("host") or svc_file.parent.name
+                for svc_name, svc_info in (svc_data.get("services") or {}).items():
+                    svc_info = svc_info or {}
+                    inventory["services"].setdefault(host_name, {})[svc_name] = {
+                        "role": svc_info.get("role"),
+                        "exposure": svc_info.get("exposure"),
+                    }
+            except load_errors as e:
+                logger.error("Failed to load %s: %s", svc_file, e)
+        return inventory
+
+    def _load_checkpoint(self):
+        """Restore the checkpoint and the world files written by an earlier run."""
+        if not OBSERVER_STATE_FILE.exists():
+            return
+        try:
+            checkpoint = self._read_json(OBSERVER_STATE_FILE)
+            last_file = checkpoint.get("last_processed_file")
+        except (OSError, ValueError, AttributeError) as e:
+            logger.error("Failed to load checkpoint: %s", e)
+            return
+        if not isinstance(last_file, str):
+            # Nothing processed yet: every event is replayed, so the world
+            # must start empty or each event would be counted twice.
+            return
+        self.last_processed_file = last_file
+        self._load_world_from_disk()
+
+    def _load_world_from_disk(self):
+        """Load the persisted world files so a restart resumes where it stopped."""
+        files = {
+            "nodes": WORLD_DIR / "nodes.json",
+            "services": WORLD_DIR / "services.json",
+            "deployments": WORLD_DIR / "deployments.json",
+            "incidents": WORLD_DIR / "incidents.json",
+            "summary": WORLD_DIR / "runtime-summary.json",
+        }
+        for key, path in files.items():
+            if not path.exists():
+                continue
+            try:
+                loaded = self._read_json(path)
+            except (OSError, ValueError) as e:
+                logger.error("Failed to load %s state: %s", key, e)
+                continue
+            if isinstance(loaded, dict):
+                self.world_state[key] = loaded
+            else:
+                logger.error("Failed to load %s state: not a JSON object", key)
+
+    def _save_checkpoint(self):
+        """Persist the path of the last successfully processed event file."""
+        try:
+            self._write_json_atomic(
+                OBSERVER_STATE_FILE, {"last_processed_file": self.last_processed_file}
+            )
+        except OSError as e:
+            logger.error("Failed to save checkpoint: %s", e)
+
+    def _save_world(self):
+        """Refresh the summary and write every world file.
+
+        Returns:
+            True when all files were written, False if any write failed.
+        """
+        summary = self.world_state["summary"]
+        summary["last_update"] = datetime.now(timezone.utc).isoformat()
+        active_count = sum(
+            1 for incident in self.world_state["incidents"].values()
+            if incident.get("status") == "active"
+        )
+        summary["active_incidents_count"] = active_count
+        summary["status"] = "degraded" if active_count else "nominal"
+
+        files = {
+            "nodes.json": self.world_state["nodes"],
+            "services.json": self.world_state["services"],
+            "deployments.json": self.world_state["deployments"],
+            "incidents.json": self.world_state["incidents"],
+            "runtime-summary.json": summary,
+        }
+        saved = True
+        for filename, data in files.items():
+            try:
+                self._write_json_atomic(WORLD_DIR / filename, data, indent=2)
+            except (OSError, TypeError, ValueError) as e:
+                logger.error("Failed to save %s: %s", filename, e)
+                saved = False
+        return saved
+
+    def process_event(self, event):
+        """Apply one event to the node, service, deployment and incident state.
+
+        Events that are not JSON objects, or that lack a string ``type`` or
+        ``node``, are logged and skipped before any state is modified.
+        """
+        if not isinstance(event, dict):
+            logger.warning("Skipping malformed event: %r", event)
+            return
+        etype = event.get("type")
+        node = event.get("node")
+        if not (isinstance(etype, str) and isinstance(node, str) and node):
+            logger.warning("Skipping event without type or node: %r", event)
+            return
+        service = event.get("service")
+        timestamp = event.get("timestamp")
+        cid = event.get("correlation_id")
+        payload = event.get("payload") or {}
+
+        # 1. Update Node State
+        node_state = self.world_state["nodes"].setdefault(node, {
+            "status": "unknown",
+            "last_seen": None,
+            "roles": self.inventory["nodes"].get(node, {}).get("roles", []),
+        })
+        node_state["last_seen"] = timestamp
+        if etype == "node_online":
+            node_state["status"] = "online"
+        elif etype == "node_offline":
+            node_state["status"] = "offline"
+
+        # 2. Update Service State
+        if service and service != "all":
+            svc_key = f"{node}/{service}"
+            service_state = self.world_state["services"].setdefault(svc_key, {
+                "node": node,
+                "service": service,
+                "status": "unknown",
+                "last_check": None,
+                "incident_id": None,
+            })
+            service_state["last_check"] = timestamp
+            if etype == "service_recovered":
+                service_state["status"] = "healthy"
+                self._resolve_incident(svc_key, timestamp)
+            elif etype in ("service_unhealthy", "healthcheck_failed"):
+                service_state["status"] = "unhealthy"
+                self._handle_incident(svc_key, event)
+
+        # 3. Update Deployment State
+        if etype.startswith("deployment_") and cid:
+            deployment = self.world_state["deployments"].setdefault(cid, {
+                "node": node,
+                "service": service,
+                "status": "unknown",
+                "started_at": None,
+                "finished_at": None,
+                "events": [],
+            })
+            deployment["events"].append({
+                "type": etype,
+                "timestamp": timestamp,
+                "payload": payload,
+            })
+            if etype == "deployment_started":
+                deployment["status"] = "in_progress"
+                deployment["started_at"] = timestamp
+            elif etype == "deployment_completed":
+                deployment["status"] = "completed"
+                deployment["finished_at"] = timestamp
+            elif etype == "deployment_failed":
+                deployment["status"] = "failed"
+                deployment["finished_at"] = timestamp
+                # Deployment failure often creates an incident
+                self._handle_deployment_failure(event)
+
+    @staticmethod
+    def _event_epoch(timestamp):
+        """Return the ISO 8601 *timestamp* as epoch seconds, or None if invalid."""
+        if not isinstance(timestamp, str):
+            return None
+        if timestamp.endswith("Z"):
+            # datetime.fromisoformat() only accepts "Z" from Python 3.11 on.
+            timestamp = timestamp[:-1] + "+00:00"
+        try:
+            parsed = datetime.fromisoformat(timestamp)
+        except ValueError:
+            return None
+        if parsed.tzinfo is None:
+            # NOTE(review): naive timestamps are treated as UTC - confirm.
+            parsed = parsed.replace(tzinfo=timezone.utc)
+        return int(parsed.timestamp())
+
+    def _new_incident_id(self, event):
+        """Build a unique incident id from the event time, node and service.
+
+        The event timestamp (not the processing time) keeps ids stable across
+        rebuilds; the numeric suffix stops a later incident for the same
+        service from overwriting an earlier one that shares the base id.
+        """
+        epoch = self._event_epoch(event.get("timestamp"))
+        if epoch is None:
+            epoch = int(time.time())
+        base_id = f"inc-{epoch}-{event.get('node')}-{event.get('service')}"
+        incident_id = base_id
+        suffix = 1
+        while incident_id in self.world_state["incidents"]:
+            suffix += 1
+            incident_id = f"{base_id}-{suffix}"
+        return incident_id
+
+    def _handle_incident(self, svc_key, event):
+        """Open an incident for *svc_key* or count a repeat on the active one."""
+        # Correlation: collapse repeated failures for the same service on the same node
+        service_state = self.world_state["services"][svc_key]
+        incidents = self.world_state["incidents"]
+        timestamp = event.get("timestamp")
+
+        incident = incidents.get(service_state.get("incident_id"))
+        if incident is not None and incident.get("status") == "active":
+            incident["last_occurrence"] = timestamp
+            incident["occurrence_count"] = incident.get("occurrence_count", 1) + 1
+            incident.setdefault("events", []).append(timestamp)
+            return
+
+        # Create new incident
+        incident_id = self._new_incident_id(event)
+        incidents[incident_id] = {
+            "id": incident_id,
+            "node": event.get("node"),
+            "service": event.get("service"),
+            "status": "active",
+            "severity": event.get("severity"),
+            "started_at": timestamp,
+            "last_occurrence": timestamp,
+            "occurrence_count": 1,
+            "events": [timestamp],
+            "correlation_id": event.get("correlation_id"),
+        }
+        service_state["incident_id"] = incident_id
+
+    def _resolve_incident(self, svc_key, timestamp):
+        """Mark the active incident linked to *svc_key* as resolved, if any."""
+        service_state = self.world_state["services"][svc_key]
+        incident = self.world_state["incidents"].get(service_state.get("incident_id"))
+        if incident is not None and incident.get("status") == "active":
+            incident["status"] = "resolved"
+            incident["resolved_at"] = timestamp
+            service_state["incident_id"] = None
+
+    def _handle_deployment_failure(self, event):
+        """Open or update the service incident for a failed deployment.
+
+        Deployments without a tracked service entry (``service`` missing or
+        "all") only update the deployment record; indexing the missing entry
+        used to raise KeyError.
+        """
+        svc_key = f"{event.get('node')}/{event.get('service')}"
+        service_state = self.world_state["services"].get(svc_key)
+        if service_state is None:
+            return
+        self._handle_incident(svc_key, event)
+
+        # Link diagnostics if available in payload
+        incident = self.world_state["incidents"].get(service_state.get("incident_id"))
+        if incident is None:
+            return
+        payload = event.get("payload") or {}
+        # Independent checks: an event may carry both an error and a file.
+        if "diagnostics_file" in payload:
+            incident["diagnostics_ref"] = payload["diagnostics_file"]
+        if "error" in payload:
+            incident["last_error"] = payload["error"]
+
+    def run_once(self):
+        """Process every event file after the checkpoint, then persist state.
+
+        NOTE(review): files are ordered by their last three path components,
+        so an event that appears late but sorts before the checkpoint is
+        never processed.
+        """
+        # Find all event files
+        event_files = sorted(
+            glob.glob(str(EVENTS_DIR / "*" / "*" / "*.json")), key=self._event_key
+        )
+        if self.last_processed_file:
+            # Compare keys instead of list.index(): when the checkpointed file
+            # has been pruned, replaying every event onto the loaded world
+            # would count each of them twice.
+            checkpoint_key = self._event_key(self.last_processed_file)
+            new_files = [p for p in event_files if self._event_key(p) > checkpoint_key]
+        else:
+            new_files = event_files
+
+        if not new_files:
+            return
+
+        logger.info("Processing %d new events", len(new_files))
+        for file_path in new_files:
+            try:
+                self.process_event(self._read_json(file_path))
+                self.last_processed_file = file_path
+            except Exception as e:
+                logger.error("Error processing %s: %s", file_path, e)
+
+        # World first, checkpoint second: a crash in between replays events
+        # on restart instead of silently dropping them.
+        if self._save_world():
+            self._save_checkpoint()
+
+    def loop(self, interval=5):
+        """Call run_once() every *interval* seconds until the process exits."""
+        logger.info("Starting observer loop")
+        while True:
+            try:
+                self.run_once()
+            except Exception:
+                # Keep the daemon alive; the next iteration retries.
+                logger.exception("Observer iteration failed")
+            time.sleep(interval)
+
+
+if __name__ == "__main__":
+    import sys
+    observer = Observer()
+    if "--run-once" in sys.argv:
+        observer.run_once()
+    else:
+        observer.loop()
diff --git a/scripts/observer/test_setup.sh b/scripts/observer/test_setup.sh
new file mode 100644
index 0000000..61945f1
--- /dev/null
+++ b/scripts/observer/test_setup.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+mkdir -p /tmp/homelab/events/2026-05-12/saturn
+mkdir -p /tmp/homelab/state
+mkdir -p /tmp/homelab/logs
+mkdir -p /tmp/homelab/world
+
+cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/120000_node_online_1.json
+{
+  "timestamp": "2026-05-12T12:00:00Z",
+  "node": "saturn",
+  "type": "node_online",
+  "severity": "info",
+  "source": "system",
+  "service": "all",
+  "correlation_id": "init",
+  "payload": {}
+}
+EOF
+
+cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/120500_service_unhealthy_1.json
+{
+  "timestamp": "2026-05-12T12:05:00Z",
+  "node": "saturn",
+  "type": "service_unhealthy",
+  "severity": "error",
+  "source": "healthcheck",
+  "service": "mosquitto",
+  "correlation_id": "hc-1",
+  "payload": {"error": "connection refused"}
+}
+EOF
+
+cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/120600_service_unhealthy_2.json
+{
+  "timestamp": "2026-05-12T12:06:00Z",
+  "node": "saturn",
+  "type": "service_unhealthy",
+  "severity": "error",
+  "source": "healthcheck",
+  "service": "mosquitto",
+  "correlation_id": "hc-2",
+  "payload": {"error": "connection refused"}
+}
+EOF
+
+cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/121000_service_recovered_1.json
+{
+  "timestamp": "2026-05-12T12:10:00Z",
+  "node": "saturn",
+  "type": "service_recovered",
+  "severity": "info",
+  "source": "healthcheck",
+  "service": "mosquitto",
+  "correlation_id": "hc-3",
+  "payload": {}
+}
+EOF
+
+cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/121500_deployment_started_1.json
+{
+  "timestamp": "2026-05-12T12:15:00Z",
+  "node": "saturn",
+  "type": "deployment_started",
+  "severity": "info",
+  "source": "deploy_agent",
+  "service": "mosquitto",
+  "correlation_id": "deploy-1",
+  "payload": {"version": "2.0.18"}
+}
+EOF
+
+cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/121600_deployment_failed_1.json
+{
+  "timestamp": "2026-05-12T12:16:00Z",
+  "node": "saturn",
+  "type": "deployment_failed",
+  "severity": "error",
+  "source": "deploy_agent",
+  "service": "mosquitto",
+  "correlation_id": "deploy-1",
+  "payload": {"error": "container crash", "diagnostics_file": "/opt/homelab/logs/diagnostics-deploy-1.log"}
+}
+EOF