homelab-codex-ws/services/node-agent/src/node_agent.py
Oskar Kapala b40b832159 Fix ghost service keys from hash-prefixed Docker container names
node-agent: use com.docker.compose.service label as canonical name
- Add _canonical_container_name() method: prefers compose label,
  falls back to hash-prefix-stripped c.name
- Replace bare c.name usage in check_containers()
- Skip 'created'-state containers (Docker stale-state artifacts)

observer: prune hash-prefixed ghost keys in _prune_stale_world()
- Each reconcile cycle removes service keys matching <node>/<12hex>_<name>
- Acts as safety net for entries already in services.json + future slippage

control-plane/docker-compose.yml already has explicit container_name on
all four services — no change needed there.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 15:41:13 +02:00

659 lines
27 KiB
Python

"""
node_agent.py — Homelab node health monitor daemon.
Runs as a Docker container on every managed node. Each cycle it:
1. Collects system metrics (disk, memory, CPU).
2. Checks Docker container health.
3. Emits structured event JSON files to /opt/homelab/events/<node-name>/.
4. Applies safe Docker / filesystem cleanup per the conservative policy.
5. Optionally rsyncs events to VPS so the control-plane observer can process them.
Cleanup policy (matches health-monitor.sh):
lte_node (chelsty-infra, chelsty-ha) : NO cleanup, NO image operations
sd_card (piha, saturn) : dangling images + stopped containers,
max once per 24 h
ai_node (solaria) : dangling + containers + build cache,
NEVER docker image prune -a
standard (vps) : dangling + containers + build cache +
control-plane filesystem rotation
NEVER TOUCHED on any node:
/opt/homelab/data/ Frigate recordings, Ollama models, HA db, MQTT state
/opt/homelab/config/ All hand-crafted and repo-seeded configuration
/opt/homelab/state/ Heartbeat files, observer checkpoint
actions/pending|approved|running Live work queue
"""
import json
import logging
import os
import re
import shutil
import socket
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
try:
import docker as docker_sdk
DOCKER_AVAILABLE = True
except ImportError:
DOCKER_AVAILABLE = False
docker_sdk = None
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Root logging at INFO with a "time - LEVEL - message" prefix; everything in
# this module logs through the named "node-agent" logger.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(name="node-agent")
def _env(name: str, default: str) -> str:
    """Return env var *name*, treating unset AND empty values as *default*.

    docker-compose passes ``VAR=${VAR}`` through as an empty string when the
    host variable is unset; without this, an empty CHECK_INTERVAL crashed the
    agent at import (``int("")``) and an empty RUNTIME_PATH became ``Path("")``
    (the current directory).
    """
    return os.getenv(name) or default


