"""Observer: fold runtime event files into a materialized "world" state.

Event files (JSON) are read from ``$RUNTIME_PATH/events`` in sorted path order
and applied to an in-memory model of nodes, services, deployments and
incidents. The model is written as JSON files into ``$RUNTIME_PATH/world`` and
progress is checkpointed in ``$RUNTIME_PATH/state/observer_checkpoint.json``.

Run with ``--run-once`` for a single pass; otherwise the observer polls forever.
"""

import bisect
import glob
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import yaml

# Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
EVENTS_DIR = Path(RUNTIME_PATH) / "events"
STATE_DIR = Path(RUNTIME_PATH) / "state"
LOGS_DIR = Path(RUNTIME_PATH) / "logs"
WORLD_DIR = Path(RUNTIME_PATH) / "world"
OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"

# Repository root: three directory levels above this file.
REPO_ROOT = Path(__file__).parent.parent.parent
INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("observer")

# Event types that mark a service unhealthy and open/extend an incident.
# NOTE(review): the trigger_type comment below mentions containers_not_running /
# mqtt_unreachable, but only these types (plus deployment_failed) reach
# _handle_incident in this file — confirm whether those types belong here.
_UNHEALTHY_EVENT_TYPES = ("service_unhealthy", "healthcheck_failed")


def _write_json_atomic(path, data, indent=None):
    """Serialize *data* to *path* through a sibling ``.tmp`` file and ``os.replace``.

    Readers and a restarted observer never see a half-written file: the old
    content stays in place until the fully written replacement is renamed over it.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=indent)
    os.replace(tmp_path, path)


class Observer:
    """Materialize runtime events into world-state JSON files.

    ``self.world_state`` holds ``nodes`` (keyed by node name), ``services``
    (keyed ``"<node>/<service>"``), ``deployments`` (keyed by correlation id),
    ``incidents`` (keyed by incident id) and a ``summary``.
    """

    def __init__(self):
        # Path string (as returned by glob) of the newest event file applied.
        self.last_processed_file = None
        self.world_state = {
            "nodes": {},
            "services": {},
            "deployments": {},
            "incidents": {},
            "summary": {
                "last_update": datetime.now(timezone.utc).isoformat(),
                "status": "initializing",
                "active_incidents_count": 0,
            },
        }
        self.inventory = self._load_inventory()
        self._ensure_dirs()
        self._load_checkpoint()

    def _ensure_dirs(self):
        """Create the runtime directories the observer reads from and writes to."""
        for directory in (WORLD_DIR, STATE_DIR, EVENTS_DIR, LOGS_DIR):
            directory.mkdir(parents=True, exist_ok=True)

    def _load_inventory(self):
        """Load node roles/connectivity and per-host service assignments.

        Reads ``inventory/topology.yaml`` and each ``hosts/<dir>/services.yaml``
        under the repository root. Missing files are skipped; a malformed file
        is logged and skipped without discarding what was already loaded.

        Returns:
            ``{"nodes": {name: {"roles": ..., "connectivity": ...}},
               "services": {host: {service: {"role": ..., "exposure": ...}}}}``
        """
        inventory = {"nodes": {}, "services": {}}

        try:
            if INVENTORY_TOPOLOGY.exists():
                with open(INVENTORY_TOPOLOGY, "r") as f:
                    # An empty YAML document loads as None.
                    topo = yaml.safe_load(f) or {}
                for node_name, node_info in (topo.get("nodes") or {}).items():
                    node_info = node_info or {}
                    inventory["nodes"][node_name] = {
                        "roles": node_info.get("roles", []),
                        "connectivity": node_info.get("connectivity", {}),
                    }
        except Exception as e:
            logger.error("Failed to load inventory: %s", e)

        # Load service assignments from hosts files
        hosts_dir = REPO_ROOT / "hosts"
        if not hosts_dir.is_dir():
            return inventory
        try:
            host_dirs = sorted(p for p in hosts_dir.iterdir() if p.is_dir())
        except OSError as e:
            logger.error("Failed to load inventory: %s", e)
            return inventory

        for host_dir in host_dirs:
            svc_file = host_dir / "services.yaml"
            if not svc_file.exists():
                continue
            # Per-file guard: one bad file must not drop the remaining hosts.
            try:
                with open(svc_file, "r") as f:
                    svc_data = yaml.safe_load(f) or {}
                host_name = svc_data.get("host")
                for svc_name, svc_info in (svc_data.get("services") or {}).items():
                    svc_info = svc_info or {}
                    inventory["services"].setdefault(host_name, {})[svc_name] = {
                        "role": svc_info.get("role"),
                        "exposure": svc_info.get("exposure"),
                    }
            except Exception as e:
                logger.error("Failed to load inventory from %s: %s", svc_file, e)
        return inventory

    def _load_checkpoint(self):
        """Restore the last processed event path and the saved world state.

        World files are reloaded only when a checkpoint exists; without one the
        world starts empty and is rebuilt from every event file.
        """
        if not OBSERVER_STATE_FILE.exists():
            return
        try:
            with open(OBSERVER_STATE_FILE, "r") as f:
                checkpoint = json.load(f)
            self.last_processed_file = checkpoint.get("last_processed_file")
            self._load_world_from_disk()
        except Exception as e:
            logger.error("Failed to load checkpoint: %s", e)

    def _load_world_from_disk(self):
        """Replace sections of ``world_state`` with previously saved JSON files.

        Each section loads independently; an unreadable file, or one that is not
        a JSON object, is logged and the in-memory default for it is kept.
        """
        files = {
            "nodes": WORLD_DIR / "nodes.json",
            "services": WORLD_DIR / "services.json",
            "deployments": WORLD_DIR / "deployments.json",
            "incidents": WORLD_DIR / "incidents.json",
            "summary": WORLD_DIR / "runtime-summary.json",
        }
        for key, path in files.items():
            if not path.exists():
                continue
            try:
                with open(path, "r") as f:
                    data = json.load(f)
            except Exception as e:
                logger.error("Failed to load %s state: %s", key, e)
                continue
            if isinstance(data, dict):
                self.world_state[key] = data
            else:
                logger.error("Failed to load %s state: expected a JSON object", key)

    def _save_checkpoint(self):
        """Atomically persist the path of the last successfully applied event file."""
        try:
            _write_json_atomic(
                OBSERVER_STATE_FILE, {"last_processed_file": self.last_processed_file}
            )
        except Exception as e:
            logger.error("Failed to save checkpoint: %s", e)

    def _save_world(self):
        """Refresh the summary and atomically write every world-state file.

        The summary status is ``"degraded"`` when any incident is active and
        ``"nominal"`` otherwise. Each file is written independently; a failure
        is logged and does not stop the others.
        """
        summary = self.world_state["summary"]
        summary["last_update"] = datetime.now(timezone.utc).isoformat()
        active_count = sum(
            1
            for incident in self.world_state["incidents"].values()
            if incident.get("status") == "active"
        )
        summary["active_incidents_count"] = active_count
        summary["status"] = "degraded" if active_count else "nominal"

        files = {
            "nodes.json": self.world_state["nodes"],
            "services.json": self.world_state["services"],
            "deployments.json": self.world_state["deployments"],
            "incidents.json": self.world_state["incidents"],
            "recommendations.json": [],  # Placeholder to satisfy requirements
            "runtime-summary.json": summary,
        }
        for filename, data in files.items():
            try:
                _write_json_atomic(WORLD_DIR / filename, data, indent=2)
            except Exception as e:
                logger.error("Failed to save %s: %s", filename, e)

    def process_event(self, event):
        """Apply one decoded event dict to the world state.

        Recognized ``type`` values:

        - ``node_online`` / ``node_offline``: set the node status.
        - ``service_recovered``: mark the service healthy, resolve its incident.
        - ``service_unhealthy`` / ``healthcheck_failed``: mark the service
          unhealthy and open or extend its incident.
        - ``deployment_*`` with a ``correlation_id``: append to that deployment's
          history; ``deployment_started`` / ``deployment_completed`` /
          ``deployment_failed`` also set its status, and a failure opens an incident.

        Every event refreshes the node's ``last_seen``; events naming a concrete
        service (present and not ``"all"``) also refresh that service's ``last_check``.
        """
        # A missing type must not crash the str methods below.
        etype = event.get("type") or ""
        node = event.get("node")
        service = event.get("service")
        timestamp = event.get("timestamp")
        cid = event.get("correlation_id")
        payload = event.get("payload") or {}

        # 1. Node state
        nodes = self.world_state["nodes"]
        if node not in nodes:
            nodes[node] = {
                "status": "unknown",
                "last_seen": None,
                "roles": self.inventory["nodes"].get(node, {}).get("roles", []),
            }
        node_state = nodes[node]
        node_state["last_seen"] = timestamp
        if etype == "node_online":
            node_state["status"] = "online"
        elif etype == "node_offline":
            node_state["status"] = "offline"

        # 2. Service state
        if service and service != "all":
            svc_key = f"{node}/{service}"
            services = self.world_state["services"]
            if svc_key not in services:
                services[svc_key] = {
                    "node": node,
                    "service": service,
                    "status": "unknown",
                    "last_check": None,
                    "incident_id": None,
                }
            svc_state = services[svc_key]
            svc_state["last_check"] = timestamp
            if etype == "service_recovered":
                svc_state["status"] = "healthy"
                self._resolve_incident(svc_key, timestamp)
            elif etype in _UNHEALTHY_EVENT_TYPES:
                svc_state["status"] = "unhealthy"
                self._handle_incident(svc_key, event)

        # 3. Deployment state
        if etype.startswith("deployment_") and cid:
            deployments = self.world_state["deployments"]
            if cid not in deployments:
                deployments[cid] = {
                    "node": node,
                    "service": service,
                    "status": "unknown",
                    "started_at": None,
                    "finished_at": None,
                    "events": [],
                }
            deployment = deployments[cid]
            deployment.setdefault("events", []).append(
                {"type": etype, "timestamp": timestamp, "payload": payload}
            )
            if etype == "deployment_started":
                deployment["status"] = "in_progress"
                deployment["started_at"] = timestamp
            elif etype == "deployment_completed":
                deployment["status"] = "completed"
                deployment["finished_at"] = timestamp
            elif etype == "deployment_failed":
                deployment["status"] = "failed"
                deployment["finished_at"] = timestamp
                # Deployment failure often creates an incident
                self._handle_deployment_failure(event)

    def _find_active_incident(self, svc_key, event):
        """Return the id of the open incident for this node/service, or None.

        Tracked services are correlated through their ``incident_id`` link.
        Keys without a service entry (e.g. service missing or ``"all"``) fall
        back to scanning for an active incident with the same node and service.
        """
        incidents = self.world_state["incidents"]
        service_state = self.world_state["services"].get(svc_key)
        if service_state is not None:
            incident_id = service_state.get("incident_id")
            incident = incidents.get(incident_id)
            if incident is not None and incident.get("status") == "active":
                return incident_id
            return None

        node, service = event.get("node"), event.get("service")
        for incident_id, incident in incidents.items():
            if (
                incident.get("status") == "active"
                and incident.get("node") == node
                and incident.get("service") == service
            ):
                return incident_id
        return None

    def _handle_incident(self, svc_key, event):
        """Record a failure, collapsing repeats for the same node/service.

        An existing active incident is extended (occurrence count and timestamp
        history); otherwise a new active incident is created and, when the
        service is tracked, linked from its service entry.

        Returns:
            The id of the incident that was extended or created.
        """
        timestamp = event.get("timestamp")
        incidents = self.world_state["incidents"]

        active_id = self._find_active_incident(svc_key, event)
        if active_id is not None:
            incident = incidents[active_id]
            incident["last_occurrence"] = timestamp
            incident["occurrence_count"] = incident.get("occurrence_count", 1) + 1
            incident.setdefault("events", []).append(timestamp)
            return active_id

        # Wall-clock seconds alone collide when several incidents for the same
        # service open within one second (e.g. replaying a backlog), so add a
        # numeric suffix rather than overwrite an existing incident.
        base_id = f"inc-{int(time.time())}-{event.get('node')}-{event.get('service')}"
        incident_id = base_id
        suffix = 2
        while incident_id in incidents:
            incident_id = f"{base_id}-{suffix}"
            suffix += 1

        incidents[incident_id] = {
            "id": incident_id,
            "node": event.get("node"),
            "service": event.get("service"),
            "status": "active",
            "severity": event.get("severity"),
            # trigger_type records the event type that opened this incident so
            # the supervisor can choose a remediation action (e.g.
            # container_restart for containers_not_running / mqtt_unreachable
            # vs. a full redeploy for other causes).
            "trigger_type": event.get("type"),
            "started_at": timestamp,
            "last_occurrence": timestamp,
            "occurrence_count": 1,
            "events": [timestamp],
            "correlation_id": event.get("correlation_id"),
        }
        service_state = self.world_state["services"].get(svc_key)
        if service_state is not None:
            service_state["incident_id"] = incident_id
        return incident_id

    def _resolve_incident(self, svc_key, timestamp):
        """Resolve the service's active incident (if any) and unlink it.

        Expects ``svc_key`` to already exist in ``world_state["services"]``.
        """
        service_state = self.world_state["services"][svc_key]
        incident = self.world_state["incidents"].get(service_state.get("incident_id"))
        if incident is not None and incident.get("status") == "active":
            incident["status"] = "resolved"
            incident["resolved_at"] = timestamp
        service_state["incident_id"] = None

    def _handle_deployment_failure(self, event):
        """Open/extend an incident for a failed deployment and attach diagnostics.

        Works even when the event names no concrete service (missing or
        ``"all"``), which has no service entry to link the incident from.
        ``payload["diagnostics_file"]`` is stored as ``diagnostics_ref``;
        otherwise ``payload["error"]`` is stored as ``last_error``.
        """
        svc_key = f"{event.get('node')}/{event.get('service')}"
        incident_id = self._handle_incident(svc_key, event)
        incident = self.world_state["incidents"].get(incident_id)
        if incident is None:
            return

        payload = event.get("payload") or {}
        if not isinstance(payload, dict):
            return
        if "diagnostics_file" in payload:
            incident["diagnostics_ref"] = payload["diagnostics_file"]
        elif "error" in payload:
            incident["last_error"] = payload["error"]

    def _select_new_files(self, event_files):
        """Return the part of sorted *event_files* that comes after the checkpoint.

        Uses sorted position instead of an exact match, so a deleted or rotated
        checkpoint file does not cause every remaining event to be re-applied on
        top of the world state reloaded from disk.
        """
        last = self.last_processed_file
        if not isinstance(last, str) or not last.startswith(str(EVENTS_DIR) + os.sep):
            # No checkpoint, or one recorded under a different events directory:
            # positions are not comparable, so process everything.
            return event_files
        return event_files[bisect.bisect_right(event_files, last):]

    def run_once(self):
        """Apply every event file newer than the checkpoint, then persist state.

        Files are applied in sorted path order. A file that fails to load or
        apply is logged; the checkpoint only advances past successfully applied
        files, so a failing newest file is retried on the next pass. World files
        are written before the checkpoint, so a crash between the two re-applies
        events (possibly double-counting them) rather than dropping them.
        """
        heartbeat_file = STATE_DIR / "observer.heartbeat"
        try:
            heartbeat_file.touch()
        except Exception as e:
            logger.error("Failed to touch heartbeat file: %s", e)

        event_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True))
        new_files = self._select_new_files(event_files)

        if not new_files:
            # Even with no new events, refresh the summary's freshness timestamp.
            self._save_world()
            return

        logger.info("Processing %d new events", len(new_files))
        for file_path in new_files:
            try:
                with open(file_path, "r") as f:
                    event = json.load(f)
                self.process_event(event)
                self.last_processed_file = file_path
            except Exception as e:
                logger.error("Error processing %s: %s", file_path, e)

        self._save_world()
        self._save_checkpoint()

    def loop(self, interval=5):
        """Run ``run_once`` forever, sleeping *interval* seconds between passes."""
        logger.info("Starting observer loop")
        while True:
            try:
                self.run_once()
            except Exception:
                # Keep the daemon alive; the next pass resumes from the checkpoint.
                logger.exception("Observer pass failed")
            time.sleep(interval)


if __name__ == "__main__":
    observer = Observer()
    if "--run-once" in sys.argv:
        observer.run_once()
    else:
        observer.loop()