diff --git a/hosts/chelsty-ha/services.yaml b/hosts/chelsty-ha/services.yaml index a6da182..59a9cca 100644 --- a/hosts/chelsty-ha/services.yaml +++ b/hosts/chelsty-ha/services.yaml @@ -2,5 +2,14 @@ host: chelsty-ha site: chelsty services: + node-agent: + role: node-stability-monitor + # LTE node: monitors homeassistant container health and emits events only. + # No disk cleanup — HA database size is managed by recorder purge_keep_days + # in HA configuration, not by Docker or filesystem operations. + deployment_model: docker-compose + exposure: local-only + offline_required: true + homeassistant: role: home-automation-controller diff --git a/hosts/chelsty-infra/services.yaml b/hosts/chelsty-infra/services.yaml index 475f8df..5049e79 100644 --- a/hosts/chelsty-infra/services.yaml +++ b/hosts/chelsty-infra/services.yaml @@ -2,11 +2,20 @@ host: chelsty-infra site: chelsty services: + node-agent: + role: node-stability-monitor + # LTE node: node-agent monitors and emits events but does NO Docker cleanup. + # Disk pressure on chelsty-infra is typically Frigate recordings; Frigate's + # own retain policy is the correct remediation, not docker prune. + deployment_model: docker-compose + exposure: local-only + offline_required: true + mosquitto: role: local-mqtt-broker + zigbee2mqtt: role: zigbee-mqtt-bridge - stability-agent: - role: node-stability-monitor + frigate: role: nvr diff --git a/hosts/piha/services.yaml b/hosts/piha/services.yaml index 6dc2f80..6676f97 100644 --- a/hosts/piha/services.yaml +++ b/hosts/piha/services.yaml @@ -1,7 +1,7 @@ host: piha services: - stability-agent: + node-agent: role: node-stability-monitor deployment_model: docker-compose exposure: local-only @@ -10,6 +10,6 @@ services: local: [] external: [] runtime: - config_path: /opt/homelab/config/stability-agent + config_path: /opt/homelab/config/node-agent data_path: /opt/homelab/state logs_path: /opt/homelab/events diff --git a/hosts/solaria/services.yaml b/hosts/solaria/services.yaml index 5324beb..2cddb0f 100644 --- a/hosts/solaria/services.yaml +++ b/hosts/solaria/services.yaml @@ -1,7 +1,7 @@ host: solaria services: - stability-agent: + node-agent: role: node-stability-monitor deployment_model: docker-compose exposure: local-only @@ -10,6 +10,6 @@ services: local: [] external: [] runtime: - config_path: /opt/homelab/config/stability-agent + config_path: /opt/homelab/config/node-agent data_path: /opt/homelab/state logs_path: /opt/homelab/events diff --git a/hosts/vps/services.yaml b/hosts/vps/services.yaml index a913804..b1b97d3 100644 --- a/hosts/vps/services.yaml +++ b/hosts/vps/services.yaml @@ -1,7 +1,7 @@ host: vps services: - stability-agent: + node-agent: role: node-stability-monitor deployment_model: docker-compose exposure: local-only @@ -10,7 +10,7 @@ services: local: [] external: [] runtime: - config_path: /opt/homelab/config/stability-agent + config_path: /opt/homelab/config/node-agent data_path: /opt/homelab/state logs_path: /opt/homelab/events @@ -21,7 +21,7 @@ services: offline_required: false depends_on: local: - - stability-agent + - node-agent external: - piha:redis ports: diff --git a/scripts/monitor/health-monitor.sh b/scripts/monitor/health-monitor.sh new file mode 100755 index 0000000..684c5f4 --- /dev/null +++ b/scripts/monitor/health-monitor.sh @@ -0,0 +1,338 @@ +#!/usr/bin/env bash +# health-monitor.sh - Homelab node health monitor and safe disk cleanup +# +# Designed to run standalone on the host (cron or direct) or to be called by +# the node-agent Python daemon. All cleanup decisions follow the conservative +# policy agreed in the design review: +# +# lte_node (chelsty-infra, chelsty-ha) : NO cleanup at all +# sd_card (piha, saturn) : dangling images + stopped containers, +# rate-limited to once per 24 h +# ai_node (solaria) : dangling images + stopped containers +# + build cache (NEVER -a) +# standard (vps) : dangling images + stopped containers +# + build cache +# +# VPS additionally rotates control-plane filesystem artefacts: +# actions/completed + failed > 7 days +# logs/deploy > 30 days +# events/** > 3 days AND past observer checkpoint +# +# NEVER TOUCHED (any node): /opt/homelab/data/, config/, state/, +# actions/pending|approved|running, Frigate recordings, Ollama models, +# Zigbee2MQTT data, Mosquitto data, HA database/config. + +set -euo pipefail + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- +RUNTIME_PATH="${RUNTIME_PATH:-/opt/homelab}" +EVENTS_DIR="${RUNTIME_PATH}/events" +STATE_DIR="${RUNTIME_PATH}/state" +LOGS_DIR="${RUNTIME_PATH}/logs" +ACTIONS_DIR="${RUNTIME_PATH}/actions" + +NODE_NAME="${NODE_NAME:-$(hostname)}" +TIMESTAMP=$(date +%s) +DATE=$(date -u +%Y-%m-%dT%H:%M:%SZ) + +# Thresholds +DISK_WARN_PCT=75 +DISK_CRIT_PCT=85 +MEM_WARN_PCT=85 +MEM_CRIT_PCT=95 + +# Rate-limit file for SD-card nodes (max one Docker cleanup per 24 h) +CLEANUP_LOCK="${STATE_DIR}/last-docker-cleanup" +CLEANUP_INTERVAL=86400 # seconds + +# Node classifications +LTE_NODES="chelsty-infra chelsty-ha" +SD_CARD_NODES="piha saturn" +AI_NODES="solaria" + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +log() { echo "$(date -u +%H:%M:%S) [INFO] $*"; } +warn() { echo "$(date -u +%H:%M:%S) [WARN] $*" >&2; } +err() { echo "$(date -u +%H:%M:%S) [ERROR] $*" >&2; } + +contains() { + local word="$1"; shift + for w in "$@"; do [[ "$w" == "$word" ]] && return 0; done + return 1 +} + +get_node_type() { + # shellcheck disable=SC2086 + if contains "$NODE_NAME" $LTE_NODES; then echo "lte_node"; return; fi + if contains "$NODE_NAME" $SD_CARD_NODES; then echo "sd_card"; return; fi + if contains "$NODE_NAME" $AI_NODES; then echo "ai_node"; return; fi + echo "standard" +} + +# --------------------------------------------------------------------------- +# Event emission +# --------------------------------------------------------------------------- + +emit_event() { + local type="$1" severity="$2" service="${3:-}" message="$4" payload="${5:-{}}" + local id="evt-${NODE_NAME}-${TIMESTAMP}-${type}" + local dir="${EVENTS_DIR}/${NODE_NAME}" + mkdir -p "$dir" + cat > "${dir}/${id}.json" </dev/null | awk 'NR==2 {gsub(/%/,"",$5); print $5}') || return + avail_mb=$(df "${mount}" 2>/dev/null | awk 'NR==2 {printf "%d", $4/1024}') || return + total_mb=$(df "${mount}" 2>/dev/null | awk 'NR==2 {printf "%d", $2/1024}') || return + + if [[ "${usage_pct}" -ge "${DISK_CRIT_PCT}" ]]; then + warn "Disk CRITICAL: ${usage_pct}% used (${avail_mb} MB free)" + emit_event "disk_pressure" "high" "" \ + "Disk usage critical: ${usage_pct}% on ${mount} (${avail_mb} MB free)" \ + "{\"usage_pct\": ${usage_pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": ${total_mb}, \"mount\": \"${mount}\"}" + elif [[ "${usage_pct}" -ge "${DISK_WARN_PCT}" ]]; then + warn "Disk elevated: ${usage_pct}% used" + emit_event "disk_pressure" "medium" "" \ + "Disk usage elevated: ${usage_pct}% on ${mount} (${avail_mb} MB free)" \ + "{\"usage_pct\": ${usage_pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": ${total_mb}, \"mount\": \"${mount}\"}" + fi + echo "${usage_pct}" +} + +check_memory() { + local total avail pct avail_mb + total=$(awk '/^MemTotal/ {print $2}' /proc/meminfo) + avail=$(awk '/^MemAvailable/ {print $2}' /proc/meminfo) + pct=$(( (total - avail) * 100 / total )) + avail_mb=$(( avail / 1024 )) + + if [[ "${pct}" -ge "${MEM_CRIT_PCT}" ]]; then + warn "Memory CRITICAL: ${pct}% used" + emit_event "high_memory" "high" "" \ + "Memory usage critical: ${pct}% (${avail_mb} MB available)" \ + "{\"usage_pct\": ${pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": $((total/1024))}" + elif [[ "${pct}" -ge "${MEM_WARN_PCT}" ]]; then + warn "Memory elevated: ${pct}%" + emit_event "high_memory" "medium" "" \ + "Memory usage elevated: ${pct}% (${avail_mb} MB available)" \ + "{\"usage_pct\": ${pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": $((total/1024))}" + fi + echo "${pct}" +} + +check_cpu() { + # Two-sample /proc/stat delta for accurate instantaneous CPU usage. + local idle1 total1 idle2 total2 pct + read -r idle1 total1 < <(awk '/^cpu / {idle=$5; total=0; for(i=2;i<=NF;i++) total+=$i; print idle, total}' /proc/stat) + sleep 1 + read -r idle2 total2 < <(awk '/^cpu / {idle=$5; total=0; for(i=2;i<=NF;i++) total+=$i; print idle, total}' /proc/stat) + + local d_idle=$(( idle2 - idle1 )) + local d_total=$(( total2 - total1 )) + pct=$(( d_total > 0 ? 100 - d_idle * 100 / d_total : 0 )) + + if [[ "${pct}" -ge 90 ]]; then + warn "CPU elevated: ${pct}%" + emit_event "high_cpu" "medium" "" \ + "CPU usage elevated: ${pct}%" \ + "{\"usage_pct\": ${pct}}" + fi + echo "${pct}" +} + +check_containers() { + command -v docker &>/dev/null || return + + # Containers that have exited but carry a restart policy meaning they should be up + local cname + while IFS= read -r cname; do + [[ -z "$cname" ]] && continue + warn "Container exited (should be running): ${cname}" + emit_event "containers_not_running" "high" "${cname}" \ + "Container '${cname}' has exited unexpectedly (restart=unless-stopped)" \ + "{\"container\": \"${cname}\"}" + done < <(docker ps -a \ + --filter "status=exited" \ + --filter "label=com.docker.compose.project" \ + --format "{{.Names}}" 2>/dev/null || true) + + # Containers that are running but their health check is failing + while IFS= read -r cname; do + [[ -z "$cname" ]] && continue + warn "Container unhealthy: ${cname}" + emit_event "healthcheck_failed" "high" "${cname}" \ + "Container '${cname}' is running but health check is failing" \ + "{\"container\": \"${cname}\"}" + done < <(docker ps \ + --filter "health=unhealthy" \ + --format "{{.Names}}" 2>/dev/null || true) +} + +# --------------------------------------------------------------------------- +# Safe Docker cleanup (per policy) +# --------------------------------------------------------------------------- + +_sd_card_rate_ok() { + if [[ -f "${CLEANUP_LOCK}" ]]; then + local last_ts elapsed + last_ts=$(cat "${CLEANUP_LOCK}" 2>/dev/null || echo 0) + elapsed=$(( TIMESTAMP - last_ts )) + if [[ "${elapsed}" -lt "${CLEANUP_INTERVAL}" ]]; then + log "Docker cleanup skipped: last run ${elapsed}s ago (limit ${CLEANUP_INTERVAL}s)" + return 1 + fi + fi + return 0 +} + +_mark_cleanup_done() { + echo "${TIMESTAMP}" > "${CLEANUP_LOCK}" +} + +run_safe_cleanup() { + command -v docker &>/dev/null || return + local node_type + node_type=$(get_node_type) + + case "${node_type}" in + lte_node) + # NO cleanup on LTE nodes. Any docker operation risks triggering + # a pull over a metered/intermittent connection. + log "Skipping Docker cleanup: LTE node (${NODE_NAME})" + ;; + + sd_card) + # Dangling images + stopped containers only. + # Rate-limited to once per 24 hours to protect SD card write endurance. + _sd_card_rate_ok || return + log "Running rate-limited Docker cleanup (SD card node)" + docker image prune -f >/dev/null 2>&1 || true + docker container prune -f >/dev/null 2>&1 || true + _mark_cleanup_done + ;; + + ai_node) + # Dangling images + stopped containers + build cache. + # NEVER docker image prune -a (would remove Ollama runtime images, + # requiring a multi-hour re-pull of model weights). + log "Running AI-node Docker cleanup (dangling images + containers + build cache)" + docker image prune -f >/dev/null 2>&1 || true + docker container prune -f >/dev/null 2>&1 || true + docker builder prune -f >/dev/null 2>&1 || true + ;; + + standard) + # VPS and other standard nodes: full safe cleanup. + log "Running standard Docker cleanup" + docker image prune -f >/dev/null 2>&1 || true + docker container prune -f >/dev/null 2>&1 || true + docker builder prune -f >/dev/null 2>&1 || true + ;; + esac +} + +# --------------------------------------------------------------------------- +# VPS-specific: control-plane filesystem rotation +# --------------------------------------------------------------------------- + +cleanup_control_plane_fs() { + log "Running control-plane filesystem rotation" + + # Completed / failed actions older than 7 days + for status in completed failed; do + local dir="${ACTIONS_DIR}/${status}" + [[ -d "${dir}" ]] || continue + find "${dir}" -name "*.json" -mtime +7 -delete 2>/dev/null && \ + log "Cleaned ${status} actions older than 7 days" || true + done + + # Deploy logs older than 30 days + local deploy_logs="${LOGS_DIR}/deploy" + if [[ -d "${deploy_logs}" ]]; then + find "${deploy_logs}" -name "*.log" -mtime +30 -delete 2>/dev/null && \ + log "Cleaned deploy logs older than 30 days" || true + fi + + # Event files older than 3 days AND already past the observer checkpoint. + # The dual condition ensures we never delete an event the observer hasn't seen. + local checkpoint="${STATE_DIR}/observer_checkpoint.json" + if [[ -f "${checkpoint}" ]] && command -v python3 &>/dev/null; then + local last_processed + last_processed=$(python3 -c " +import json, sys +try: + d = json.load(open('${checkpoint}')) + print(d.get('last_processed_file', '')) +except Exception: + print('') +" 2>/dev/null || echo "") + + if [[ -n "${last_processed}" ]]; then + find "${EVENTS_DIR}" -name "*.json" -mtime +3 | while IFS= read -r f; do + # Only delete files that sort before the checkpoint path + # (i.e., the observer has already processed them). + if [[ "$f" < "${last_processed}" ]]; then + rm -f "$f" + log "Cleaned old event: $(basename "$f")" + fi + done + else + log "No observer checkpoint set; skipping event file cleanup" + fi + fi +} + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +mkdir -p "${EVENTS_DIR}/${NODE_NAME}" "${STATE_DIR}" + +log "Health check starting on ${NODE_NAME} (type=$(get_node_type))" + +disk_pct=$(check_disk || echo 0) +mem_pct=$(check_memory || echo 0) +cpu_pct=$(check_cpu || echo 0) +check_containers + +run_safe_cleanup + +# VPS: also rotate control-plane filesystem artefacts +if [[ "${NODE_NAME}" == "vps" ]]; then + cleanup_control_plane_fs +fi + +# Emit a node_health heartbeat so the observer can update node status +# and the supervisor can see up-to-date resource metrics. +emit_event "node_health" "info" "" \ + "Health check completed on ${NODE_NAME}" \ + "{\"disk_pct\": ${disk_pct}, \"mem_pct\": ${mem_pct}, \"cpu_pct\": ${cpu_pct}}" + +log "Health check complete (disk=${disk_pct}% mem=${mem_pct}% cpu=${cpu_pct}%)" diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py index 59b09b7..1b1b750 100644 --- a/scripts/observer/observer.py +++ b/scripts/observer/observer.py @@ -159,12 +159,41 @@ class Observer: "roles": self.inventory["nodes"].get(node, {}).get("roles", []) } self.world_state["nodes"][node]["last_seen"] = timestamp - + if etype == "node_online": self.world_state["nodes"][node]["status"] = "online" elif etype == "node_offline": self.world_state["nodes"][node]["status"] = "offline" + elif etype == "node_health": + # Regular heartbeat from node-agent; updates resource metrics. + # Clears disk_pressure if disk is now healthy (< warn threshold). + self.world_state["nodes"][node]["status"] = "online" + self.world_state["nodes"][node].update({ + "disk_usage_pct": payload.get("disk_pct"), + "mem_usage_pct": payload.get("mem_pct"), + "cpu_usage_pct": payload.get("cpu_pct"), + }) + if (payload.get("disk_pct") or 0) < 75: + self.world_state["nodes"][node].pop("disk_pressure", None) + + elif etype == "disk_pressure": + # Emitted when disk usage crosses 75 % (medium) or 85 % (high). + # The supervisor reads disk_pressure to generate disk_cleanup actions. + self.world_state["nodes"][node]["disk_pressure"] = severity + self.world_state["nodes"][node]["disk_usage_pct"] = payload.get("usage_pct") + + elif etype == "high_memory": + # Memory pressure observation; recorded on the node for correlation. + # No automated action — operator decides if a container restart helps. + self.world_state["nodes"][node]["memory_pressure"] = severity + self.world_state["nodes"][node]["mem_usage_pct"] = payload.get("usage_pct") + + elif etype == "high_cpu": + # CPU pressure observation; recorded for visibility. + self.world_state["nodes"][node]["cpu_pressure"] = severity + self.world_state["nodes"][node]["cpu_usage_pct"] = payload.get("usage_pct") + # 2. Update Service State if service and service != "all": svc_key = f"{node}/{service}" diff --git a/services/control-plane/src/executor.py b/services/control-plane/src/executor.py index 34d7734..3dddd6d 100644 --- a/services/control-plane/src/executor.py +++ b/services/control-plane/src/executor.py @@ -93,6 +93,14 @@ class Executor: container_name = data.get("container_name") or service success, error_msg = self._execute_container_restart(node, container_name) + elif action_type == "disk_cleanup": + # Operator-approved aggressive Docker cleanup (image prune -a + + # volume prune). Commands come from the action payload so the + # supervisor controls exactly what runs; the executor adds a + # safety check to reject anything touching protected paths. + payload = data.get("payload", {}) + success, error_msg = self._execute_disk_cleanup(node, payload) + else: success = False error_msg = f"Unknown action type: {action_type}" @@ -163,6 +171,57 @@ class Executor: ) return False, last_error + def _execute_disk_cleanup(self, node: str, payload: dict): + """ + SSH to the target node and run the operator-approved disk cleanup + commands from the action payload. + + Safety invariants enforced here regardless of payload content: + - No command may reference /opt/homelab/data/, /opt/homelab/config/, + or /opt/homelab/state/ (application data and configuration). + - No command may contain rm -rf / or similar destructive patterns. + If any command fails the safety check the entire action is rejected + (not run at all) and the rejection reason is recorded. + + Returns (success: bool, error_msg: str). + """ + commands = payload.get("commands", [ + "docker image prune -a -f", + "docker volume prune -f", + ]) + + # Safety gate: reject commands that touch protected paths + FORBIDDEN = [ + "/opt/homelab/data", + "/opt/homelab/config", + "/opt/homelab/state", + "rm -rf /", + ] + for cmd in commands: + for forbidden in FORBIDDEN: + if forbidden in cmd: + msg = f"Rejected: command contains forbidden pattern '{forbidden}': {cmd}" + logger.error(msg) + return False, msg + + full_command = " && ".join(commands) + cmd = [ + "ssh", + *SSH_OPTIONS, + f"{SSH_USER}@{node}", + full_command, + ] + logger.info(f"Disk cleanup on {node}: {full_command}") + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode == 0: + logger.info(f"Disk cleanup on {node} succeeded") + return True, "" + + error_msg = (result.stderr or result.stdout).strip() + logger.error(f"Disk cleanup on {node} failed: {error_msg}") + return False, error_msg + def loop(self, interval=10): logger.info("Starting executor loop") while True: diff --git a/services/control-plane/src/supervisor.py b/services/control-plane/src/supervisor.py index f7e1469..e39daeb 100644 --- a/services/control-plane/src/supervisor.py +++ b/services/control-plane/src/supervisor.py @@ -26,6 +26,12 @@ except Exception: # or a dependency (MQTT) is unreachable — a restart is the right first step. CONTAINER_RESTART_TRIGGERS = {"containers_not_running", "mqtt_unreachable"} +# Nodes where automatic disk_cleanup actions must NOT be generated. +# On chelsty nodes disk fullness is overwhelmingly caused by Frigate recordings +# or the HA database — Docker cleanup will not help and the operator must +# decide explicitly (e.g. adjust Frigate retain policy or purge HA recorder). +NO_DISK_CLEANUP_NODES = {"chelsty-infra", "chelsty-ha"} + # Logging setup logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("supervisor") @@ -207,10 +213,17 @@ class Supervisor: "trigger_type": trigger_type, }) - # 2. Generate recommendations + # 2. Generate service-level recommendations for drift in drifts: self._generate_recommendation(drift) + # 3. Generate node-level recommendations (disk pressure) + for node_name, node_info in self.actual_state["nodes"].items(): + if node_name in NO_DISK_CLEANUP_NODES: + continue + if node_info.get("disk_pressure") == "high": + self._generate_disk_cleanup_recommendation(node_name) + # ------------------------------------------------------------------ # Recommendation generation # ------------------------------------------------------------------ @@ -291,6 +304,59 @@ class Supervisor: except Exception as e: logger.error(f"Failed to save recommendation {action_id}: {e}") + def _generate_disk_cleanup_recommendation(self, node: str): + """ + Generate a disk_cleanup action when node-agent reports critical disk + pressure (>85 %) on a node that supports automated Docker cleanup. + + This is an OPERATOR-APPROVED action (risk=guarded): it runs + `docker image prune -a -f` and `docker volume prune -f`, which are + more aggressive than the safe auto-cleanup the node-agent runs itself. + + Nodes in NO_DISK_CLEANUP_NODES never reach this method (filtered in + reconcile) because their disk fullness is caused by application data + (Frigate, HA) that the operator must handle manually. + """ + action_id = f"disk-cleanup-{node}" + + for state in ("pending", "approved", "running"): + if (ACTIONS_DIR / state / f"{action_id}.json").exists(): + logger.debug(f"Skipping {action_id}: already in state '{state}'") + return + + action = { + "action_id": action_id, + "timestamp": time.time(), + "type": "disk_cleanup", + "node": node, + "service": "", + "risk_level": "guarded", + "confidence": 0.85, + "description": ( + f"Aggressive disk cleanup on {node}: docker image prune -a " + f"and docker volume prune (requires operator approval)" + ), + "status": "pending", + "payload": { + "reason": "disk_pressure", + "commands": [ + "docker image prune -a -f", + "docker volume prune -f", + ], + }, + } + + action_path = ACTIONS_DIR / "pending" / f"{action_id}.json" + try: + with open(action_path, "w") as f: + json.dump(action, f, indent=2) + logger.info( + f"Generated disk cleanup recommendation: {action_id} " + f"(node={node}, risk=guarded)" + ) + except Exception as e: + logger.error(f"Failed to save disk cleanup recommendation {action_id}: {e}") + def loop(self, interval=30): logger.info("Starting supervisor loop") while True: diff --git a/services/node-agent/Dockerfile b/services/node-agent/Dockerfile new file mode 100644 index 0000000..efce50d --- /dev/null +++ b/services/node-agent/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.11-slim + +WORKDIR /app + +# openssh-client + rsync: used for optional event shipping to VPS +# (only active when VPS_EVENTS_HOST is set in the environment) +RUN apt-get update && apt-get install -y --no-install-recommends \ + openssh-client \ + rsync \ + && rm -rf /var/lib/apt/lists/* + +# docker SDK : container health checks and cleanup (dangling images, prune) +# psutil : fallback system metrics (not used in main path; /proc is primary) +# pyyaml : may be needed for reading host config snippets +RUN pip install --no-cache-dir "docker>=6.0" psutil pyyaml + +COPY src/ /app/src/ + +ENV PYTHONUNBUFFERED=1 + +CMD ["python", "src/node_agent.py"] diff --git a/services/node-agent/docker-compose.yml b/services/node-agent/docker-compose.yml new file mode 100644 index 0000000..0303987 --- /dev/null +++ b/services/node-agent/docker-compose.yml @@ -0,0 +1,44 @@ +services: + node-agent: + build: . + container_name: node-agent + restart: unless-stopped + + environment: + - RUNTIME_PATH=/opt/homelab + - REPO_ROOT=/repo + # NODE_NAME must be set to the canonical topology node name, e.g.: + # NODE_NAME=piha + # The agent uses this to determine its cleanup policy (lte_node / sd_card / + # ai_node / standard) and to tag emitted events with the correct node name. + - NODE_NAME=${NODE_NAME:-} + # NODE_TYPE overrides auto-detection if needed: + # lte_node | sd_card | ai_node | standard + - NODE_TYPE=${NODE_TYPE:-} + # VPS event shipping (non-VPS nodes only). + # Set VPS_EVENTS_HOST to the VPS Tailscale hostname or IP so that events + # emitted on this node are rsynced to the VPS observer. + # Also mount an SSH key (see commented volume below). + - VPS_EVENTS_HOST=${VPS_EVENTS_HOST:-} + - VPS_EVENTS_USER=${VPS_EVENTS_USER:-oskar} + - VPS_EVENTS_PATH=${VPS_EVENTS_PATH:-/opt/homelab/events} + # How often (seconds) to run a full health check cycle (default: 60) + - CHECK_INTERVAL=${CHECK_INTERVAL:-60} + + volumes: + # Runtime filesystem — events, state, actions, logs + - /opt/homelab:/opt/homelab + # Docker socket — required for container health checks and Docker cleanup + - /var/run/docker.sock:/var/run/docker.sock + # Repo (read-only) — scripts and host config accessible to agent + - ../..:/repo:ro + # SSH key for event shipping to VPS. + # Uncomment and set SSH_KEY_PATH on nodes where VPS_EVENTS_HOST is set: + # - ${SSH_KEY_PATH:-/home/oskar/.ssh/id_ed25519}:/root/.ssh/id_rsa:ro + + healthcheck: + test: ["CMD", "test", "-f", "/opt/homelab/state/node-agent.heartbeat"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 15s diff --git a/services/node-agent/src/node_agent.py b/services/node-agent/src/node_agent.py new file mode 100644 index 0000000..75cb2e5 --- /dev/null +++ b/services/node-agent/src/node_agent.py @@ -0,0 +1,539 @@ +""" +node_agent.py — Homelab node health monitor daemon. + +Runs as a Docker container on every managed node. Each cycle it: + 1. Collects system metrics (disk, memory, CPU). + 2. Checks Docker container health. + 3. Emits structured event JSON files to /opt/homelab/events//. + 4. Applies safe Docker / filesystem cleanup per the conservative policy. + 5. Optionally rsyncs events to VPS so the control-plane observer can process them. + +Cleanup policy (matches health-monitor.sh): + lte_node (chelsty-infra, chelsty-ha) : NO cleanup, NO image operations + sd_card (piha, saturn) : dangling images + stopped containers, + max once per 24 h + ai_node (solaria) : dangling + containers + build cache, + NEVER docker image prune -a + standard (vps) : dangling + containers + build cache + + control-plane filesystem rotation + +NEVER TOUCHED on any node: + /opt/homelab/data/ Frigate recordings, Ollama models, HA db, MQTT state + /opt/homelab/config/ All hand-crafted and repo-seeded configuration + /opt/homelab/state/ Heartbeat files, observer checkpoint + actions/pending|approved|running Live work queue +""" + +import json +import logging +import os +import shutil +import socket +import subprocess +import time +from datetime import datetime, timezone +from pathlib import Path + +try: + import docker as docker_sdk + DOCKER_AVAILABLE = True +except ImportError: + DOCKER_AVAILABLE = False + docker_sdk = None + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("node-agent") + +# --------------------------------------------------------------------------- +# Runtime paths +# --------------------------------------------------------------------------- +RUNTIME_PATH = Path(os.getenv("RUNTIME_PATH", "/opt/homelab")) +REPO_ROOT = Path(os.getenv("REPO_ROOT", "/repo")) +EVENTS_DIR = RUNTIME_PATH / "events" +STATE_DIR = RUNTIME_PATH / "state" +LOGS_DIR = RUNTIME_PATH / "logs" +ACTIONS_DIR = RUNTIME_PATH / "actions" + +# --------------------------------------------------------------------------- +# Node identity +# --------------------------------------------------------------------------- +NODE_NAME = os.getenv("NODE_NAME") or socket.gethostname() +NODE_TYPE = os.getenv("NODE_TYPE", "") # override auto-detection if set + +VPS_NODE_NAME = "vps" +LTE_NODES = {"chelsty-infra", "chelsty-ha"} +SD_CARD_NODES = {"piha", "saturn"} +AI_NODES = {"solaria"} + +# --------------------------------------------------------------------------- +# Event shipping (optional — requires VPS_EVENTS_HOST + SSH key in container) +# --------------------------------------------------------------------------- +VPS_EVENTS_HOST = os.getenv("VPS_EVENTS_HOST", "") +VPS_EVENTS_USER = os.getenv("VPS_EVENTS_USER", "oskar") +VPS_EVENTS_PATH = os.getenv("VPS_EVENTS_PATH", "/opt/homelab/events") + +# --------------------------------------------------------------------------- +# Thresholds +# --------------------------------------------------------------------------- +DISK_WARN_PCT = 75 +DISK_CRIT_PCT = 85 +MEM_WARN_PCT = 85 +MEM_CRIT_PCT = 95 + +# SD-card nodes: enforce 24-hour gap between Docker cleanup runs +CLEANUP_INTERVAL_SECS = 86_400 +LAST_CLEANUP_FILE = STATE_DIR / "last-docker-cleanup" + +# How long to wait between full health-check cycles +HEALTH_CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60")) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _resolve_node_type() -> str: + if NODE_TYPE: + return NODE_TYPE + if NODE_NAME in LTE_NODES: + return "lte_node" + if NODE_NAME in SD_CARD_NODES: + return "sd_card" + if NODE_NAME in AI_NODES: + return "ai_node" + return "standard" + + +def _utc_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +# --------------------------------------------------------------------------- +# NodeAgent +# --------------------------------------------------------------------------- + +class NodeAgent: + def __init__(self): + self.node_name = NODE_NAME + self.node_type = _resolve_node_type() + self._ensure_dirs() + + self.docker_client = None + if DOCKER_AVAILABLE: + try: + self.docker_client = docker_sdk.from_env() + logger.info("Docker SDK connected") + except Exception as exc: + logger.warning(f"Docker unavailable: {exc}") + + logger.info(f"node-agent starting: node={self.node_name} type={self.node_type}") + + # ------------------------------------------------------------------ + # Directories + # ------------------------------------------------------------------ + + def _ensure_dirs(self): + for d in [EVENTS_DIR, STATE_DIR, LOGS_DIR, + EVENTS_DIR / self.node_name]: + d.mkdir(parents=True, exist_ok=True) + + def _node_events_dir(self) -> Path: + return EVENTS_DIR / self.node_name + + # ------------------------------------------------------------------ + # Event emission + # ------------------------------------------------------------------ + + def emit_event(self, event_type: str, severity: str, service, + message: str, payload: dict = None): + ts = int(time.time()) + event_id = f"evt-{self.node_name}-{ts}-{event_type}" + event = { + "id": event_id, + "timestamp": ts, + "date": _utc_iso(), + "type": event_type, + "severity": severity, + "node": self.node_name, + "service": service or "", + "message": message, + "payload": payload or {}, + } + path = self._node_events_dir() / f"{event_id}.json" + try: + path.write_text(json.dumps(event, indent=2)) + except Exception as exc: + logger.error(f"Failed to write event {event_id}: {exc}") + + # ------------------------------------------------------------------ + # System metrics + # ------------------------------------------------------------------ + + def check_disk(self) -> int: + """ + Check disk usage of the filesystem that hosts RUNTIME_PATH. + Using shutil.disk_usage on the mounted runtime path works both + natively on the host and inside a container that mounts /opt/homelab + from the host — the reported usage reflects the host partition. + """ + try: + usage = shutil.disk_usage(str(RUNTIME_PATH)) + pct = int(usage.used * 100 / usage.total) + avail_mb = int(usage.free / (1024 * 1024)) + total_mb = int(usage.total / (1024 * 1024)) + payload = { + "usage_pct": pct, "avail_mb": avail_mb, + "total_mb": total_mb, "mount": str(RUNTIME_PATH), + } + if pct >= DISK_CRIT_PCT: + logger.warning(f"Disk critical: {pct}% used") + self.emit_event( + "disk_pressure", "high", None, + f"Disk usage critical: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)", + payload, + ) + elif pct >= DISK_WARN_PCT: + logger.info(f"Disk elevated: {pct}% used") + self.emit_event( + "disk_pressure", "medium", None, + f"Disk usage elevated: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)", + payload, + ) + return pct + except Exception as exc: + logger.error(f"Disk check failed: {exc}") + return 0 + + def check_memory(self) -> int: + """Read host memory from /proc/meminfo (visible in container without special mounts).""" + try: + info: dict = {} + with open("/proc/meminfo") as fh: + for line in fh: + k, v = line.split(":") + info[k.strip()] = int(v.strip().split()[0]) + + total = info.get("MemTotal", 1) + avail = info.get("MemAvailable", total) + pct = int((total - avail) * 100 / total) + avail_mb = avail // 1024 + total_mb = total // 1024 + payload = {"usage_pct": pct, "avail_mb": avail_mb, "total_mb": total_mb} + + if pct >= MEM_CRIT_PCT: + logger.warning(f"Memory critical: {pct}%") + self.emit_event("high_memory", "high", None, + f"Memory usage critical: {pct}% ({avail_mb} MB available)", payload) + elif pct >= MEM_WARN_PCT: + self.emit_event("high_memory", "medium", None, + f"Memory usage elevated: {pct}% ({avail_mb} MB available)", payload) + return pct + except Exception as exc: + logger.error(f"Memory check failed: {exc}") + return 0 + + def check_cpu(self) -> int: + """Two-sample /proc/stat delta for accurate instantaneous CPU usage.""" + try: + def _read(): + with open("/proc/stat") as fh: + for line in fh: + if line.startswith("cpu "): + vals = list(map(int, line.split()[1:])) + return vals[3], sum(vals) # idle, total + return 0, 1 + + idle1, total1 = _read() + time.sleep(0.5) + idle2, total2 = _read() + + d_total = total2 - total1 + d_idle = idle2 - idle1 + pct = 100 - int(d_idle * 100 / d_total) if d_total else 0 + + if pct >= 90: + self.emit_event("high_cpu", "medium", None, + f"CPU usage elevated: {pct}%", {"usage_pct": pct}) + return pct + except Exception as exc: + logger.error(f"CPU check failed: {exc}") + return 0 + + # ------------------------------------------------------------------ + # Docker container health + # ------------------------------------------------------------------ + + def check_containers(self): + if not self.docker_client: + return + try: + containers = self.docker_client.containers.list(all=True) + except Exception as exc: + logger.error(f"Docker container list failed: {exc}") + return + + for c in containers: + try: + name = c.name + status = c.status + host_config = c.attrs.get("HostConfig", {}) + restart_policy = host_config.get("RestartPolicy", {}).get("Name", "") + health_status = (c.attrs.get("State", {}) + .get("Health", {}) + .get("Status", "")) + + # Exited container that carries an auto-restart policy + if status in ("exited", "dead") and restart_policy in ( + "unless-stopped", "always", "on-failure" + ): + logger.warning(f"Container exited: {name} (restart={restart_policy})") + self.emit_event( + "containers_not_running", "high", name, + f"Container '{name}' has exited (restart={restart_policy})", + {"container": name, "status": status, + "restart_policy": restart_policy}, + ) + + # Running container with a failing health check + elif status == "running" and health_status == "unhealthy": + logger.warning(f"Container unhealthy: {name}") + self.emit_event( + "healthcheck_failed", "high", name, + f"Container '{name}' is running but its health check is failing", + {"container": name, "health_status": health_status}, + ) + except Exception as exc: + logger.error(f"Error checking container {c.name}: {exc}") + + # ------------------------------------------------------------------ + # Safe Docker cleanup + # ------------------------------------------------------------------ + + def _sd_card_rate_ok(self) -> bool: + """Return True only if 24 hours have elapsed since last cleanup.""" + if LAST_CLEANUP_FILE.exists(): + try: + last_ts = int(LAST_CLEANUP_FILE.read_text().strip()) + elapsed = time.time() - last_ts + if elapsed < CLEANUP_INTERVAL_SECS: + logger.debug( + f"Docker cleanup rate-limited ({elapsed:.0f}s < {CLEANUP_INTERVAL_SECS}s)" + ) + return False + except Exception: + pass + return True + + def _mark_cleanup_done(self): + try: + LAST_CLEANUP_FILE.write_text(str(int(time.time()))) + except Exception as exc: + logger.error(f"Failed to update cleanup timestamp: {exc}") + + def _prune_dangling_images(self): + if not self.docker_client: + return + try: + result = self.docker_client.images.prune(filters={"dangling": True}) + reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024) + logger.info(f"Pruned dangling images ({reclaimed} MB reclaimed)") + except Exception as exc: + logger.error(f"Image prune failed: {exc}") + + def _prune_stopped_containers(self): + if not self.docker_client: + return + try: + result = self.docker_client.containers.prune() + reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024) + logger.info(f"Pruned stopped containers ({reclaimed} MB reclaimed)") + except Exception as exc: + logger.error(f"Container prune failed: {exc}") + + def _prune_build_cache(self): + if not self.docker_client: + return + try: + result = self.docker_client.api.prune_builds() + reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024) + logger.info(f"Pruned build cache ({reclaimed} MB reclaimed)") + except Exception as exc: + # prune_builds() was added in docker-py 5.0; log and continue + logger.warning(f"Build cache prune unavailable or failed: {exc}") + + def run_safe_cleanup(self): + """Apply the per-node-type Docker cleanup policy.""" + if self.node_type == "lte_node": + # No cleanup on LTE nodes — any Docker op risks a pull over a + # metered/intermittent connection. + logger.debug("Skipping Docker cleanup: LTE node") + return + + if self.node_type == "sd_card": + if not self._sd_card_rate_ok(): + return + self._prune_dangling_images() + self._prune_stopped_containers() + # No builder prune: minimise write cycles on SD card + self._mark_cleanup_done() + return + + # ai_node and standard: dangling + containers + build cache + # ai_node: NEVER -a (would remove Ollama runtime images) + self._prune_dangling_images() + self._prune_stopped_containers() + self._prune_build_cache() + + # ------------------------------------------------------------------ + # VPS-specific: control-plane filesystem rotation + # ------------------------------------------------------------------ + + def _cleanup_control_plane_fs(self): + """ + Rotate control-plane filesystem artefacts on VPS. + Safe targets only — never touches data/, config/, state/, or + live action directories (pending/approved/running). + """ + now = time.time() + seven_days = 7 * 86_400 + thirty_days = 30 * 86_400 + three_days = 3 * 86_400 + + # 1. Completed / failed actions older than 7 days + for status in ("completed", "failed"): + sdir = ACTIONS_DIR / status + if not sdir.exists(): + continue + for f in sdir.glob("*.json"): + try: + if now - f.stat().st_mtime > seven_days: + f.unlink(missing_ok=True) + logger.info(f"Cleaned old {status} action: {f.name}") + except Exception as exc: + logger.error(f"Failed to remove {f}: {exc}") + + # 2. Deploy logs older than 30 days + deploy_dir = LOGS_DIR / "deploy" + if deploy_dir.exists(): + for f in deploy_dir.glob("*.log"): + try: + if now - f.stat().st_mtime > thirty_days: + f.unlink(missing_ok=True) + logger.info(f"Cleaned old deploy log: {f.name}") + except Exception as exc: + logger.error(f"Failed to remove {f}: {exc}") + + # 3. Event files older than 3 days AND already past observer checkpoint. + # The dual condition guarantees we never delete an unprocessed event. + checkpoint_file = STATE_DIR / "observer_checkpoint.json" + last_processed = "" + if checkpoint_file.exists(): + try: + cp = json.loads(checkpoint_file.read_text()) + last_processed = cp.get("last_processed_file", "") + except Exception as exc: + logger.error(f"Failed to read observer checkpoint: {exc}") + + if last_processed: + for f in EVENTS_DIR.glob("**/*.json"): + try: + if (now - f.stat().st_mtime > three_days + and str(f) < last_processed): + f.unlink(missing_ok=True) + logger.info(f"Cleaned old event: {f.name}") + except Exception as exc: + logger.error(f"Failed to remove {f}: {exc}") + else: + logger.debug("No observer checkpoint present; skipping event cleanup") + + # ------------------------------------------------------------------ + # Optional: ship events to VPS via rsync + # ------------------------------------------------------------------ + + def _ship_events_to_vps(self): + """ + Rsync local events to VPS so the observer can process them. + Requires: + - VPS_EVENTS_HOST env var set to the VPS hostname/IP + - SSH key accessible inside the container (mount via docker-compose) + - The node is NOT VPS itself + """ + if not VPS_EVENTS_HOST or self.node_name == VPS_NODE_NAME: + return + + local_dir = str(self._node_events_dir()) + "/" + remote_dir = (f"{VPS_EVENTS_USER}@{VPS_EVENTS_HOST}:" + f"{VPS_EVENTS_PATH}/{self.node_name}/") + cmd = [ + "rsync", "-az", "--remove-source-files", + "-e", "ssh -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o BatchMode=yes", + local_dir, + remote_dir, + ] + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if result.returncode == 0: + logger.debug(f"Events shipped to {remote_dir}") + else: + logger.warning(f"Event shipping failed: {result.stderr.strip()}") + except Exception as exc: + logger.warning(f"Event shipping error: {exc}") + + # ------------------------------------------------------------------ + # Heartbeat + # ------------------------------------------------------------------ + + def _update_heartbeat(self): + try: + (STATE_DIR / "node-agent.heartbeat").touch() + except Exception as exc: + logger.error(f"Failed to update heartbeat: {exc}") + + # ------------------------------------------------------------------ + # Main loop + # ------------------------------------------------------------------ + + def run_once(self): + self._update_heartbeat() + + disk_pct = self.check_disk() + mem_pct = self.check_memory() + cpu_pct = self.check_cpu() + self.check_containers() + + self.run_safe_cleanup() + + if self.node_name == VPS_NODE_NAME: + self._cleanup_control_plane_fs() + + # Emit a node_health heartbeat so the observer can update node status + # and the supervisor can correlate disk/memory metrics with service issues. + self.emit_event( + "node_health", "info", None, + f"Health check completed on {self.node_name}", + {"disk_pct": disk_pct, "mem_pct": mem_pct, "cpu_pct": cpu_pct}, + ) + + self._ship_events_to_vps() + + def loop(self, interval: int = HEALTH_CHECK_INTERVAL): + logger.info( + f"node-agent ready — node={self.node_name} type={self.node_type} " + f"interval={interval}s" + ) + while True: + try: + self.run_once() + except Exception as exc: + logger.error(f"Health cycle error: {exc}") + time.sleep(interval) + + +if __name__ == "__main__": + NodeAgent().loop()