# ---------------------------------------------------------------------------
# Runtime paths
# ---------------------------------------------------------------------------
RUNTIME_PATH = Path(_env("RUNTIME_PATH", "/opt/homelab"))
REPO_ROOT = Path(_env("REPO_ROOT", "/repo"))
EVENTS_DIR = RUNTIME_PATH / "events"
STATE_DIR = RUNTIME_PATH / "state"
LOGS_DIR = RUNTIME_PATH / "logs"
ACTIONS_DIR = RUNTIME_PATH / "actions"
# ---------------------------------------------------------------------------
# Node identity
# ---------------------------------------------------------------------------
NODE_NAME = os.getenv("NODE_NAME") or socket.gethostname()
NODE_TYPE = os.getenv("NODE_TYPE", "")  # override auto-detection if set
VPS_NODE_NAME = "vps"
LTE_NODES = {"chelsty-infra", "chelsty-ha"}
SD_CARD_NODES = {"piha", "saturn"}
AI_NODES = {"solaria"}
# ---------------------------------------------------------------------------
# Event shipping (optional — requires VPS_EVENTS_HOST + SSH key in container)
# ---------------------------------------------------------------------------
# An empty VPS_EVENTS_HOST deliberately disables shipping, so it keeps "".
VPS_EVENTS_HOST = os.getenv("VPS_EVENTS_HOST", "")
VPS_EVENTS_USER = _env("VPS_EVENTS_USER", "oskar")
VPS_EVENTS_PATH = _env("VPS_EVENTS_PATH", "/opt/homelab/events")
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
DISK_WARN_PCT = 75
DISK_CRIT_PCT = 85
MEM_WARN_PCT = 85
MEM_CRIT_PCT = 95
# SD-card nodes: enforce 24-hour gap between Docker cleanup runs
CLEANUP_INTERVAL_SECS = 86_400
LAST_CLEANUP_FILE = STATE_DIR / "last-docker-cleanup"
# How long to wait between full health-check cycles (seconds)
HEALTH_CHECK_INTERVAL = int(_env("CHECK_INTERVAL", "60"))
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resolve_node_type() -> str:
    """Return the cleanup-policy class for this node.

    An explicit NODE_TYPE override wins when, after trimming, lower-casing
    and mapping "-" to "_", it names a known type ("lte_node", "sd_card",
    "ai_node", "standard"). An unrecognised override is logged and ignored.
    run_safe_cleanup treats any type it does not special-case like
    "standard". A typo such as "lte" would therefore turn on Docker pruning
    on a node that must never be cleaned.

    Without a usable override, the type comes from NODE_NAME's membership in
    LTE_NODES / SD_CARD_NODES / AI_NODES, falling back to "standard".
    """
    known_types = ("lte_node", "sd_card", "ai_node", "standard")
    override = NODE_TYPE.strip().lower().replace("-", "_")
    if override in known_types:
        return override
    if override:
        logger.warning(
            f"Ignoring unknown NODE_TYPE={NODE_TYPE!r}; "
            f"expected one of: {', '.join(known_types)}"
        )
    if NODE_NAME in LTE_NODES:
        return "lte_node"
    if NODE_NAME in SD_CARD_NODES:
        return "sd_card"
    if NODE_NAME in AI_NODES:
        return "ai_node"
    return "standard"
