From fb7828b52b10d6b057197d8779d2f96fef541a24 Mon Sep 17 00:00:00 2001
From: Oskar Kapala
Date: Wed, 27 May 2026 14:58:55 +0200
Subject: [PATCH] supervisor: auto-cancel pending actions when drift is resolved
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a service becomes healthy (node-agent emits service_healthy → observer
updates services.json), any previously queued redeploy/container_restart
action is stale. Without cleanup, the queue accumulates old actions that
require manual rejection.

_cancel_resolved_pending_actions() runs after each reconcile cycle:
- Reads all pending/*.json with type=redeploy or container_restart
- If the service is now healthy in actual_state, moves action to cancelled/
  with reason=drift_resolved_auto
- Only pending actions are touched; approved/running are left to the operator

Co-Authored-By: Claude Sonnet 4.6
---
 services/control-plane/src/supervisor.py | 65 ++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/services/control-plane/src/supervisor.py b/services/control-plane/src/supervisor.py
index e39daeb..a303743 100644
--- a/services/control-plane/src/supervisor.py
+++ b/services/control-plane/src/supervisor.py
@@ -224,6 +224,14 @@ class Supervisor:
             if node_info.get("disk_pressure") == "high":
                 self._generate_disk_cleanup_recommendation(node_name)
 
+        # 4. Cancel pending actions whose drift has been resolved.
+        # When a service becomes healthy again (because node-agent emits
+        # service_healthy and the observer updates services.json), any
+        # previously queued redeploy/container_restart action for that
+        # service is no longer needed. Move it to "cancelled/" so the
+        # operator can see it was auto-resolved rather than silently dropped.
+        self._cancel_resolved_pending_actions()
+
     # ------------------------------------------------------------------
     # Recommendation generation
     # ------------------------------------------------------------------
@@ -357,6 +365,63 @@ class Supervisor:
         except Exception as e:
             logger.error(f"Failed to save disk cleanup recommendation {action_id}: {e}")
 
+    def _cancel_resolved_pending_actions(self):
+        """Auto-cancel pending service actions whose drift has resolved.
+
+        Scans ``pending/*.json`` for redeploy / container_restart actions and
+        looks up "<node>/<service>" in ``self.actual_state["services"]``. When
+        that service's status is "healthy", the action is written to
+        ``cancelled/`` with cancelled_reason "drift_resolved_auto" and removed
+        from pending. Only pending/ is scanned, so approved/running actions
+        (already committed to by the operator) are never touched.
+
+        Unreadable or malformed action files and a missing "services" section
+        are tolerated; a failure on one file never aborts the scan.
+        """
+        cancelled_dir = ACTIONS_DIR / "cancelled"
+        cancelled_dir.mkdir(parents=True, exist_ok=True)
+        pending_dir = ACTIONS_DIR / "pending"
+        if not pending_dir.exists():
+            return
+        services = self.actual_state.get("services") or {}
+
+        # Snapshot the listing: files are unlinked while we iterate.
+        for action_file in list(pending_dir.glob("*.json")):
+            try:
+                with open(action_file, "r", encoding="utf-8") as f:
+                    action = json.load(f)
+                # Valid JSON that is not an object has no .get(); treat as bad.
+                if not isinstance(action, dict):
+                    raise ValueError("top-level JSON value is not an object")
+            except Exception as e:
+                logger.error(f"Failed to read action {action_file.name}: {e}")
+                continue
+            # Only auto-cancel service-level actions (not disk_cleanup)
+            if action.get("type") not in ("redeploy", "container_restart"):
+                continue
+            node, service = action.get("node"), action.get("service")
+            if not node or not service:
+                continue
+            svc_key = f"{node}/{service}"
+            actual_info = services.get(svc_key)
+            if not (actual_info and actual_info.get("status") == "healthy"):
+                continue
+            # Drift resolved: write a temp file and rename it into place so
+            # cancelled/ never holds a truncated record, then drop the original.
+            dest = cancelled_dir / action_file.name
+            tmp = dest.with_name(dest.name + ".tmp")
+            try:
+                action["status"] = "cancelled"
+                action["cancelled_reason"] = "drift_resolved_auto"
+                action["cancelled_at"] = time.time()
+                with open(tmp, "w", encoding="utf-8") as f:
+                    json.dump(action, f, indent=2)
+                tmp.replace(dest)
+                action_file.unlink()
+                logger.info(f"Auto-cancelled {action_file.name}: {svc_key} is now healthy")
+            except Exception as e:
+                logger.error(f"Failed to cancel action {action_file.name}: {e}")
+
     def loop(self, interval=30):
         logger.info("Starting supervisor loop")
         while True: