#!/usr/bin/env python3
"""Homelab supervisor: detect drift and propose reconciliation actions.

One run of this script:

1. loads the desired state from ``hosts/<node>/services.yaml``,
2. loads the observed state from JSON files under ``WORLD_STATE_PATH``,
3. compares the two and collects drift records,
4. turns each drift record into a recommendation,
5. emits events (stdout + append-only ``EVENT_LOG``) and writes one action
   proposal per recommendation into ``ACTIONS_ROOT/pending``,
6. saves a checkpoint describing the run.

The script only recommends; it never mutates services itself.
"""
import glob
import json
import os
import sys
import time
import uuid
from pathlib import Path

import yaml

# Configuration
WORLD_STATE_PATH = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
ACTIONS_ROOT = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
INVENTORY_PATH = Path("hosts")
EVENT_LOG = Path("/tmp/agent-events.log")
CHECKPOINT_FILE = Path("/tmp/supervisor-checkpoint.json")

# Action Queue Layout
ACTION_DIRS = ["pending", "approved", "running", "completed", "failed", "rejected"]

# Reconcile event types
RECONCILE_REQUIRED = "reconcile_required"
RECONCILE_RECOMMENDED = "reconcile_recommended"
RECONCILE_BLOCKED = "reconcile_blocked"

# Runtime summary states
STATE_NOMINAL = "nominal"
STATE_DEGRADED = "degraded"
STATE_UNSTABLE = "unstable"
STATE_RECONCILING = "reconciling"

# Status values that count as "fine" for each kind of world-state record.
_HEALTHY_SERVICE_STATUSES = ("ok", "healthy", "up")
_ONLINE_NODE_STATUSES = ("online", "ok", "up")
_CLOSED_INCIDENT_STATUSES = ("resolved", "closed")

# World-state categories; each is a directory of ``<id>.json`` files.
_WORLD_CATEGORIES = ("services", "nodes", "deployments", "incidents")


def ensure_action_dirs():
    """Ensure action queue directories exist."""
    for d in ACTION_DIRS:
        (ACTIONS_ROOT / d).mkdir(parents=True, exist_ok=True)


def emit_action_proposal(recommendation):
    """Convert a recommendation to an action proposal and save it to pending/.

    Args:
        recommendation: dict with an ``"action"`` key (recommendation verb)
            and a ``"drift"`` key (the drift record that triggered it).

    The proposal is written to ``ACTIONS_ROOT/pending/<action_id>.json`` and an
    ``action_created`` event is emitted. Write failures are reported on stderr
    and otherwise ignored (best effort).
    """
    ensure_action_dirs()

    action_type_map = {
        "redeploy": "redeploy_service",
        "deploy": "redeploy_service",
        "diagnostics": "collect_diagnostics",
        "failover_review": "collect_diagnostics",
        "review": "collect_diagnostics",
        "delayed_deployment": "rerun_deployment_stage",
    }
    # Unknown recommendation verbs fall back to the harmless diagnostics action.
    action_type = action_type_map.get(recommendation["action"], "collect_diagnostics")

    risk_level_map = {
        "redeploy_service": "guarded",
        "rerun_healthcheck": "safe",
        "rerun_deployment_stage": "guarded",
        "collect_diagnostics": "safe",
    }
    # Action types without a declared risk level are treated as dangerous.
    risk_level = risk_level_map.get(action_type, "dangerous")

    # Dangerous always requires approval
    # Guarded defaults to approval
    approval_required = risk_level in ["dangerous", "guarded"]

    action_id = str(uuid.uuid4())
    drift = recommendation["drift"]
    action = {
        "action_id": action_id,
        "created_at": time.time(),
        "proposed_by": "supervisor",
        "correlation_id": str(uuid.uuid4()),  # In a real system, link to drift ID
        "node": drift.get("node"),
        "service": drift.get("service"),
        "action_type": action_type,
        "risk_level": risk_level,
        "confidence": 0.9,  # Default confidence
        "approval_required": approval_required,
        "autonomous_eligible": False,  # No autonomy yet
        "status": "pending",
        "payload": drift,
        "rollback_reference": None,
    }

    file_path = ACTIONS_ROOT / "pending" / f"{action_id}.json"
    try:
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(action, f, indent=2)

        emit_event(
            "action_created",
            f"Action proposed: {action_type} for {action.get('service') or action.get('node')}",
            {
                "action_id": action_id,
                "action_type": action_type,
                "node": action.get("node"),
                "service": action.get("service"),
            },
        )
    except (OSError, TypeError, ValueError) as e:
        # OSError: the file could not be written; TypeError/ValueError: the
        # payload was not JSON-serializable.
        print(f"Error emitting action proposal: {e}", file=sys.stderr)


def emit_event(event_type, message, details=None):
    """Emit reconciliation events using existing event system (append-only file).

    The event is printed to stdout as one JSON line and appended to
    ``EVENT_LOG``. A failure to write the log is reported on stderr only.

    Args:
        event_type: value stored under the event's ``"type"`` key.
        message: human-readable description.
        details: optional dict of extra data; defaults to an empty dict.
    """
    event = {
        "type": event_type,
        "message": message,
        "timestamp": time.time(),
        "details": details or {},
    }

    line = json.dumps(event)
    print(line)

    try:
        # Append-only semantics
        with open(EVENT_LOG, "a", encoding="utf-8") as f:
            f.write(line + "\n")
            f.flush()
    except OSError as e:
        print(f"Error writing to event log: {e}", file=sys.stderr)


def load_desired_state():
    """Load desired state from hosts/*/services.yaml.

    Returns:
        dict with ``"services"`` (list of service dicts, each carrying at
        least ``"name"`` and ``"node"``) and ``"nodes"`` (list of node names,
        taken from the directory name that holds each ``services.yaml``).

    Unreadable files and malformed service entries are reported on stderr and
    skipped; they never abort the load of the remaining entries.
    """
    desired = {"services": [], "nodes": []}

    if not INVENTORY_PATH.exists():
        return desired

    # Inventory model: hosts/{node_name}/services.yaml
    # Sorted so that the order of emitted events is stable between runs.
    for yaml_file in sorted(glob.glob(str(INVENTORY_PATH / "*" / "services.yaml"))):
        try:
            with open(yaml_file, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except (OSError, ValueError, yaml.YAMLError) as e:
            print(f"Error loading {yaml_file}: {e}", file=sys.stderr)
            continue

        # Empty documents and documents without a "services" key declare nothing.
        if not isinstance(data, dict) or "services" not in data:
            continue

        node_name = Path(yaml_file).parent.name

        services = data["services"]
        if services is None:
            # "services:" with no value: the host exists but runs nothing.
            services = []
        elif not isinstance(services, list):
            print(
                f"Error loading {yaml_file}: 'services' must be a list",
                file=sys.stderr,
            )
            services = []

        for svc in services:
            # Short form: a bare string is a service name.
            if isinstance(svc, str):
                svc = {"name": svc}
            # Drift detection indexes entries by name, so a nameless entry
            # cannot be reconciled; skip it rather than crash later.
            if not isinstance(svc, dict) or not svc.get("name"):
                print(
                    f"Error loading {yaml_file}: skipping malformed service entry {svc!r}",
                    file=sys.stderr,
                )
                continue
            svc["node"] = node_name
            desired["services"].append(svc)

        if node_name not in desired["nodes"]:
            desired["nodes"].append(node_name)

    return desired


def load_world_state():
    """Load current world state from /opt/homelab/world/ (filesystem-first).

    Returns:
        dict with the keys ``"services"``, ``"nodes"``, ``"deployments"`` and
        ``"incidents"``; each maps a file stem to the JSON object stored in
        ``WORLD_STATE_PATH/<category>/<stem>.json``.

    Files that cannot be read, are not valid JSON, or do not contain a JSON
    object are reported on stderr and skipped.
    """
    world = {category: {} for category in _WORLD_CATEGORIES}

    if not WORLD_STATE_PATH.exists():
        return world

    # Filesystem-first design: each category is a directory, each item is a JSON file
    for category in _WORLD_CATEGORIES:
        cat_path = WORLD_STATE_PATH / category
        if not cat_path.is_dir():
            continue
        for json_file in cat_path.glob("*.json"):
            try:
                with open(json_file, "r", encoding="utf-8") as f:
                    record = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers both invalid JSON and undecodable bytes.
                print(f"Error loading {json_file}: {e}", file=sys.stderr)
                continue
            # Every consumer calls .get() on records, so only objects are usable.
            if not isinstance(record, dict):
                print(
                    f"Error loading {json_file}: expected a JSON object",
                    file=sys.stderr,
                )
                continue
            world[category][json_file.stem] = record

    return world


def detect_drift(desired, world):
    """Compare desired infrastructure state with observed runtime state.

    Args:
        desired: result of ``load_desired_state()``.
        world: result of ``load_world_state()``.

    Returns:
        list of drift dicts, each with a ``"type"`` of ``missing_service``,
        ``unhealthy_service``, ``offline_node``, ``failed_deployment`` or
        ``unresolved_incident`` plus type-specific fields. Order is services,
        then nodes, then deployments, then incidents.
    """
    drifts = []

    # 1. Missing service & 2. Unhealthy service
    for d_svc in desired["services"]:
        name = d_svc["name"]
        a_svc = world["services"].get(name)
        if not a_svc:
            drifts.append({
                "type": "missing_service",
                "service": name,
                "node": d_svc["node"],
            })
        elif a_svc.get("status") not in _HEALTHY_SERVICE_STATUSES:
            drifts.append({
                "type": "unhealthy_service",
                "service": name,
                "status": a_svc.get("status"),
                # Prefer the node the service was observed on.
                "node": a_svc.get("node") or d_svc["node"],
            })

    # 3. Offline node
    for node_name in desired["nodes"]:
        a_node = world["nodes"].get(node_name)
        if not a_node or a_node.get("status") not in _ONLINE_NODE_STATUSES:
            drifts.append({
                "type": "offline_node",
                "node": node_name,
                "status": a_node.get("status") if a_node else "missing",
            })

    # 4. Failed deployment
    # Check for recent failures in world/deployments
    for d_id, d_data in world["deployments"].items():
        if d_data.get("status") == "failed":
            drifts.append({
                "type": "failed_deployment",
                "deployment_id": d_id,
                "service": d_data.get("service"),
            })

    # 5. Unresolved incidents
    for i_id, i_data in world["incidents"].items():
        if i_data.get("status") not in _CLOSED_INCIDENT_STATUSES:
            drifts.append({
                "type": "unresolved_incident",
                "incident_id": i_id,
                "description": i_data.get("description"),
                "status": i_data.get("status"),
            })

    return drifts


def recommendation_engine(drifts, world):
    """Recommendation mode only: emit proposed actions without mutation.

    Args:
        drifts: drift records from ``detect_drift()``.
        world: result of ``load_world_state()``; consulted for deployment
            history and the dependency check.

    Returns:
        list of recommendation dicts with the keys ``"drift"``, ``"action"``,
        ``"message"`` and ``"type"`` (one of the ``RECONCILE_*`` constants).
        Drift types not handled below produce no recommendation.
    """
    recommendations = []

    for drift in drifts:
        if drift["type"] == "unhealthy_service":
            recommendations.append({
                "drift": drift,
                "action": "redeploy",
                "message": f"Service {drift['service']} is unhealthy. Recommend redeploy.",
                "type": RECONCILE_REQUIRED,
            })

        elif drift["type"] == "failed_deployment":
            service = drift["service"]
            # Recommendation: repeated deployment failures -> recommend diagnostics
            failure_count = sum(
                1
                for d in world["deployments"].values()
                if d.get("service") == service and d.get("status") == "failed"
            )
            if failure_count > 2:
                recommendations.append({
                    "drift": drift,
                    "action": "diagnostics",
                    "message": f"Repeated deployment failures for {service}. Recommend diagnostics.",
                    "type": RECONCILE_BLOCKED,
                })
            else:
                recommendations.append({
                    "drift": drift,
                    "action": "redeploy",
                    "message": f"Deployment failed for {service}. Recommend retry.",
                    "type": RECONCILE_REQUIRED,
                })

        elif drift["type"] == "offline_node":
            # Recommendation: node offline -> recommend failover review
            recommendations.append({
                "drift": drift,
                "action": "failover_review",
                "message": f"Node {drift['node']} is offline. Recommend failover review.",
                "type": RECONCILE_REQUIRED,
            })

        elif drift["type"] == "missing_service":
            # Recommendation: dependency unavailable -> recommend delayed deployment
            # Mock dependency check: if a service is 'webapp' and 'database' is not healthy
            dependencies_met = True
            if drift["service"] == "webapp":
                db_svc = world["services"].get("database")
                if not db_svc or db_svc.get("status") not in _HEALTHY_SERVICE_STATUSES:
                    dependencies_met = False

            if not dependencies_met:
                recommendations.append({
                    "drift": drift,
                    "action": "delayed_deployment",
                    "message": f"Dependency unavailable for {drift['service']}. Recommend delayed deployment.",
                    "type": RECONCILE_RECOMMENDED,
                })
            else:
                recommendations.append({
                    "drift": drift,
                    "action": "deploy",
                    "message": f"Service {drift['service']} is missing. Recommend deployment.",
                    "type": RECONCILE_REQUIRED,
                })

        elif drift["type"] == "unresolved_incident":
            recommendations.append({
                "drift": drift,
                "action": "review",
                "message": f"Unresolved incident: {drift['description']}. Recommend review.",
                "type": RECONCILE_RECOMMENDED,
            })

    return recommendations


def save_checkpoint(state):
    """Checkpoint support for idempotent operation.

    Writes ``{"last_run": <now>, "state": state}`` to ``CHECKPOINT_FILE``.
    Failures are reported on stderr and otherwise ignored.
    """
    try:
        with open(CHECKPOINT_FILE, "w", encoding="utf-8") as f:
            json.dump({"last_run": time.time(), "state": state}, f)
    except (OSError, TypeError, ValueError) as e:
        print(f"Error saving checkpoint: {e}", file=sys.stderr)


def load_checkpoint():
    """Load last run checkpoint.

    Returns:
        The checkpoint dict, or ``None`` when the file is absent, unreadable,
        not valid JSON, or does not contain a JSON object.
    """
    if not CHECKPOINT_FILE.exists():
        return None
    try:
        with open(CHECKPOINT_FILE, "r", encoding="utf-8") as f:
            checkpoint = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Error loading checkpoint: {e}", file=sys.stderr)
        return None
    if not isinstance(checkpoint, dict):
        # The caller uses .get(); anything but an object is unusable.
        print("Error loading checkpoint: expected a JSON object", file=sys.stderr)
        return None
    return checkpoint


def main():
    """Run one supervisor pass: load, diff, recommend, emit, checkpoint."""
    print(f"--- Supervisor Run: {time.ctime()} ---")

    checkpoint = load_checkpoint()
    if checkpoint:
        print(f"Last run: {time.ctime(checkpoint.get('last_run', 0))}")

    # 1. Load desired state
    desired = load_desired_state()

    # 2. Load world state
    world = load_world_state()

    # 3. Detect drift
    drifts = detect_drift(desired, world)

    # 4. Generate recommendations (Recommendation mode only)
    recommendations = recommendation_engine(drifts, world)

    # 5. Emit events & Update summary state
    if not recommendations and not drifts:
        emit_event("summary_state", f"System state: {STATE_NOMINAL}", {"state": STATE_NOMINAL})
    else:
        # Extend runtime summary states: nominal, degraded, unstable, reconciling
        # Any drift means the system is at least degraded, even when no
        # recommendation could be derived for it; a blocked reconcile
        # escalates to unstable.
        has_blocked = any(r["type"] == RECONCILE_BLOCKED for r in recommendations)
        overall_state = STATE_UNSTABLE if has_blocked else STATE_DEGRADED

        emit_event("summary_state", f"System state: {overall_state}", {"state": overall_state})

    # Emit reconciliation events
    for rec in recommendations:
        emit_event(rec["type"], rec["message"], rec["drift"])
        # Proposed: Emit action proposals to action queue
        emit_action_proposal(rec)

    # 6. Save checkpoint
    save_checkpoint({
        "drift_count": len(drifts),
        "recommendation_count": len(recommendations),
        "timestamp": time.time(),
    })

    print("Run complete.")


if __name__ == "__main__":
    main()