def _utc_iso() -> str:
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# NodeAgent
# ---------------------------------------------------------------------------
class NodeAgent:
    """Health-monitor daemon for a single homelab node.

    Each ``run_once()`` cycle runs these steps in order:

    * refresh the heartbeat file;
    * sample disk, memory and CPU;
    * inspect Docker containers;
    * apply the node-type cleanup policy;
    * run the VPS-only maintenance;
    * emit a ``node_health`` event;
    * optionally rsync this node's events to the VPS.

    ``loop()`` repeats the cycle forever.
    """

    def __init__(self):
        self.node_name = NODE_NAME
        self.node_type = _resolve_node_type()
        self._ensure_dirs()
        # Optional: when this stays None, container checks and prunes are
        # skipped (each of those methods returns early on a missing client).
        self.docker_client = None
        if DOCKER_AVAILABLE:
            try:
                self.docker_client = docker_sdk.from_env()
                logger.info("Docker SDK connected")
            except Exception as exc:
                logger.warning(f"Docker unavailable: {exc}")
        logger.info(f"node-agent starting: node={self.node_name} type={self.node_type}")

    # ------------------------------------------------------------------
    # Directories
    # ------------------------------------------------------------------
    def _ensure_dirs(self):
        """Create the runtime directories this agent writes into."""
        for d in [EVENTS_DIR, STATE_DIR, LOGS_DIR,
                  EVENTS_DIR / self.node_name]:
            d.mkdir(parents=True, exist_ok=True)

    def _node_events_dir(self) -> Path:
        """Return the directory holding this node's event files."""
        return EVENTS_DIR / self.node_name

    # ------------------------------------------------------------------
    # Event emission
    # ------------------------------------------------------------------
    def emit_event(self, event_type: str, severity: str, service,
                   message: str, payload: dict = None):
        """Write one structured event as ``<events>/<node>/<event_id>.json``.

        Args:
            event_type: Event kind, e.g. "disk_pressure" or "service_healthy".
            severity: Severity label; callers here use "info", "medium", "high".
            service: Service the event concerns, or None for node-level events.
            message: Human-readable summary.
            payload: Optional extra structured data (stored as {} when omitted).

        Write failures are logged, never raised.
        """
        ts = int(time.time())
        # Include service slug in the ID so that multiple events of the same type
        # emitted within the same second (e.g. service_healthy for N containers)
        # don't overwrite each other — each gets a unique filename.
        svc_slug = re.sub(r"[^a-z0-9]", "-", (service or "node").lower())[:32].strip("-")
        event_id = f"evt-{self.node_name}-{ts}-{event_type}-{svc_slug}"
        event = {
            "id": event_id,
            "timestamp": ts,
            "date": _utc_iso(),
            "type": event_type,
            "severity": severity,
            "node": self.node_name,
            "service": service or "",
            "message": message,
            "payload": payload or {},
        }
        events_dir = self._node_events_dir()
        path = events_dir / f"{event_id}.json"
        # Write to a dot-prefixed ".tmp" name, then rename atomically (same
        # directory, so os.replace is a rename). A concurrent reader that only
        # picks up *.json files therefore never sees a half-written event.
        tmp_path = events_dir / f".{event_id}.json.tmp"
        try:
            tmp_path.write_text(json.dumps(event, indent=2))
            os.replace(tmp_path, path)
        except Exception as exc:
            logger.error(f"Failed to write event {event_id}: {exc}")
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass  # best effort: the temp file never matches *.json

    # ------------------------------------------------------------------
    # System metrics
    # ------------------------------------------------------------------
    def check_disk(self) -> int:
        """
        Check disk usage of the filesystem that hosts RUNTIME_PATH.
        Using shutil.disk_usage on the mounted runtime path works both
        natively on the host and inside a container that mounts /opt/homelab
        from the host — the reported usage reflects the host partition.
        Returns the used percentage (0 if the check fails).
        """
        try:
            usage = shutil.disk_usage(str(RUNTIME_PATH))
            pct = int(usage.used * 100 / usage.total)
            avail_mb = int(usage.free / (1024 * 1024))
            total_mb = int(usage.total / (1024 * 1024))
            payload = {
                "usage_pct": pct, "avail_mb": avail_mb,
                "total_mb": total_mb, "mount": str(RUNTIME_PATH),
            }
            if pct >= DISK_CRIT_PCT:
                logger.warning(f"Disk critical: {pct}% used")
                self.emit_event(
                    "disk_pressure", "high", None,
                    f"Disk usage critical: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)",
                    payload,
                )
            elif pct >= DISK_WARN_PCT:
                logger.info(f"Disk elevated: {pct}% used")
                self.emit_event(
                    "disk_pressure", "medium", None,
                    f"Disk usage elevated: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)",
                    payload,
                )
            return pct
        except Exception as exc:
            logger.error(f"Disk check failed: {exc}")
            return 0

    def check_memory(self) -> int:
        """Read host memory from /proc/meminfo (visible in container without special mounts).

        Usage is (MemTotal - MemAvailable) / MemTotal. Returns the used
        percentage (0 if the check fails).
        """
        try:
            info: dict = {}
            with open("/proc/meminfo") as fh:
                for line in fh:
                    # partition() splits on the first ":" only; lines without a
                    # colon or a value are skipped instead of aborting the check.
                    key, sep, rest = line.partition(":")
                    fields = rest.split()
                    if not sep or not fields:
                        continue
                    info[key.strip()] = int(fields[0])
            total = info.get("MemTotal", 1)
            avail = info.get("MemAvailable", total)
            pct = int((total - avail) * 100 / total)
            avail_mb = avail // 1024
            total_mb = total // 1024
            payload = {"usage_pct": pct, "avail_mb": avail_mb, "total_mb": total_mb}
            if pct >= MEM_CRIT_PCT:
                logger.warning(f"Memory critical: {pct}%")
                self.emit_event("high_memory", "high", None,
                                f"Memory usage critical: {pct}% ({avail_mb} MB available)", payload)
            elif pct >= MEM_WARN_PCT:
                self.emit_event("high_memory", "medium", None,
                                f"Memory usage elevated: {pct}% ({avail_mb} MB available)", payload)
            return pct
        except Exception as exc:
            logger.error(f"Memory check failed: {exc}")
            return 0

    def check_cpu(self) -> int:
        """Two-sample /proc/stat delta for accurate instantaneous CPU usage.

        Blocks for 0.5 s between samples. Emits ``high_cpu`` at >= 90 %.
        Returns the busy percentage (0 if the check fails).
        """
        try:
            def _read():
                with open("/proc/stat") as fh:
                    for line in fh:
                        if line.startswith("cpu "):
                            vals = list(map(int, line.split()[1:]))
                            # Only the first 8 fields (user..steal) are summed:
                            # guest/guest_nice are already included in user/nice,
                            # so adding them again would double-count.
                            return vals[3], sum(vals[:8])  # idle, total
                return 0, 1
            idle1, total1 = _read()
            time.sleep(0.5)
            idle2, total2 = _read()
            d_total = total2 - total1
            d_idle = idle2 - idle1
            pct = 100 - int(d_idle * 100 / d_total) if d_total else 0
            if pct >= 90:
                self.emit_event("high_cpu", "medium", None,
                                f"CPU usage elevated: {pct}%", {"usage_pct": pct})
            return pct
        except Exception as exc:
            logger.error(f"CPU check failed: {exc}")
            return 0

    # ------------------------------------------------------------------
    # Docker container health
    # ------------------------------------------------------------------
    def _canonical_container_name(self, c) -> str:
        """Return a stable, human-readable service name for a container.
        Priority:
        1. com.docker.compose.service label — always the clean compose-file key
           (e.g. "mosquitto", "zigbee2mqtt"), immune to the hash-prefix corruption
           Docker uses for stale project-state tracking entries.
        2. c.name with hash prefix stripped — fallback for non-Compose containers.
        When a container is removed outside of compose and then recreated, Docker
        stores the old container record as "<12-char-hex-id>_<original-name>".
        c.name returns that corrupted form; we strip the prefix here.
        Using c.name directly is the source of ghost service keys like
        "vps/9e36297651e7_control-plane-observer" that accumulate in services.json
        every time containers are rebuilt.
        """
        # "or {}" also covers keys that are present but null in the inspect data.
        labels = (c.attrs.get("Config") or {}).get("Labels") or {}
        compose_svc = (labels.get("com.docker.compose.service") or "").strip()
        if compose_svc:
            return compose_svc
        # Strip Docker-internal stale-state prefix: "<12-char hex>_<real-name>"
        name = c.name
        if (len(name) > 13
                and name[12] == "_"
                and all(ch in "0123456789abcdef" for ch in name[:12])):
            return name[13:]
        return name

    def check_containers(self):
        """Emit one event per restart-policy-managed container.

        * "exited" / "dead"                  -> ``containers_not_running`` (high)
        * "running" with health "unhealthy"  -> ``healthcheck_failed`` (high)
        * "running" otherwise                -> ``service_healthy`` (info)

        Containers in the "created" state, and containers whose restart policy
        is not unless-stopped / always / on-failure, are skipped.
        NOTE(review): any other state (e.g. "restarting", "paused") emits no
        event, so a container caught mid crash-loop is not reported — confirm
        this is intended.
        """
        if not self.docker_client:
            return
        try:
            containers = self.docker_client.containers.list(all=True)
        except Exception as exc:
            logger.error(f"Docker container list failed: {exc}")
            return
        for c in containers:
            try:
                name = self._canonical_container_name(c)
                status = c.status
                # Skip containers in "created" state — these are Docker Compose
                # internal tracking artifacts (never started, often hash-prefixed)
                # that appear when a container is rebuilt outside of compose.
                if status == "created":
                    continue
                host_config = c.attrs.get("HostConfig") or {}
                restart_policy = (host_config.get("RestartPolicy") or {}).get("Name", "")
                state = c.attrs.get("State") or {}
                health_status = (state.get("Health") or {}).get("Status", "")
                # Only track containers with a restart policy (long-running services)
                is_managed = restart_policy in ("unless-stopped", "always", "on-failure")
                if not is_managed:
                    continue
                # Exited container that carries an auto-restart policy
                if status in ("exited", "dead"):
                    logger.warning(f"Container exited: {name} (restart={restart_policy})")
                    self.emit_event(
                        "containers_not_running", "high", name,
                        f"Container '{name}' has exited (restart={restart_policy})",
                        {"container": name, "status": status,
                         "restart_policy": restart_policy},
                    )
                # Running container with a failing health check
                elif status == "running" and health_status == "unhealthy":
                    logger.warning(f"Container unhealthy: {name}")
                    self.emit_event(
                        "healthcheck_failed", "high", name,
                        f"Container '{name}' is running but its health check is failing",
                        {"container": name, "health_status": health_status},
                    )
                # Running container that is healthy — confirm to observer so that
                # services.json stays populated for the supervisor's drift detection.
                # Without this, the supervisor sees services.json as empty and treats
                # all desired services as "missing", flooding the action queue.
                elif status == "running":
                    self.emit_event(
                        "service_healthy", "info", name,
                        f"Container '{name}' is running",
                        {"container": name, "status": status,
                         "health_status": health_status or "none"},
                    )
            except Exception as exc:
                logger.error(f"Error checking container {c.name}: {exc}")

    # ------------------------------------------------------------------
    # Safe Docker cleanup
    # ------------------------------------------------------------------
    def _sd_card_rate_ok(self) -> bool:
        """Return True only if 24 hours have elapsed since last cleanup.

        A missing stamp file allows cleanup. An unreadable or corrupt stamp also
        allows it, now with a warning. run_safe_cleanup rewrites the stamp after
        pruning, which replaces a corrupt value.
        """
        if not LAST_CLEANUP_FILE.exists():
            return True
        try:
            last_ts = int(LAST_CLEANUP_FILE.read_text().strip())
        except (OSError, ValueError) as exc:
            logger.warning(f"Ignoring unreadable cleanup timestamp {LAST_CLEANUP_FILE}: {exc}")
            return True
        elapsed = time.time() - last_ts
        if elapsed < CLEANUP_INTERVAL_SECS:
            logger.debug(
                f"Docker cleanup rate-limited ({elapsed:.0f}s < {CLEANUP_INTERVAL_SECS}s)"
            )
            return False
        return True

    def _mark_cleanup_done(self):
        """Record the current epoch second as the last cleanup time."""
        try:
            LAST_CLEANUP_FILE.write_text(str(int(time.time())))
        except Exception as exc:
            logger.error(f"Failed to update cleanup timestamp: {exc}")

    def _prune_dangling_images(self):
        """Remove untagged (dangling) images only — never `prune -a`."""
        if not self.docker_client:
            return
        try:
            result = self.docker_client.images.prune(filters={"dangling": True})
            reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024)
            logger.info(f"Pruned dangling images ({reclaimed} MB reclaimed)")
        except Exception as exc:
            logger.error(f"Image prune failed: {exc}")

    def _prune_stopped_containers(self):
        """Remove stopped containers via the Docker prune API."""
        if not self.docker_client:
            return
        try:
            result = self.docker_client.containers.prune()
            reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024)
            logger.info(f"Pruned stopped containers ({reclaimed} MB reclaimed)")
        except Exception as exc:
            logger.error(f"Container prune failed: {exc}")

    def _prune_build_cache(self):
        """Remove the BuildKit build cache (best effort)."""
        if not self.docker_client:
            return
        try:
            result = self.docker_client.api.prune_builds()
            reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024)
            logger.info(f"Pruned build cache ({reclaimed} MB reclaimed)")
        except Exception as exc:
            # prune_builds() was added in docker-py 5.0; log and continue
            logger.warning(f"Build cache prune unavailable or failed: {exc}")

    def run_safe_cleanup(self):
        """Apply the per-node-type Docker cleanup policy.

        lte_node: nothing. sd_card: dangling images + stopped containers, at
        most once per CLEANUP_INTERVAL_SECS. Every other type: dangling images
        + stopped containers + build cache, on every call.
        """
        if self.node_type == "lte_node":
            # No cleanup on LTE nodes — any Docker op risks a pull over a
            # metered/intermittent connection.
            logger.debug("Skipping Docker cleanup: LTE node")
            return
        if self.node_type == "sd_card":
            if not self._sd_card_rate_ok():
                return
            self._prune_dangling_images()
            self._prune_stopped_containers()
            # No builder prune: minimise write cycles on SD card
            self._mark_cleanup_done()
            return
        # ai_node and standard: dangling + containers + build cache
        # ai_node: NEVER -a (would remove Ollama runtime images)
        self._prune_dangling_images()
        self._prune_stopped_containers()
        self._prune_build_cache()

    # ------------------------------------------------------------------
    # VPS-specific: control-plane filesystem rotation
    # ------------------------------------------------------------------
    def _cleanup_control_plane_fs(self):
        """
        Rotate control-plane filesystem artefacts on VPS.
        Safe targets only — never touches data/, config/, state/, or
        live action directories (pending/approved/running).
        """
        now = time.time()
        seven_days = 7 * 86_400
        thirty_days = 30 * 86_400
        three_days = 3 * 86_400
        # 1. Completed / failed actions older than 7 days
        for status in ("completed", "failed"):
            sdir = ACTIONS_DIR / status
            if not sdir.exists():
                continue
            for f in sdir.glob("*.json"):
                try:
                    if now - f.stat().st_mtime > seven_days:
                        f.unlink(missing_ok=True)
                        logger.info(f"Cleaned old {status} action: {f.name}")
                except Exception as exc:
                    logger.error(f"Failed to remove {f}: {exc}")
        # 2. Deploy logs older than 30 days
        deploy_dir = LOGS_DIR / "deploy"
        if deploy_dir.exists():
            for f in deploy_dir.glob("*.log"):
                try:
                    if now - f.stat().st_mtime > thirty_days:
                        f.unlink(missing_ok=True)
                        logger.info(f"Cleaned old deploy log: {f.name}")
                except Exception as exc:
                    logger.error(f"Failed to remove {f}: {exc}")
        # 3. Event files older than 3 days AND already past observer checkpoint.
        #    The dual condition guarantees we never delete an unprocessed event.
        #    Checkpoint format: {"node_checkpoints": {"piha": "/path/last", ...}}
        checkpoint_file = STATE_DIR / "observer_checkpoint.json"
        node_checkpoints: dict = {}
        if checkpoint_file.exists():
            try:
                cp = json.loads(checkpoint_file.read_text())
                if "node_checkpoints" in cp:
                    node_checkpoints = cp["node_checkpoints"]
                elif "last_processed_file" in cp:
                    # Migrate old single-file format
                    old = cp.get("last_processed_file", "")
                    if old:
                        try:
                            node_dir = Path(old).relative_to(EVENTS_DIR).parts[0]
                            node_checkpoints = {node_dir: old}
                        except Exception:
                            pass
            except Exception as exc:
                logger.error(f"Failed to read observer checkpoint: {exc}")
        if node_checkpoints:
            for f in EVENTS_DIR.glob("**/*.json"):
                try:
                    # Determine which node directory this event belongs to
                    rel = Path(f).relative_to(EVENTS_DIR)
                    node_dir = str(rel.parts[0]) if rel.parts else "__unknown__"
                    last_for_node = node_checkpoints.get(node_dir, "")
                    # Plain string comparison of full paths; presumably relies on
                    # the epoch-second in event filenames sorting chronologically
                    # and on checkpoints using the same EVENTS_DIR prefix — TODO confirm.
                    if (last_for_node
                            and now - f.stat().st_mtime > three_days
                            and str(f) <= last_for_node):
                        f.unlink(missing_ok=True)
                        logger.info(f"Cleaned old event: {f.name}")
                except Exception as exc:
                    logger.error(f"Failed to remove {f}: {exc}")
        else:
            logger.debug("No observer checkpoint present; skipping event cleanup")

    # ------------------------------------------------------------------
    # Optional: ship events to VPS via rsync
    # ------------------------------------------------------------------
    def _ship_events_to_vps(self):
        """
        Rsync local events to VPS so the observer can process them.
        Requires:
        - VPS_EVENTS_HOST env var set to the VPS hostname/IP
        - SSH key accessible inside the container (mount via docker-compose)
        - The node is NOT VPS itself
        Successfully transferred files are deleted locally (--remove-source-files).
        """
        if not VPS_EVENTS_HOST or self.node_name == VPS_NODE_NAME:
            return
        local_dir = str(self._node_events_dir()) + "/"
        remote_dir = (f"{VPS_EVENTS_USER}@{VPS_EVENTS_HOST}:"
                      f"{VPS_EVENTS_PATH}/{self.node_name}/")
        cmd = [
            "rsync", "-az", "--remove-source-files",
            # -F /dev/null: skip ~/.ssh/config entirely. The .ssh dir is
            # mounted from the host oskar user into the container which runs
            # as root; OpenSSH rejects config files owned by a different UID.
            # UserKnownHostsFile=/dev/null pairs with StrictHostKeyChecking=no
            # so we never try to write a known_hosts inside a read-only mount.
            "-e", ("ssh -F /dev/null"
                   " -o StrictHostKeyChecking=no"
                   " -o UserKnownHostsFile=/dev/null"
                   " -o ConnectTimeout=10"
                   " -o BatchMode=yes"),
            local_dir,
            remote_dir,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                logger.debug(f"Events shipped to {remote_dir}")
            else:
                logger.warning(f"Event shipping failed: {result.stderr.strip()}")
        except Exception as exc:
            logger.warning(f"Event shipping error: {exc}")

    # ------------------------------------------------------------------
    # VPS-specific: control-plane service health check
    # ------------------------------------------------------------------
    def _check_control_plane_health(self):
        """
        VPS-only: probe the control-plane HTTP endpoint and emit a service
        health event so the observer can populate services.json for the
        'control-plane' entry in services.yaml.
        The control-plane is a multi-container stack (observer, supervisor,
        executor, ui), so individual container names don't match the service
        name in services.yaml. Checking the HTTP endpoint gives a clean
        boundary that maps 1-to-1 with the logical service.

        HTTP 200 -> service_healthy; any other status the server answers with
        (including 4xx/5xx) -> service_unhealthy with ``http_status``; no
        answer at all -> service_unhealthy with ``error``.
        """
        import urllib.error
        import urllib.request
        endpoint = "http://localhost:18180/summary"
        try:
            # Context manager closes the response socket each cycle.
            with urllib.request.urlopen(endpoint, timeout=5) as resp:
                status = resp.status
        except urllib.error.HTTPError as exc:
            # urlopen raises HTTPError for 4xx/5xx: the server did answer, so
            # report its status code instead of calling it "unreachable".
            status = exc.code
        except Exception as exc:
            self.emit_event(
                "service_unhealthy", "high", "control-plane",
                f"Control-plane HTTP endpoint unreachable: {exc}",
                {"endpoint": endpoint, "error": str(exc)},
            )
            return
        if status == 200:
            self.emit_event(
                "service_healthy", "info", "control-plane",
                "Control-plane HTTP endpoint is reachable",
                {"endpoint": endpoint},
            )
        else:
            self.emit_event(
                "service_unhealthy", "high", "control-plane",
                f"Control-plane HTTP endpoint returned HTTP {status}",
                {"endpoint": endpoint, "http_status": status},
            )

    # ------------------------------------------------------------------
    # Heartbeat
    # ------------------------------------------------------------------
    def _update_heartbeat(self):
        """Touch STATE_DIR/node-agent.heartbeat so its mtime marks liveness."""
        try:
            (STATE_DIR / "node-agent.heartbeat").touch()
        except Exception as exc:
            logger.error(f"Failed to update heartbeat: {exc}")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def run_once(self):
        """Execute one full health-check / cleanup / shipping cycle."""
        self._update_heartbeat()
        disk_pct = self.check_disk()
        mem_pct = self.check_memory()
        cpu_pct = self.check_cpu()
        self.check_containers()
        self.run_safe_cleanup()
        if self.node_name == VPS_NODE_NAME:
            self._cleanup_control_plane_fs()
            self._check_control_plane_health()
        # Emit a node_health heartbeat so the observer can update node status
        # and the supervisor can correlate disk/memory metrics with service issues.
        self.emit_event(
            "node_health", "info", None,
            f"Health check completed on {self.node_name}",
            {"disk_pct": disk_pct, "mem_pct": mem_pct, "cpu_pct": cpu_pct},
        )
        self._ship_events_to_vps()

    def loop(self, interval: int = HEALTH_CHECK_INTERVAL):
        """Run ``run_once()`` forever, sleeping *interval* seconds between cycles.

        An exception in one cycle is logged with its traceback and the loop
        continues.
        """
        logger.info(
            f"node-agent ready — node={self.node_name} type={self.node_type} "
            f"interval={interval}s"
        )
        while True:
            try:
                self.run_once()
            except Exception as exc:
                # logger.exception keeps the traceback, which logger.error dropped.
                logger.exception(f"Health cycle error: {exc}")
            time.sleep(interval)
if __name__ == "__main__":
    # Script entry point: build the agent, then run health cycles until killed.
    agent = NodeAgent()
    agent.loop()