import os import json import time import logging import yaml from pathlib import Path # Constants and Paths RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") WORLD_DIR = Path(RUNTIME_PATH) / "world" ACTIONS_DIR = Path(RUNTIME_PATH) / "actions" REPO_ROOT = Path(os.getenv("REPO_ROOT", "/repo")) # Logging setup logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("supervisor") class Supervisor: def __init__(self): self.desired_state = {"services": {}} self.actual_state = {"services": {}, "nodes": {}, "incidents": {}} self._ensure_dirs() def _ensure_dirs(self): ACTIONS_DIR.mkdir(parents=True, exist_ok=True) (ACTIONS_DIR / "pending").mkdir(parents=True, exist_ok=True) def _load_desired_state(self): services = {} hosts_dir = REPO_ROOT / "hosts" if not hosts_dir.exists(): logger.warning(f"Hosts directory {hosts_dir} does not exist") return for host_dir in hosts_dir.iterdir(): if host_dir.is_dir(): svc_file = host_dir / "services.yaml" if svc_file.exists(): try: with open(svc_file, "r") as f: data = yaml.safe_load(f) host_name = data.get("host") for svc_name, svc_info in data.get("services", {}).items(): svc_key = f"{host_name}/{svc_name}" services[svc_key] = { "node": host_name, "service": svc_name, "desired": "running" } except Exception as e: logger.error(f"Failed to load {svc_file}: {e}") self.desired_state["services"] = services def _load_actual_state(self): files = { "services": WORLD_DIR / "services.json", "nodes": WORLD_DIR / "nodes.json", "incidents": WORLD_DIR / "incidents.json" } for key, path in files.items(): if path.exists(): try: with open(path, "r") as f: self.actual_state[key] = json.load(f) except Exception as e: logger.error(f"Failed to load {key} actual state: {e}") def reconcile(self): # Update heartbeat heartbeat_file = WORLD_DIR.parent / "state" / "supervisor.heartbeat" try: heartbeat_file.touch() except Exception as e: logger.error(f"Failed to touch heartbeat file: {e}") self._load_desired_state() self._load_actual_state() drifts = [] # 1. Check for missing or unhealthy services for svc_key, desired_info in self.desired_state["services"].items(): actual_info = self.actual_state["services"].get(svc_key) if not actual_info: drifts.append({ "type": "missing_service", "svc_key": svc_key, "node": desired_info["node"], "service": desired_info["service"] }) elif actual_info.get("status") != "healthy": drifts.append({ "type": "unhealthy_service", "svc_key": svc_key, "node": desired_info["node"], "service": desired_info["service"], "status": actual_info.get("status") }) # 2. Generate recommendations for drift in drifts: self._generate_recommendation(drift) def _generate_recommendation(self, drift): action_id = f"reconcile-{int(time.time())}-{drift['node']}-{drift['service']}" action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" if action_path.exists(): return # Already recommended action = { "action_id": action_id, "timestamp": time.time(), "type": "redeploy", "node": drift["node"], "service": drift["service"], "risk_level": "guarded", "confidence": 0.9, "description": f"Redeploy {drift['service']} on {drift['node']} due to {drift['type']}", "status": "pending", "payload": { "reason": drift["type"], "svc_key": drift["svc_key"] } } try: with open(action_path, "w") as f: json.dump(action, f, indent=2) logger.info(f"Generated recommendation: {action_id}") except Exception as e: logger.error(f"Failed to save recommendation {action_id}: {e}") def loop(self, interval=30): logger.info("Starting supervisor loop") while True: self.reconcile() time.sleep(interval) if __name__ == "__main__": supervisor = Supervisor() supervisor.loop()