Fix service health tracking: emit service_healthy, control-plane endpoint check, cleanup checkpoint migration

- node_agent: emit service_healthy for all running managed containers so
  observer populates services.json (previously empty → supervisor flooded
  action queue with missing_service redeploys for healthy services)
- node_agent: VPS-only _check_control_plane_health() probes the HTTP
  endpoint to emit service_healthy/unhealthy for the 'control-plane' logical
  service (multi-container stack, container names don't match service name)
- node_agent: fix _cleanup_control_plane_fs() to read new node_checkpoints
  format from observer checkpoint (was reading old last_processed_file key,
  always found nothing, never cleaned up old events)
- observer: handle service_healthy event type → sets service status healthy
  without resolving incidents (unlike service_recovered which also resolves)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Oskar Kapala 2026-05-27 14:49:56 +02:00
parent f4a8db93e4
commit 4e8968f9c7
2 changed files with 85 additions and 8 deletions

View file

@ -264,6 +264,12 @@ class Observer:
if etype == "service_recovered":
self.world_state["services"][svc_key]["status"] = "healthy"
self._resolve_incident(svc_key, timestamp)
elif etype == "service_healthy":
# Positive confirmation from node-agent that a managed container
# is running. This keeps services.json populated so the supervisor
# can correctly detect drift (absent entry = never reported = unknown,
# not the same as confirmed missing). No incident resolution needed.
self.world_state["services"][svc_key]["status"] = "healthy"
elif etype in ["service_unhealthy", "healthcheck_failed"]:
self.world_state["services"][svc_key]["status"] = "unhealthy"
self._handle_incident(svc_key, event)

View file

@ -288,10 +288,13 @@ class NodeAgent:
.get("Health", {})
.get("Status", ""))
# Only track containers with a restart policy (long-running services)
is_managed = restart_policy in ("unless-stopped", "always", "on-failure")
if not is_managed:
continue
# Exited container that carries an auto-restart policy
if status in ("exited", "dead") and restart_policy in (
"unless-stopped", "always", "on-failure"
):
if status in ("exited", "dead"):
logger.warning(f"Container exited: {name} (restart={restart_policy})")
self.emit_event(
"containers_not_running", "high", name,
@ -308,6 +311,19 @@ class NodeAgent:
f"Container '{name}' is running but its health check is failing",
{"container": name, "health_status": health_status},
)
# Running container that is healthy — confirm to observer so that
# services.json stays populated for the supervisor's drift detection.
# Without this, the supervisor sees services.json as empty and treats
# all desired services as "missing", flooding the action queue.
elif status == "running":
self.emit_event(
"service_healthy", "info", name,
f"Container '{name}' is running",
{"container": name, "status": status,
"health_status": health_status or "none"},
)
except Exception as exc:
logger.error(f"Error checking container {c.name}: {exc}")
@ -431,20 +447,36 @@ class NodeAgent:
# 3. Event files older than 3 days AND already past observer checkpoint.
# The dual condition guarantees we never delete an unprocessed event.
# Checkpoint format: {"node_checkpoints": {"piha": "/path/last", ...}}
checkpoint_file = STATE_DIR / "observer_checkpoint.json"
last_processed = ""
node_checkpoints: dict = {}
if checkpoint_file.exists():
try:
cp = json.loads(checkpoint_file.read_text())
last_processed = cp.get("last_processed_file", "")
if "node_checkpoints" in cp:
node_checkpoints = cp["node_checkpoints"]
elif "last_processed_file" in cp:
# Migrate old single-file format
old = cp.get("last_processed_file", "")
if old:
try:
node_dir = Path(old).relative_to(EVENTS_DIR).parts[0]
node_checkpoints = {node_dir: old}
except Exception:
pass
except Exception as exc:
logger.error(f"Failed to read observer checkpoint: {exc}")
if last_processed:
if node_checkpoints:
for f in EVENTS_DIR.glob("**/*.json"):
try:
if (now - f.stat().st_mtime > three_days
and str(f) < last_processed):
# Determine which node directory this event belongs to
rel = Path(f).relative_to(EVENTS_DIR)
node_dir = str(rel.parts[0]) if rel.parts else "__unknown__"
last_for_node = node_checkpoints.get(node_dir, "")
if (last_for_node
and now - f.stat().st_mtime > three_days
and str(f) <= last_for_node):
f.unlink(missing_ok=True)
logger.info(f"Cleaned old event: {f.name}")
except Exception as exc:
@ -494,6 +526,44 @@ class NodeAgent:
except Exception as exc:
logger.warning(f"Event shipping error: {exc}")
# ------------------------------------------------------------------
# VPS-specific: control-plane service health check
# ------------------------------------------------------------------
def _check_control_plane_health(self):
"""
VPS-only: probe the control-plane HTTP endpoint and emit a service
health event so the observer can populate services.json for the
'control-plane' entry in services.yaml.
The control-plane is a multi-container stack (observer, supervisor,
executor, ui), so individual container names don't match the service
name in services.yaml. Checking the HTTP endpoint gives a clean
boundary that maps 1-to-1 with the logical service.
"""
import urllib.request
endpoint = "http://localhost:18180/summary"
try:
resp = urllib.request.urlopen(endpoint, timeout=5)
if resp.status == 200:
self.emit_event(
"service_healthy", "info", "control-plane",
"Control-plane HTTP endpoint is reachable",
{"endpoint": endpoint},
)
else:
self.emit_event(
"service_unhealthy", "high", "control-plane",
f"Control-plane HTTP endpoint returned HTTP {resp.status}",
{"endpoint": endpoint, "http_status": resp.status},
)
except Exception as exc:
self.emit_event(
"service_unhealthy", "high", "control-plane",
f"Control-plane HTTP endpoint unreachable: {exc}",
{"endpoint": endpoint, "error": str(exc)},
)
# ------------------------------------------------------------------
# Heartbeat
# ------------------------------------------------------------------
@ -520,6 +590,7 @@ class NodeAgent:
if self.node_name == VPS_NODE_NAME:
self._cleanup_control_plane_fs()
self._check_control_plane_health()
# Emit a node_health heartbeat so the observer can update node status
# and the supervisor can correlate disk/memory metrics with service issues.