diff --git a/hosts/vps/runtime/control-plane/docker-compose.override.yml b/hosts/vps/runtime/control-plane/docker-compose.override.yml new file mode 100644 index 0000000..7e4364c --- /dev/null +++ b/hosts/vps/runtime/control-plane/docker-compose.override.yml @@ -0,0 +1,20 @@ +# Control-plane production overrides for the VPS deployment. +# +# NODE_ALIAS_MAP translates the node names that appear in raw event files +# (written by node agents / seed scripts) to the canonical names used in +# inventory/topology.yaml and hosts/*/services.yaml. +# +# Current live mapping (from /opt/homelab/events/ inspection): +# node-2 → chelsty (zigbee2mqtt / mosquitto / homeassistant node) +# +# Add further entries when new nodes come online and their event-source names +# differ from their topology names. Format is a single-line JSON object, e.g.: +# NODE_ALIAS_MAP='{"node-2":"chelsty","node-3":"piha"}' +# +# The executor inherits the canonical name from the action JSON written by the +# supervisor, so NODE_ALIAS_MAP is only required on the supervisor service. + +services: + supervisor: + environment: + - NODE_ALIAS_MAP={"node-2":"chelsty"} diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py index 0dfdb95..59b09b7 100644 --- a/scripts/observer/observer.py +++ b/scripts/observer/observer.py @@ -233,6 +233,11 @@ class Observer: "service": event.get("service"), "status": "active", "severity": event.get("severity"), + # trigger_type records the event type that opened this incident so that + # the supervisor can choose the appropriate remediation action + # (e.g. container_restart for containers_not_running / mqtt_unreachable + # vs. a full redeploy for other causes). + "trigger_type": event.get("type"), "started_at": event.get("timestamp"), "last_occurrence": event.get("timestamp"), "occurrence_count": 1, diff --git a/services/control-plane/src/executor.py b/services/control-plane/src/executor.py index 967afca..34d7734 100644 --- a/services/control-plane/src/executor.py +++ b/services/control-plane/src/executor.py @@ -10,10 +10,20 @@ RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab") ACTIONS_DIR = Path(RUNTIME_PATH) / "actions" REPO_ROOT = Path(os.getenv("REPO_ROOT", "/repo")) +# SSH configuration +# SSH_USER can be overridden per-deployment environment. +SSH_USER = os.getenv("SSH_USER", "oskar") +SSH_OPTIONS = [ + "-o", "StrictHostKeyChecking=no", + "-o", "ConnectTimeout=10", + "-o", "BatchMode=yes", +] + # Logging setup logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("executor") + class Executor: def __init__(self): self._ensure_dirs() @@ -32,14 +42,14 @@ class Executor: approved_dir = ACTIONS_DIR / "approved" action_files = sorted(approved_dir.glob("*.json")) - + for action_file in action_files: self._execute_action(action_file) def _execute_action(self, action_file): action_id = action_file.stem logger.info(f"Executing action: {action_id}") - + # Move to running running_path = ACTIONS_DIR / "running" / f"{action_id}.json" try: @@ -54,16 +64,16 @@ class Executor: logger.error(f"Failed to move {action_id} to running: {e}") return - # Execute + # Dispatch by action type success = False error_msg = "" try: action_type = data.get("type") node = data.get("node") service = data.get("service") - + if action_type == "redeploy": - # Call deploy-node.sh + # Full service redeploy via the repo deploy script cmd = [ str(REPO_ROOT / "scripts" / "deploy" / "deploy-node.sh"), node, @@ -76,9 +86,17 @@ class Executor: else: success = False error_msg = result.stderr or result.stdout + + elif action_type == "container_restart": + # Lightweight restart: SSH to node and docker restart the container. + # container_name is set by the supervisor; falls back to service name. + container_name = data.get("container_name") or service + success, error_msg = self._execute_container_restart(node, container_name) + else: success = False error_msg = f"Unknown action type: {action_type}" + except Exception as e: success = False error_msg = str(e) @@ -98,12 +116,60 @@ class Executor: except Exception as e: logger.error(f"Failed to move {action_id} to {target_status}: {e}") + def _execute_container_restart(self, node, container_name, retry_delay=10): + """ + SSH to the target node and run `docker restart `. + + Attempts the restart up to 2 times (initial + 1 retry). If the first + attempt fails, waits retry_delay seconds then tries once more before + declaring the action failed. + + Returns (success: bool, error_msg: str). + """ + cmd = [ + "ssh", + *SSH_OPTIONS, + f"{SSH_USER}@{node}", + f"docker restart {container_name}", + ] + logger.info(f"SSH container restart: {' '.join(cmd)}") + + max_attempts = 2 + last_error = "" + + for attempt in range(1, max_attempts + 1): + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + logger.info( + f"Container '{container_name}' on {node} restarted successfully " + f"(attempt {attempt}/{max_attempts})" + ) + return True, "" + + last_error = (result.stderr or result.stdout).strip() + logger.warning( + f"container_restart attempt {attempt}/{max_attempts} failed " + f"for '{container_name}' on {node}: {last_error}" + ) + + if attempt < max_attempts: + logger.info(f"Retrying in {retry_delay}s...") + time.sleep(retry_delay) + + logger.error( + f"container_restart exhausted all {max_attempts} attempts " + f"for '{container_name}' on {node}" + ) + return False, last_error + def loop(self, interval=10): logger.info("Starting executor loop") while True: self.process_actions() time.sleep(interval) + if __name__ == "__main__": executor = Executor() executor.loop() diff --git a/services/control-plane/src/supervisor.py b/services/control-plane/src/supervisor.py index 3ec151a..f7e1469 100644 --- a/services/control-plane/src/supervisor.py +++ b/services/control-plane/src/supervisor.py @@ -11,10 +11,26 @@ WORLD_DIR = Path(RUNTIME_PATH) / "world" ACTIONS_DIR = Path(RUNTIME_PATH) / "actions" REPO_ROOT = Path(os.getenv("REPO_ROOT", "/repo")) +# Node alias map: maps alternative node names (as they appear in events/world state) +# to canonical topology node names (as they appear in hosts/*/services.yaml and topology.yaml). +# Override at runtime via NODE_ALIAS_MAP env var as a JSON string, e.g.: +# NODE_ALIAS_MAP='{"node-2": "chelsty", "node-1": "piha"}' +_NODE_ALIAS_ENV = os.getenv("NODE_ALIAS_MAP", "{}") +try: + NODE_ALIAS_MAP = json.loads(_NODE_ALIAS_ENV) +except Exception: + NODE_ALIAS_MAP = {} + +# Event trigger types that should result in a lightweight container_restart +# rather than a full redeploy. The container is present but not running, +# or a dependency (MQTT) is unreachable — a restart is the right first step. +CONTAINER_RESTART_TRIGGERS = {"containers_not_running", "mqtt_unreachable"} + # Logging setup logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("supervisor") + class Supervisor: def __init__(self): self.desired_state = {"services": {}} @@ -25,13 +41,49 @@ class Supervisor: ACTIONS_DIR.mkdir(parents=True, exist_ok=True) (ACTIONS_DIR / "pending").mkdir(parents=True, exist_ok=True) + # ------------------------------------------------------------------ + # Node name resolution + # ------------------------------------------------------------------ + + def _resolve_node(self, name): + """Resolve an event/world-state node name to its canonical topology name.""" + return NODE_ALIAS_MAP.get(name, name) + + # ------------------------------------------------------------------ + # Container name lookup + # ------------------------------------------------------------------ + + def _get_container_name(self, service): + """ + Determine the Docker container name for a service. + Parses container_name from the service's docker-compose.yml. + Falls back to the service name if not found. + """ + compose_path = REPO_ROOT / "services" / service / "docker-compose.yml" + if compose_path.exists(): + try: + with open(compose_path, "r") as f: + compose = yaml.safe_load(f) + for svc_block in compose.get("services", {}).values(): + cname = svc_block.get("container_name") + if cname: + return cname + except Exception as e: + logger.warning(f"Could not parse docker-compose for {service}: {e}") + # Convention: container name matches service name + return service + + # ------------------------------------------------------------------ + # State loading + # ------------------------------------------------------------------ + def _load_desired_state(self): services = {} hosts_dir = REPO_ROOT / "hosts" if not hosts_dir.exists(): logger.warning(f"Hosts directory {hosts_dir} does not exist") return - + for host_dir in hosts_dir.iterdir(): if host_dir.is_dir(): svc_file = host_dir / "services.yaml" @@ -57,13 +109,67 @@ class Supervisor: "nodes": WORLD_DIR / "nodes.json", "incidents": WORLD_DIR / "incidents.json" } + raw = {} for key, path in files.items(): if path.exists(): try: with open(path, "r") as f: - self.actual_state[key] = json.load(f) + raw[key] = json.load(f) except Exception as e: logger.error(f"Failed to load {key} actual state: {e}") + raw[key] = {} + else: + raw[key] = {} + + # Normalize node names in services using alias map so that + # event-sourced names (e.g. "node-2") resolve to canonical + # topology names (e.g. "chelsty") before comparison with desired state. + normalized_services = {} + for svc_key, svc_info in raw.get("services", {}).items(): + svc_info = dict(svc_info) + raw_node = svc_info.get("node", "") + canonical_node = self._resolve_node(raw_node) + if canonical_node != raw_node: + logger.debug(f"Resolved node alias: {raw_node} → {canonical_node}") + svc_info["node"] = canonical_node + svc_name = svc_info.get("service") or svc_key.split("/", 1)[-1] + svc_key = f"{canonical_node}/{svc_name}" + normalized_services[svc_key] = svc_info + + # Normalize node names in incidents as well + normalized_incidents = {} + for inc_id, inc in raw.get("incidents", {}).items(): + inc = dict(inc) + raw_node = inc.get("node", "") + inc["node"] = self._resolve_node(raw_node) + normalized_incidents[inc_id] = inc + + self.actual_state["services"] = normalized_services + self.actual_state["nodes"] = raw.get("nodes", {}) + self.actual_state["incidents"] = normalized_incidents + + # ------------------------------------------------------------------ + # Incident helpers + # ------------------------------------------------------------------ + + def _get_incident_trigger(self, svc_key): + """ + Return the trigger_type of the active incident for a service, or None. + trigger_type is set by the observer when it creates an incident from + a specific event type (e.g. 'containers_not_running', 'mqtt_unreachable'). + """ + svc_info = self.actual_state["services"].get(svc_key, {}) + incident_id = svc_info.get("incident_id") + if not incident_id: + return None + incident = self.actual_state["incidents"].get(incident_id, {}) + if incident.get("status") == "active": + return incident.get("trigger_type") + return None + + # ------------------------------------------------------------------ + # Reconciliation loop + # ------------------------------------------------------------------ def reconcile(self): # Update heartbeat @@ -77,59 +183,111 @@ class Supervisor: self._load_actual_state() drifts = [] - + # 1. Check for missing or unhealthy services for svc_key, desired_info in self.desired_state["services"].items(): actual_info = self.actual_state["services"].get(svc_key) - + if not actual_info: drifts.append({ "type": "missing_service", "svc_key": svc_key, "node": desired_info["node"], - "service": desired_info["service"] + "service": desired_info["service"], + "trigger_type": None, }) elif actual_info.get("status") != "healthy": + trigger_type = self._get_incident_trigger(svc_key) drifts.append({ "type": "unhealthy_service", "svc_key": svc_key, "node": desired_info["node"], "service": desired_info["service"], - "status": actual_info.get("status") + "status": actual_info.get("status"), + "trigger_type": trigger_type, }) # 2. Generate recommendations for drift in drifts: self._generate_recommendation(drift) + # ------------------------------------------------------------------ + # Recommendation generation + # ------------------------------------------------------------------ + def _generate_recommendation(self, drift): - action_id = f"reconcile-{drift['node']}-{drift['service']}" - # Check all active states so we don't recreate after approval/execution + node = drift["node"] + service = drift["service"] + trigger_type = drift.get("trigger_type") + + # Choose action type first so we can build the stable, deterministic ID. + # Stable IDs mean reconcile is truly idempotent: the same drift always + # produces the same filename, so we never create duplicates even across + # restarts of the supervisor. + if trigger_type in CONTAINER_RESTART_TRIGGERS: + action_id = f"container-restart-{node}-{service}" + else: + action_id = f"redeploy-{node}-{service}" + + # Skip if an action for this ID is already live in any active state + # (pending → approved → running). This prevents re-creation after + # a human approves an action that hasn't executed yet. for state in ("pending", "approved", "running"): if (ACTIONS_DIR / state / f"{action_id}.json").exists(): + logger.debug(f"Skipping {action_id}: already in state '{state}'") return - action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" - action = { - "action_id": action_id, - "timestamp": time.time(), - "type": "redeploy", - "node": drift["node"], - "service": drift["service"], - "risk_level": "guarded", - "confidence": 0.9, - "description": f"Redeploy {drift['service']} on {drift['node']} due to {drift['type']}", - "status": "pending", - "payload": { - "reason": drift["type"], - "svc_key": drift["svc_key"] + if trigger_type in CONTAINER_RESTART_TRIGGERS: + # Lightweight remediation: the container exists but is not running + # (containers_not_running) or its MQTT dependency is unreachable + # (mqtt_unreachable). A docker restart is sufficient and low-risk. + container_name = self._get_container_name(service) + action = { + "action_id": action_id, + "timestamp": time.time(), + "type": "container_restart", + "node": node, + "service": service, + "container_name": container_name, + "risk_level": "low", + "confidence": 0.95, + "description": ( + f"Restart container '{container_name}' on {node} " + f"(service: {service}, reason: {trigger_type})" + ), + "status": "pending", + "payload": { + "reason": trigger_type, + "svc_key": drift["svc_key"], + }, } - } - + else: + # Full redeploy: container is running but service is broken, + # or the cause is unknown / not a simple restart candidate. + action = { + "action_id": action_id, + "timestamp": time.time(), + "type": "redeploy", + "node": node, + "service": service, + "risk_level": "guarded", + "confidence": 0.9, + "description": f"Redeploy {service} on {node} due to {drift['type']}", + "status": "pending", + "payload": { + "reason": drift["type"], + "svc_key": drift["svc_key"], + }, + } + + action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" try: with open(action_path, "w") as f: json.dump(action, f, indent=2) - logger.info(f"Generated recommendation: {action_id}") + logger.info( + f"Generated recommendation: {action_id} " + f"(type={action['type']}, risk={action['risk_level']})" + ) except Exception as e: logger.error(f"Failed to save recommendation {action_id}: {e}") @@ -139,6 +297,7 @@ class Supervisor: self.reconcile() time.sleep(interval) + if __name__ == "__main__": supervisor = Supervisor() supervisor.loop()