From 4e8968f9c7aa0d6657884845fde2066e81d0b4a7 Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Wed, 27 May 2026 14:49:56 +0200 Subject: [PATCH] Fix service health tracking: emit service_healthy, control-plane endpoint check, cleanup checkpoint migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - node_agent: emit service_healthy for all running managed containers so observer populates services.json (previously empty → supervisor flooded action queue with missing_service redeploys for healthy services) - node_agent: VPS-only _check_control_plane_health() probes the HTTP endpoint to emit service_healthy/unhealthy for the 'control-plane' logical service (multi-container stack, container names don't match service name) - node_agent: fix _cleanup_control_plane_fs() to read new node_checkpoints format from observer checkpoint (was reading old last_processed_file key, always found nothing, never cleaned up old events) - observer: handle service_healthy event type → sets service status healthy without resolving incidents (unlike service_recovered which also resolves) Co-Authored-By: Claude Sonnet 4.6 --- scripts/observer/observer.py | 6 ++ services/node-agent/src/node_agent.py | 87 ++++++++++++++++++++++++--- 2 files changed, 85 insertions(+), 8 deletions(-) diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py index e9e8a61..4165abb 100644 --- a/scripts/observer/observer.py +++ b/scripts/observer/observer.py @@ -264,6 +264,12 @@ class Observer: if etype == "service_recovered": self.world_state["services"][svc_key]["status"] = "healthy" self._resolve_incident(svc_key, timestamp) + elif etype == "service_healthy": + # Positive confirmation from node-agent that a managed container + # is running. This keeps services.json populated so the supervisor + # can correctly detect drift (absent entry = never reported = unknown, + # not the same as confirmed missing). No incident resolution needed. 
+ self.world_state["services"][svc_key]["status"] = "healthy" elif etype in ["service_unhealthy", "healthcheck_failed"]: self.world_state["services"][svc_key]["status"] = "unhealthy" self._handle_incident(svc_key, event) diff --git a/services/node-agent/src/node_agent.py b/services/node-agent/src/node_agent.py index dbb9b61..b14d8fc 100644 --- a/services/node-agent/src/node_agent.py +++ b/services/node-agent/src/node_agent.py @@ -288,10 +288,13 @@ class NodeAgent: .get("Health", {}) .get("Status", "")) + # Only track containers with a restart policy (long-running services) + is_managed = restart_policy in ("unless-stopped", "always", "on-failure") + if not is_managed: + continue + # Exited container that carries an auto-restart policy - if status in ("exited", "dead") and restart_policy in ( - "unless-stopped", "always", "on-failure" - ): + if status in ("exited", "dead"): logger.warning(f"Container exited: {name} (restart={restart_policy})") self.emit_event( "containers_not_running", "high", name, @@ -308,6 +311,19 @@ class NodeAgent: f"Container '{name}' is running but its health check is failing", {"container": name, "health_status": health_status}, ) + + # Any other running container (including ones without a health check) — confirm to observer so that + # services.json stays populated for the supervisor's drift detection. + # Without this, the supervisor sees services.json as empty and treats + # all desired services as "missing", flooding the action queue. + elif status == "running": + self.emit_event( + "service_healthy", "info", name, + f"Container '{name}' is running", + {"container": name, "status": status, + "health_status": health_status or "none"}, + ) + except Exception as exc: logger.error(f"Error checking container {c.name}: {exc}") @@ -431,20 +447,36 @@ class NodeAgent: # 3. Event files older than 3 days AND already past observer checkpoint. # The dual condition guarantees we never delete an unprocessed event. 
+ # Checkpoint format: {"node_checkpoints": {"piha": "/path/last", ...}} checkpoint_file = STATE_DIR / "observer_checkpoint.json" - last_processed = "" + node_checkpoints: dict = {} if checkpoint_file.exists(): try: cp = json.loads(checkpoint_file.read_text()) - last_processed = cp.get("last_processed_file", "") + if "node_checkpoints" in cp: + node_checkpoints = cp["node_checkpoints"] + elif "last_processed_file" in cp: + # Migrate old single-file format + old = cp.get("last_processed_file", "") + if old: + try: + node_dir = Path(old).relative_to(EVENTS_DIR).parts[0] + node_checkpoints = {node_dir: old} + except Exception: + pass except Exception as exc: logger.error(f"Failed to read observer checkpoint: {exc}") - if last_processed: + if node_checkpoints: for f in EVENTS_DIR.glob("**/*.json"): try: - if (now - f.stat().st_mtime > three_days - and str(f) < last_processed): + # Determine which node directory this event belongs to + rel = Path(f).relative_to(EVENTS_DIR) + node_dir = str(rel.parts[0]) if rel.parts else "__unknown__" + last_for_node = node_checkpoints.get(node_dir, "") + if (last_for_node + and now - f.stat().st_mtime > three_days + and str(f) <= last_for_node): f.unlink(missing_ok=True) logger.info(f"Cleaned old event: {f.name}") except Exception as exc: @@ -494,6 +526,44 @@ class NodeAgent: except Exception as exc: logger.warning(f"Event shipping error: {exc}") + # ------------------------------------------------------------------ + # VPS-specific: control-plane service health check + # ------------------------------------------------------------------ + + def _check_control_plane_health(self): + """ + VPS-only: probe the control-plane HTTP endpoint and emit a service + health event so the observer can populate services.json for the + 'control-plane' entry in services.yaml. + + The control-plane is a multi-container stack (observer, supervisor, + executor, ui), so individual container names don't match the service + name in services.yaml. 
Checking the HTTP endpoint gives a clean + boundary that maps 1-to-1 with the logical service. + """ + import urllib.request + endpoint = "http://localhost:18180/summary" + try: + resp = urllib.request.urlopen(endpoint, timeout=5) + if resp.status == 200: + self.emit_event( + "service_healthy", "info", "control-plane", + "Control-plane HTTP endpoint is reachable", + {"endpoint": endpoint}, + ) + else: + self.emit_event( + "service_unhealthy", "high", "control-plane", + f"Control-plane HTTP endpoint returned HTTP {resp.status}", + {"endpoint": endpoint, "http_status": resp.status}, + ) + except Exception as exc: + self.emit_event( + "service_unhealthy", "high", "control-plane", + f"Control-plane HTTP endpoint unreachable: {exc}", + {"endpoint": endpoint, "error": str(exc)}, + ) + # ------------------------------------------------------------------ # Heartbeat # ------------------------------------------------------------------ @@ -520,6 +590,7 @@ class NodeAgent: if self.node_name == VPS_NODE_NAME: self._cleanup_control_plane_fs() + self._check_control_plane_health() # Emit a node_health heartbeat so the observer can update node status # and the supervisor can correlate disk/memory metrics with service issues.