import os
import json
import time
import logging
import yaml
from pathlib import Path

# Constants and Paths
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
WORLD_DIR = Path(RUNTIME_PATH) / "world"
ACTIONS_DIR = Path(RUNTIME_PATH) / "actions"
REPO_ROOT = Path(os.getenv("REPO_ROOT", "/repo"))

# Node alias map: maps alternative node names (as they appear in events/world state)
# to canonical topology node names (as they appear in hosts/*/services.yaml and topology.yaml).
# Override at runtime via NODE_ALIAS_MAP env var as a JSON string, e.g.:
# NODE_ALIAS_MAP='{"node-2": "chelsty", "node-1": "piha"}'
# The value must be a JSON object. Anything else (invalid JSON, a list, a
# number, ...) is ignored with a warning, so that later `.get` lookups on the
# map cannot fail at runtime.
_NODE_ALIAS_ENV = os.getenv("NODE_ALIAS_MAP", "{}")
try:
    NODE_ALIAS_MAP = json.loads(_NODE_ALIAS_ENV)
except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
    # logging is not configured yet at this point; the warning still reaches
    # stderr through logging's last-resort handler.
    logging.getLogger("supervisor").warning(
        f"Ignoring NODE_ALIAS_MAP: not valid JSON ({exc})"
    )
    NODE_ALIAS_MAP = {}
if not isinstance(NODE_ALIAS_MAP, dict):
    logging.getLogger("supervisor").warning(
        f"Ignoring NODE_ALIAS_MAP: expected a JSON object, "
        f"got {type(NODE_ALIAS_MAP).__name__}"
    )
    NODE_ALIAS_MAP = {}

# Event trigger types that should result in a lightweight container_restart
# rather than a full redeploy. The container is present but not running,
# or a dependency (MQTT) is unreachable — a restart is the right first step.
CONTAINER_RESTART_TRIGGERS = {"containers_not_running", "mqtt_unreachable"}

# Nodes where automatic disk_cleanup actions must NOT be generated.
# On chelsty nodes disk fullness is overwhelmingly caused by Frigate recordings
# or the HA database — Docker cleanup will not help and the operator must
# decide explicitly (e.g. adjust Frigate retain policy or purge HA recorder).
NO_DISK_CLEANUP_NODES = {"chelsty-infra", "chelsty-ha"}

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("supervisor")


class Supervisor:
    """
    Compare desired services (REPO_ROOT/hosts/*/services.yaml) with the
    observed world state (WORLD_DIR/*.json) and queue remediation actions
    as JSON files under ACTIONS_DIR/pending for operator review.
    """

    def __init__(self):
        self.desired_state = {"services": {}}
        self.actual_state = {"services": {}, "nodes": {}, "incidents": {}}
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create the actions directory and its pending/ queue if missing."""
        ACTIONS_DIR.mkdir(parents=True, exist_ok=True)
        (ACTIONS_DIR / "pending").mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Node name resolution
    # ------------------------------------------------------------------

    def _resolve_node(self, name):
        """Resolve an event/world-state node name to its canonical topology name."""
        return NODE_ALIAS_MAP.get(name, name)

    # ------------------------------------------------------------------
    # Container name lookup
    # ------------------------------------------------------------------

    def _get_container_name(self, service):
        """
        Determine the Docker container name for a service.

        Parses ``container_name`` from REPO_ROOT/services/<service>/docker-compose.yml.
        The compose service whose key equals *service* is checked first, so
        that in a multi-service compose file the named service's container
        wins over one that merely appears earlier. Otherwise the first block
        declaring a ``container_name`` is used. Falls back to the service
        name if nothing is found or the file cannot be parsed.
        """
        compose_path = REPO_ROOT / "services" / service / "docker-compose.yml"
        if compose_path.exists():
            try:
                with open(compose_path, "r") as f:
                    # An empty YAML document loads as None.
                    compose = yaml.safe_load(f) or {}
                svc_blocks = compose.get("services") or {}
                candidates = [svc_blocks.get(service), *svc_blocks.values()]
                for svc_block in candidates:
                    # Skip missing/empty blocks (e.g. `svc:` with no body).
                    if not isinstance(svc_block, dict):
                        continue
                    cname = svc_block.get("container_name")
                    if cname:
                        return cname
            except Exception as e:
                logger.warning(f"Could not parse docker-compose for {service}: {e}")
        # Convention: container name matches service name
        return service

    # ------------------------------------------------------------------
    # State loading
    # ------------------------------------------------------------------

    def _load_desired_state(self):
        """
        Rebuild desired_state["services"] from REPO_ROOT/hosts/*/services.yaml.

        Each file is expected to provide a ``host`` key and a ``services``
        mapping. Every listed service, unless marked ``monitor: false``, is
        recorded under the key "<host>/<service>" with desired status
        "running". Files that fail to parse or lack ``host`` are logged and
        skipped. If the hosts directory is missing, the previously loaded
        desired state is left untouched.
        """
        services = {}
        hosts_dir = REPO_ROOT / "hosts"
        if not hosts_dir.exists():
            logger.warning(f"Hosts directory {hosts_dir} does not exist")
            return

        for host_dir in hosts_dir.iterdir():
            svc_file = host_dir / "services.yaml"
            if not host_dir.is_dir() or not svc_file.exists():
                continue
            try:
                with open(svc_file, "r") as f:
                    # An empty YAML file loads as None.
                    data = yaml.safe_load(f) or {}
                host_name = data.get("host")
                if not host_name:
                    # Without a host every key would become "None/<svc>" and
                    # could never match actual state.
                    logger.warning(f"Skipping {svc_file}: no 'host' key")
                    continue
                for svc_name, svc_info in (data.get("services") or {}).items():
                    svc_info = svc_info or {}
                    # monitor: false — service is documented as desired but
                    # intentionally excluded from supervisor action generation.
                    # Use this when a service is not yet bootstrapped on an
                    # offline/LTE node so the queue stays clean until it is.
                    if svc_info.get("monitor") is False:
                        logger.debug(
                            f"Skipping {host_name}/{svc_name}: monitor=false"
                        )
                        continue
                    svc_key = f"{host_name}/{svc_name}"
                    services[svc_key] = {
                        "node": host_name,
                        "service": svc_name,
                        "desired": "running"
                    }
            except Exception as e:
                logger.error(f"Failed to load {svc_file}: {e}")

        self.desired_state["services"] = services

    def _load_actual_state(self):
        """
        Load world state (services, nodes, incidents) from WORLD_DIR JSON files.

        Missing or unreadable files, and files whose top level is not a JSON
        object, are treated as empty. Node names in all three sections are
        passed through the alias map, so they compare equal to the canonical
        names used by desired state and by NO_DISK_CLEANUP_NODES.
        """
        files = {
            "services": WORLD_DIR / "services.json",
            "nodes": WORLD_DIR / "nodes.json",
            "incidents": WORLD_DIR / "incidents.json"
        }

        raw = {}
        for key, path in files.items():
            raw[key] = {}
            if not path.exists():
                continue
            try:
                with open(path, "r") as f:
                    data = json.load(f)
            except Exception as e:
                logger.error(f"Failed to load {key} actual state: {e}")
                continue
            if isinstance(data, dict):
                raw[key] = data
            else:
                logger.error(
                    f"Failed to load {key} actual state: expected a JSON "
                    f"object, got {type(data).__name__}"
                )

        # Normalize node names in services using alias map so that
        # event-sourced names (e.g. "node-2") resolve to canonical
        # topology names (e.g. "chelsty") before comparison with desired state.
        normalized_services = {}
        for svc_key, svc_info in raw["services"].items():
            svc_info = dict(svc_info)
            key_node, sep, _ = svc_key.partition("/")
            # Entries without an explicit "node" fall back to the node encoded
            # in their key; otherwise they would be re-keyed as "/<service>"
            # and never match desired state.
            raw_node = svc_info.get("node") or (key_node if sep else "")
            canonical_node = self._resolve_node(raw_node)
            if canonical_node != raw_node:
                logger.debug(f"Resolved node alias: {raw_node} → {canonical_node}")
            svc_info["node"] = canonical_node
            svc_name = svc_info.get("service") or svc_key.split("/", 1)[-1]
            normalized_services[f"{canonical_node}/{svc_name}"] = svc_info

        # Normalize node keys too. Without this, a node reported under an
        # alias would slip past the NO_DISK_CLEANUP_NODES check in reconcile().
        normalized_nodes = {}
        for node_name, node_info in raw["nodes"].items():
            canonical_node = self._resolve_node(node_name)
            if canonical_node != node_name:
                logger.debug(f"Resolved node alias: {node_name} → {canonical_node}")
            normalized_nodes[canonical_node] = node_info

        # Normalize node names in incidents as well
        normalized_incidents = {}
        for inc_id, inc in raw["incidents"].items():
            inc = dict(inc)
            inc["node"] = self._resolve_node(inc.get("node", ""))
            normalized_incidents[inc_id] = inc

        self.actual_state["services"] = normalized_services
        self.actual_state["nodes"] = normalized_nodes
        self.actual_state["incidents"] = normalized_incidents

    # ------------------------------------------------------------------
    # Incident helpers
    # ------------------------------------------------------------------

    def _get_incident_trigger(self, svc_key):
        """
        Return the trigger_type of the active incident for a service, or None.

        trigger_type is set by the observer when it creates an incident from
        a specific event type (e.g. 'containers_not_running', 'mqtt_unreachable').
        """
        svc_info = self.actual_state["services"].get(svc_key, {})
        incident_id = svc_info.get("incident_id")
        if not incident_id:
            return None
        incident = self.actual_state["incidents"].get(incident_id, {})
        if incident.get("status") == "active":
            return incident.get("trigger_type")
        return None

    # ------------------------------------------------------------------
    # Reconciliation loop
    # ------------------------------------------------------------------

    def reconcile(self):
        """
        Run one reconciliation pass.

        The pass touches the heartbeat file, reloads desired and actual state,
        and queues service and disk-cleanup actions. It then cancels pending
        service actions whose drift has cleared.
        """
        # Update heartbeat
        heartbeat_file = WORLD_DIR.parent / "state" / "supervisor.heartbeat"
        try:
            # Create the state directory on first run; otherwise touch()
            # fails with FileNotFoundError on every pass.
            heartbeat_file.parent.mkdir(parents=True, exist_ok=True)
            heartbeat_file.touch()
        except Exception as e:
            logger.error(f"Failed to touch heartbeat file: {e}")

        self._load_desired_state()
        self._load_actual_state()

        drifts = []

        # 1. Check for missing or unhealthy services
        for svc_key, desired_info in self.desired_state["services"].items():
            actual_info = self.actual_state["services"].get(svc_key)
            if not actual_info:
                drifts.append({
                    "type": "missing_service",
                    "svc_key": svc_key,
                    "node": desired_info["node"],
                    "service": desired_info["service"],
                    "trigger_type": None,
                })
            elif actual_info.get("status") != "healthy":
                trigger_type = self._get_incident_trigger(svc_key)
                drifts.append({
                    "type": "unhealthy_service",
                    "svc_key": svc_key,
                    "node": desired_info["node"],
                    "service": desired_info["service"],
                    "status": actual_info.get("status"),
                    "trigger_type": trigger_type,
                })

        # 2. Generate service-level recommendations
        for drift in drifts:
            self._generate_recommendation(drift)

        # 3. Generate node-level recommendations (disk pressure).
        # Node names here were alias-resolved by _load_actual_state, so the
        # exclusion list is matched even when nodes.json uses an alias.
        for node_name, node_info in self.actual_state["nodes"].items():
            if node_name in NO_DISK_CLEANUP_NODES:
                continue
            if isinstance(node_info, dict) and node_info.get("disk_pressure") == "high":
                self._generate_disk_cleanup_recommendation(node_name)

        # 4. Cancel pending actions whose drift has been resolved.
        # When a service becomes healthy again (because node-agent emits
        # service_healthy and the observer updates services.json), any
        # previously queued redeploy/container_restart action for that
        # service is no longer needed. Move it to "cancelled/" so the
        # operator can see it was auto-resolved rather than silently dropped.
        self._cancel_resolved_pending_actions()

    # ------------------------------------------------------------------
    # Recommendation generation
    # ------------------------------------------------------------------

    @staticmethod
    def _write_action_file(path, action):
        """
        Write *action* as indented JSON to *path* atomically.

        The data is first written to "<name>.tmp" beside *path*. That name is
        not matched by the "*.json" glob used to scan the pending queue. It
        is then moved into place with os.replace, so a reader never sees a
        half-written action. If anything fails, the temp file is removed and
        the exception is re-raised for the caller to log.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, "w") as f:
                json.dump(action, f, indent=2)
            os.replace(tmp_path, path)
        except Exception:
            try:
                tmp_path.unlink()
            except OSError:
                pass  # best effort: temp file may never have been created
            raise

    def _generate_recommendation(self, drift):
        """
        Queue a container_restart or redeploy action for one service drift.

        Drifts whose trigger_type is in CONTAINER_RESTART_TRIGGERS get a
        low-risk container_restart; all others get a guarded redeploy. No
        action is written if one with the same ID is already pending,
        approved or running.
        """
        node = drift["node"]
        service = drift["service"]
        trigger_type = drift.get("trigger_type")

        # Choose action type first so we can build the stable, deterministic ID.
        # Stable IDs mean reconcile is truly idempotent: the same drift always
        # produces the same filename, so we never create duplicates even across
        # restarts of the supervisor.
        if trigger_type in CONTAINER_RESTART_TRIGGERS:
            action_id = f"container-restart-{node}-{service}"
        else:
            action_id = f"redeploy-{node}-{service}"

        # Skip if an action for this ID is already live in any active state
        # (pending → approved → running). This prevents re-creation after
        # a human approves an action that hasn't executed yet.
        for state in ("pending", "approved", "running"):
            if (ACTIONS_DIR / state / f"{action_id}.json").exists():
                logger.debug(f"Skipping {action_id}: already in state '{state}'")
                return

        if trigger_type in CONTAINER_RESTART_TRIGGERS:
            # Lightweight remediation: the container exists but is not running
            # (containers_not_running) or its MQTT dependency is unreachable
            # (mqtt_unreachable). A docker restart is sufficient and low-risk.
            container_name = self._get_container_name(service)
            action = {
                "action_id": action_id,
                "timestamp": time.time(),
                "type": "container_restart",
                "node": node,
                "service": service,
                "container_name": container_name,
                "risk_level": "low",
                "confidence": 0.95,
                "description": (
                    f"Restart container '{container_name}' on {node} "
                    f"(service: {service}, reason: {trigger_type})"
                ),
                "status": "pending",
                "payload": {
                    "reason": trigger_type,
                    "svc_key": drift["svc_key"],
                },
            }
        else:
            # Full redeploy: container is running but service is broken,
            # or the cause is unknown / not a simple restart candidate.
            action = {
                "action_id": action_id,
                "timestamp": time.time(),
                "type": "redeploy",
                "node": node,
                "service": service,
                "risk_level": "guarded",
                "confidence": 0.9,
                "description": f"Redeploy {service} on {node} due to {drift['type']}",
                "status": "pending",
                "payload": {
                    "reason": drift["type"],
                    "svc_key": drift["svc_key"],
                },
            }

        action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
        try:
            self._write_action_file(action_path, action)
            logger.info(
                f"Generated recommendation: {action_id} "
                f"(type={action['type']}, risk={action['risk_level']})"
            )
        except Exception as e:
            logger.error(f"Failed to save recommendation {action_id}: {e}")

    def _generate_disk_cleanup_recommendation(self, node: str):
        """
        Generate a disk_cleanup action when node-agent reports critical disk
        pressure (>85 %) on a node that supports automated Docker cleanup.

        This is an OPERATOR-APPROVED action (risk=guarded): it runs
        `docker image prune -a -f` and `docker volume prune -f`, which are
        more aggressive than the safe auto-cleanup the node-agent runs itself.

        Nodes in NO_DISK_CLEANUP_NODES never reach this method (filtered in
        reconcile) because their disk fullness is caused by application data
        (Frigate, HA) that the operator must handle manually.
        """
        action_id = f"disk-cleanup-{node}"

        for state in ("pending", "approved", "running"):
            if (ACTIONS_DIR / state / f"{action_id}.json").exists():
                logger.debug(f"Skipping {action_id}: already in state '{state}'")
                return

        action = {
            "action_id": action_id,
            "timestamp": time.time(),
            "type": "disk_cleanup",
            "node": node,
            "service": "",
            "risk_level": "guarded",
            "confidence": 0.85,
            "description": (
                f"Aggressive disk cleanup on {node}: docker image prune -a "
                f"and docker volume prune (requires operator approval)"
            ),
            "status": "pending",
            "payload": {
                "reason": "disk_pressure",
                "commands": [
                    "docker image prune -a -f",
                    "docker volume prune -f",
                ],
            },
        }

        action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
        try:
            self._write_action_file(action_path, action)
            logger.info(
                f"Generated disk cleanup recommendation: {action_id} "
                f"(node={node}, risk=guarded)"
            )
        except Exception as e:
            logger.error(f"Failed to save disk cleanup recommendation {action_id}: {e}")

    def _cancel_resolved_pending_actions(self):
        """
        Auto-cancel pending service actions (redeploy / container_restart)
        whose target service is now healthy in the actual state.

        This keeps the action queue clean: when node-agent starts reporting
        service_healthy for a container that previously had no world-state
        entry, the pending 'missing_service' redeploy action that was generated
        before the first health confirmation should be removed automatically
        rather than sitting in the queue until an operator manually rejects it.

        Only pending actions are considered — approved/running actions have
        already been committed to by the operator and must not be cancelled
        automatically.
        """
        cancelled_dir = ACTIONS_DIR / "cancelled"
        cancelled_dir.mkdir(parents=True, exist_ok=True)
        pending_dir = ACTIONS_DIR / "pending"

        if not pending_dir.exists():
            return

        for action_file in list(pending_dir.glob("*.json")):
            try:
                with open(action_file, "r") as f:
                    action = json.load(f)
            except Exception as e:
                logger.error(f"Failed to read action {action_file.name}: {e}")
                continue

            action_type = action.get("type")
            node = action.get("node")
            service = action.get("service")

            # Only auto-cancel service-level actions (not disk_cleanup)
            if action_type not in ("redeploy", "container_restart"):
                continue
            if not node or not service:
                continue

            svc_key = f"{node}/{service}"
            actual_info = self.actual_state["services"].get(svc_key)
            if actual_info and actual_info.get("status") == "healthy":
                # Drift resolved — move to cancelled/
                dest = cancelled_dir / action_file.name
                try:
                    action["status"] = "cancelled"
                    action["cancelled_reason"] = "drift_resolved_auto"
                    action["cancelled_at"] = time.time()
                    self._write_action_file(dest, action)
                    action_file.unlink()
                    logger.info(
                        f"Auto-cancelled {action_file.name}: "
                        f"{svc_key} is now healthy"
                    )
                except Exception as e:
                    logger.error(f"Failed to cancel action {action_file.name}: {e}")

    def loop(self, interval=30):
        """
        Run reconcile() forever, sleeping *interval* seconds between passes.

        An exception raised by one pass is logged with its traceback, and the
        loop carries on with the next pass. A single malformed state file
        therefore cannot kill the daemon. KeyboardInterrupt and SystemExit
        still propagate.
        """
        logger.info("Starting supervisor loop")
        while True:
            try:
                self.reconcile()
            except Exception:
                logger.exception("Reconcile pass failed; retrying next interval")
            time.sleep(interval)


if __name__ == "__main__":
    supervisor = Supervisor()
    supervisor.loop()