feat(node-agent): implement health monitor and safe cleanup policy
scripts/monitor/health-monitor.sh (new):
- Standalone bash health monitor: disk/RAM/CPU checks + docker container health
- Per-node-type cleanup policy enforced:
lte_node (chelsty-infra, chelsty-ha): NO cleanup, no docker ops
sd_card (piha, saturn): dangling images + containers, rate-limited once/24h
ai_node (solaria): dangling + containers + build cache, NEVER -a
standard (vps): dangling + containers + build cache + CP filesystem rotation
- VPS filesystem rotation: completed/failed actions >7d, deploy logs >30d,
events >3d AND past observer checkpoint
- Emits structured JSON events (node_health, disk_pressure, high_memory, high_cpu,
containers_not_running, healthcheck_failed)
services/node-agent/ (new):
- Python daemon (node_agent.py): same policy as bash script, Docker SDK
for container checks and cleanup, /proc for system metrics
- Optional event shipping to VPS via rsync+SSH (VPS_EVENTS_HOST env var)
- Dockerfile: python:3.11-slim + openssh-client + rsync + docker>=6.0
- docker-compose.yml: mounts docker socket, /opt/homelab, repo read-only
observer.py:
- Handle node_health: update node status + disk/mem/cpu metrics, clear disk_pressure
- Handle disk_pressure: record severity on node, clear when healthy
- Handle high_memory / high_cpu: record pressure level for correlation
supervisor.py:
- Add NO_DISK_CLEANUP_NODES = {chelsty-infra, chelsty-ha}
- reconcile() step 3: generate disk_cleanup actions for nodes with high disk pressure
- _generate_disk_cleanup_recommendation(): stable ID disk-cleanup-{node},
checks all active states, risk=guarded (operator approval required)
executor.py:
- Handle disk_cleanup action type via _execute_disk_cleanup()
- Commands come from action payload; safety gate rejects any command touching
/opt/homelab/data/, /opt/homelab/config/, /opt/homelab/state/, or rm -rf /
hosts/*/services.yaml:
- Rename stability-agent -> node-agent on piha, vps, solaria, chelsty-infra
- Add node-agent to chelsty-ha (previously missing)
- Add cleanup policy notes to LTE node comments
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
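Review note: the per-node-type policy listed above can be read as a small lookup table. The sketch below is only an illustration of that table; `CleanupPolicy`, `POLICIES`, `node_type_for`, and `cleanup_due` are hypothetical names and are not taken from node_agent.py or health-monitor.sh.

```python
from dataclasses import dataclass

# Node groups as listed in the commit message.
LTE_NODES = {"chelsty-infra", "chelsty-ha"}
SD_CARD_NODES = {"piha", "saturn"}
AI_NODES = {"solaria"}


@dataclass(frozen=True)
class CleanupPolicy:
    """Hypothetical record of what a node type is allowed to prune."""
    dangling_images: bool = False
    stopped_containers: bool = False
    build_cache: bool = False
    min_interval_s: int = 0  # 0 means no rate limit


POLICIES = {
    "lte_node": CleanupPolicy(),  # no cleanup at all
    "sd_card": CleanupPolicy(True, True, False, min_interval_s=86400),
    "ai_node": CleanupPolicy(True, True, True),
    "standard": CleanupPolicy(True, True, True),
}


def node_type_for(node_name: str, override: str = "") -> str:
    """Resolve the node type; a valid override (NODE_TYPE) wins over the name."""
    if override in POLICIES:
        return override
    if node_name in LTE_NODES:
        return "lte_node"
    if node_name in SD_CARD_NODES:
        return "sd_card"
    if node_name in AI_NODES:
        return "ai_node"
    return "standard"


def cleanup_due(policy: CleanupPolicy, now: float, last_run: float) -> bool:
    """True if the policy permits any cleanup and the rate limit has elapsed."""
    allowed = (policy.dangling_images or policy.stopped_containers
               or policy.build_cache)
    if not allowed:
        return False
    return now - last_run >= policy.min_interval_s
```

None of the policies enables `docker image prune -a`; in this commit that command only appears in the operator-approved disk_cleanup action.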
parent 7742bda245
commit 01b7758fe6
|
|
@@ -2,5 +2,14 @@ host: chelsty-ha
|
|||
site: chelsty
|
||||
|
||||
services:
|
||||
node-agent:
|
||||
role: node-stability-monitor
|
||||
# LTE node: monitors homeassistant container health and emits events only.
|
||||
# No disk cleanup — HA database size is managed by recorder purge_keep_days
|
||||
# in HA configuration, not by Docker or filesystem operations.
|
||||
deployment_model: docker-compose
|
||||
exposure: local-only
|
||||
offline_required: true
|
||||
|
||||
homeassistant:
|
||||
role: home-automation-controller
|
||||
|
|
|
|||
|
|
@@ -2,11 +2,20 @@ host: chelsty-infra
|
|||
site: chelsty
|
||||
|
||||
services:
|
||||
node-agent:
|
||||
role: node-stability-monitor
|
||||
# LTE node: node-agent monitors and emits events but does NO Docker cleanup.
|
||||
# Disk pressure on chelsty-infra is typically Frigate recordings; Frigate's
|
||||
# own retain policy is the correct remediation, not docker prune.
|
||||
deployment_model: docker-compose
|
||||
exposure: local-only
|
||||
offline_required: true
|
||||
|
||||
mosquitto:
|
||||
role: local-mqtt-broker
|
||||
|
||||
zigbee2mqtt:
|
||||
role: zigbee-mqtt-bridge
|
||||
stability-agent:
|
||||
role: node-stability-monitor
|
||||
|
||||
frigate:
|
||||
role: nvr
|
||||
|
|
|
|||
|
|
@@ -1,7 +1,7 @@
|
|||
host: piha
|
||||
|
||||
services:
|
||||
stability-agent:
|
||||
node-agent:
|
||||
role: node-stability-monitor
|
||||
deployment_model: docker-compose
|
||||
exposure: local-only
|
||||
|
|
@@ -10,6 +10,6 @@ services:
|
|||
local: []
|
||||
external: []
|
||||
runtime:
|
||||
config_path: /opt/homelab/config/stability-agent
|
||||
config_path: /opt/homelab/config/node-agent
|
||||
data_path: /opt/homelab/state
|
||||
logs_path: /opt/homelab/events
|
||||
|
|
|
|||
|
|
@@ -1,7 +1,7 @@
|
|||
host: solaria
|
||||
|
||||
services:
|
||||
stability-agent:
|
||||
node-agent:
|
||||
role: node-stability-monitor
|
||||
deployment_model: docker-compose
|
||||
exposure: local-only
|
||||
|
|
@@ -10,6 +10,6 @@ services:
|
|||
local: []
|
||||
external: []
|
||||
runtime:
|
||||
config_path: /opt/homelab/config/stability-agent
|
||||
config_path: /opt/homelab/config/node-agent
|
||||
data_path: /opt/homelab/state
|
||||
logs_path: /opt/homelab/events
|
||||
|
|
|
|||
|
|
@@ -1,7 +1,7 @@
|
|||
host: vps
|
||||
|
||||
services:
|
||||
stability-agent:
|
||||
node-agent:
|
||||
role: node-stability-monitor
|
||||
deployment_model: docker-compose
|
||||
exposure: local-only
|
||||
|
|
@@ -10,7 +10,7 @@ services:
|
|||
local: []
|
||||
external: []
|
||||
runtime:
|
||||
config_path: /opt/homelab/config/stability-agent
|
||||
config_path: /opt/homelab/config/node-agent
|
||||
data_path: /opt/homelab/state
|
||||
logs_path: /opt/homelab/events
|
||||
|
||||
|
|
@@ -21,7 +21,7 @@ services:
|
|||
offline_required: false
|
||||
depends_on:
|
||||
local:
|
||||
- stability-agent
|
||||
- node-agent
|
||||
external:
|
||||
- piha:redis
|
||||
ports:
|
||||
|
|
|
|||
338
scripts/monitor/health-monitor.sh
Executable file
|
|
@@ -0,0 +1,338 @@
|
|||
#!/usr/bin/env bash
|
||||
# health-monitor.sh - Homelab node health monitor and safe disk cleanup
|
||||
#
|
||||
# Designed to run standalone on the host (cron or direct) or to be called by
|
||||
# the node-agent Python daemon. All cleanup decisions follow the conservative
|
||||
# policy agreed in the design review:
|
||||
#
|
||||
# lte_node (chelsty-infra, chelsty-ha) : NO cleanup at all
|
||||
# sd_card (piha, saturn) : dangling images + stopped containers,
|
||||
# rate-limited to once per 24 h
|
||||
# ai_node (solaria) : dangling images + stopped containers
|
||||
# + build cache (NEVER -a)
|
||||
# standard (vps) : dangling images + stopped containers
|
||||
# + build cache
|
||||
#
|
||||
# VPS additionally rotates control-plane filesystem artefacts:
|
||||
# actions/completed + failed > 7 days
|
||||
# logs/deploy > 30 days
|
||||
# events/** > 3 days AND past observer checkpoint
|
||||
#
|
||||
# NEVER TOUCHED (any node): /opt/homelab/data/, config/, state/,
|
||||
# actions/pending|approved|running, Frigate recordings, Ollama models,
|
||||
# Zigbee2MQTT data, Mosquitto data, HA database/config.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
RUNTIME_PATH="${RUNTIME_PATH:-/opt/homelab}"
|
||||
EVENTS_DIR="${RUNTIME_PATH}/events"
|
||||
STATE_DIR="${RUNTIME_PATH}/state"
|
||||
LOGS_DIR="${RUNTIME_PATH}/logs"
|
||||
ACTIONS_DIR="${RUNTIME_PATH}/actions"
|
||||
|
||||
NODE_NAME="${NODE_NAME:-$(hostname)}"
|
||||
TIMESTAMP=$(date +%s)
|
||||
DATE=$(date -u +%Y-%m-%dT%H:%M:%SZ)
|
||||
|
||||
# Thresholds
|
||||
DISK_WARN_PCT=75
|
||||
DISK_CRIT_PCT=85
|
||||
MEM_WARN_PCT=85
|
||||
MEM_CRIT_PCT=95
|
||||
|
||||
# Rate-limit file for SD-card nodes (max one Docker cleanup per 24 h)
|
||||
CLEANUP_LOCK="${STATE_DIR}/last-docker-cleanup"
|
||||
CLEANUP_INTERVAL=86400 # seconds
|
||||
|
||||
# Node classifications
|
||||
LTE_NODES="chelsty-infra chelsty-ha"
|
||||
SD_CARD_NODES="piha saturn"
|
||||
AI_NODES="solaria"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
log() { echo "$(date -u +%H:%M:%S) [INFO] $*"; }
|
||||
warn() { echo "$(date -u +%H:%M:%S) [WARN] $*" >&2; }
|
||||
err() { echo "$(date -u +%H:%M:%S) [ERROR] $*" >&2; }
|
||||
|
||||
contains() {
|
||||
local word="$1"; shift
|
||||
for w in "$@"; do [[ "$w" == "$word" ]] && return 0; done
|
||||
return 1
|
||||
}
|
||||
|
||||
get_node_type() {
|
||||
# shellcheck disable=SC2086
|
||||
if contains "$NODE_NAME" $LTE_NODES; then echo "lte_node"; return; fi
|
||||
if contains "$NODE_NAME" $SD_CARD_NODES; then echo "sd_card"; return; fi
|
||||
if contains "$NODE_NAME" $AI_NODES; then echo "ai_node"; return; fi
|
||||
echo "standard"
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Event emission
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
emit_event() {
|
||||
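local type="$1" severity="$2" service="${3:-}" message="$4"
# Default the payload separately: "${5:-{}}" ends the expansion at the first "}",
# which appends a stray "}" to every payload that is actually passed in.
local payload="${5:-}"
[[ -n "${payload}" ]] || payload='{}'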
|
||||
local id="evt-${NODE_NAME}-${TIMESTAMP}-${type}"
|
||||
local dir="${EVENTS_DIR}/${NODE_NAME}"
|
||||
mkdir -p "$dir"
|
||||
cat > "${dir}/${id}.json" <<EOF
|
||||
{
|
||||
"id": "${id}",
|
||||
"timestamp": ${TIMESTAMP},
|
||||
"date": "${DATE}",
|
||||
"type": "${type}",
|
||||
"severity": "${severity}",
|
||||
"node": "${NODE_NAME}",
|
||||
"service": "${service}",
|
||||
"message": "${message}",
|
||||
"payload": ${payload}
|
||||
}
|
||||
EOF
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Health checks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
check_disk() {
|
||||
# Use /opt/homelab as the check target — it lives on the host filesystem
|
||||
# and this path is correct both when running natively and in a container
|
||||
# that mounts /opt/homelab from the host.
|
||||
local mount="${RUNTIME_PATH}"
|
||||
local usage_pct avail_mb total_mb
|
||||
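# -P keeps each filesystem on one line (no wrapping for long device names);
# -k fixes the block size at 1024 bytes so the MB arithmetic below holds.
usage_pct=$(df -Pk "${mount}" 2>/dev/null | awk 'NR==2 {gsub(/%/,"",$5); print $5}') || return
avail_mb=$(df -Pk "${mount}" 2>/dev/null | awk 'NR==2 {printf "%d", $4/1024}') || return
total_mb=$(df -Pk "${mount}" 2>/dev/null | awk 'NR==2 {printf "%d", $2/1024}') || return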
|
||||
|
||||
if [[ "${usage_pct}" -ge "${DISK_CRIT_PCT}" ]]; then
|
||||
warn "Disk CRITICAL: ${usage_pct}% used (${avail_mb} MB free)"
|
||||
emit_event "disk_pressure" "high" "" \
|
||||
"Disk usage critical: ${usage_pct}% on ${mount} (${avail_mb} MB free)" \
|
||||
"{\"usage_pct\": ${usage_pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": ${total_mb}, \"mount\": \"${mount}\"}"
|
||||
elif [[ "${usage_pct}" -ge "${DISK_WARN_PCT}" ]]; then
|
||||
warn "Disk elevated: ${usage_pct}% used"
|
||||
emit_event "disk_pressure" "medium" "" \
|
||||
"Disk usage elevated: ${usage_pct}% on ${mount} (${avail_mb} MB free)" \
|
||||
"{\"usage_pct\": ${usage_pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": ${total_mb}, \"mount\": \"${mount}\"}"
|
||||
fi
|
||||
echo "${usage_pct}"
|
||||
}
|
||||
|
||||
check_memory() {
|
||||
local total avail pct avail_mb
|
||||
total=$(awk '/^MemTotal/ {print $2}' /proc/meminfo)
|
||||
avail=$(awk '/^MemAvailable/ {print $2}' /proc/meminfo)
|
||||
pct=$(( (total - avail) * 100 / total ))
|
||||
avail_mb=$(( avail / 1024 ))
|
||||
|
||||
if [[ "${pct}" -ge "${MEM_CRIT_PCT}" ]]; then
|
||||
warn "Memory CRITICAL: ${pct}% used"
|
||||
emit_event "high_memory" "high" "" \
|
||||
"Memory usage critical: ${pct}% (${avail_mb} MB available)" \
|
||||
"{\"usage_pct\": ${pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": $((total/1024))}"
|
||||
elif [[ "${pct}" -ge "${MEM_WARN_PCT}" ]]; then
|
||||
warn "Memory elevated: ${pct}%"
|
||||
emit_event "high_memory" "medium" "" \
|
||||
"Memory usage elevated: ${pct}% (${avail_mb} MB available)" \
|
||||
"{\"usage_pct\": ${pct}, \"avail_mb\": ${avail_mb}, \"total_mb\": $((total/1024))}"
|
||||
fi
|
||||
echo "${pct}"
|
||||
}
|
||||
|
||||
check_cpu() {
|
||||
# Two-sample /proc/stat delta for accurate instantaneous CPU usage.
|
||||
local idle1 total1 idle2 total2 pct
|
||||
read -r idle1 total1 < <(awk '/^cpu / {idle=$5; total=0; for(i=2;i<=NF;i++) total+=$i; print idle, total}' /proc/stat)
|
||||
sleep 1
|
||||
read -r idle2 total2 < <(awk '/^cpu / {idle=$5; total=0; for(i=2;i<=NF;i++) total+=$i; print idle, total}' /proc/stat)
|
||||
|
||||
local d_idle=$(( idle2 - idle1 ))
|
||||
local d_total=$(( total2 - total1 ))
|
||||
pct=$(( d_total > 0 ? 100 - d_idle * 100 / d_total : 0 ))
|
||||
|
||||
if [[ "${pct}" -ge 90 ]]; then
|
||||
warn "CPU elevated: ${pct}%"
|
||||
emit_event "high_cpu" "medium" "" \
|
||||
"CPU usage elevated: ${pct}%" \
|
||||
"{\"usage_pct\": ${pct}}"
|
||||
fi
|
||||
echo "${pct}"
|
||||
}
|
||||
|
||||
check_containers() {
|
||||
command -v docker &>/dev/null || return
|
||||
|
||||
# Exited containers that belong to a Compose project. The restart policy itself
# is not inspected; the event message assumes services use restart=unless-stopped.
|
||||
local cname
|
||||
while IFS= read -r cname; do
|
||||
[[ -z "$cname" ]] && continue
|
||||
warn "Container exited (should be running): ${cname}"
|
||||
emit_event "containers_not_running" "high" "${cname}" \
|
||||
"Container '${cname}' has exited unexpectedly (restart=unless-stopped)" \
|
||||
"{\"container\": \"${cname}\"}"
|
||||
done < <(docker ps -a \
|
||||
--filter "status=exited" \
|
||||
--filter "label=com.docker.compose.project" \
|
||||
--format "{{.Names}}" 2>/dev/null || true)
|
||||
|
||||
# Containers that are running but their health check is failing
|
||||
while IFS= read -r cname; do
|
||||
[[ -z "$cname" ]] && continue
|
||||
warn "Container unhealthy: ${cname}"
|
||||
emit_event "healthcheck_failed" "high" "${cname}" \
|
||||
"Container '${cname}' is running but health check is failing" \
|
||||
"{\"container\": \"${cname}\"}"
|
||||
done < <(docker ps \
|
||||
--filter "health=unhealthy" \
|
||||
--format "{{.Names}}" 2>/dev/null || true)
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Safe Docker cleanup (per policy)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_sd_card_rate_ok() {
|
||||
if [[ -f "${CLEANUP_LOCK}" ]]; then
|
||||
local last_ts elapsed
|
||||
last_ts=$(cat "${CLEANUP_LOCK}" 2>/dev/null || echo 0)
|
||||
elapsed=$(( TIMESTAMP - last_ts ))
|
||||
if [[ "${elapsed}" -lt "${CLEANUP_INTERVAL}" ]]; then
|
||||
log "Docker cleanup skipped: last run ${elapsed}s ago (limit ${CLEANUP_INTERVAL}s)"
|
||||
return 1
|
||||
fi
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
_mark_cleanup_done() {
|
||||
echo "${TIMESTAMP}" > "${CLEANUP_LOCK}"
|
||||
}
|
||||
|
||||
run_safe_cleanup() {
|
||||
command -v docker &>/dev/null || return
|
||||
local node_type
|
||||
node_type=$(get_node_type)
|
||||
|
||||
case "${node_type}" in
|
||||
lte_node)
|
||||
# NO cleanup on LTE nodes. Removing an image here could force a later
# re-pull over a metered/intermittent connection.
|
||||
log "Skipping Docker cleanup: LTE node (${NODE_NAME})"
|
||||
;;
|
||||
|
||||
sd_card)
|
||||
# Dangling images + stopped containers only.
|
||||
# Rate-limited to once per 24 hours to protect SD card write endurance.
|
||||
_sd_card_rate_ok || return
|
||||
log "Running rate-limited Docker cleanup (SD card node)"
|
||||
docker image prune -f >/dev/null 2>&1 || true
|
||||
docker container prune -f >/dev/null 2>&1 || true
|
||||
_mark_cleanup_done
|
||||
;;
|
||||
|
||||
ai_node)
|
||||
# Dangling images + stopped containers + build cache.
|
||||
# NEVER docker image prune -a (would remove Ollama runtime images,
|
||||
# requiring a multi-hour re-pull of model weights).
|
||||
log "Running AI-node Docker cleanup (dangling images + containers + build cache)"
|
||||
docker image prune -f >/dev/null 2>&1 || true
|
||||
docker container prune -f >/dev/null 2>&1 || true
|
||||
docker builder prune -f >/dev/null 2>&1 || true
|
||||
;;
|
||||
|
||||
standard)
|
||||
# VPS and other standard nodes: full safe cleanup.
|
||||
log "Running standard Docker cleanup"
|
||||
docker image prune -f >/dev/null 2>&1 || true
|
||||
docker container prune -f >/dev/null 2>&1 || true
|
||||
docker builder prune -f >/dev/null 2>&1 || true
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VPS-specific: control-plane filesystem rotation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
cleanup_control_plane_fs() {
|
||||
log "Running control-plane filesystem rotation"
|
||||
|
||||
# Completed / failed actions older than 7 days
|
||||
for status in completed failed; do
|
||||
local dir="${ACTIONS_DIR}/${status}"
|
||||
[[ -d "${dir}" ]] || continue
|
||||
find "${dir}" -name "*.json" -mtime +7 -delete 2>/dev/null && \
|
||||
log "Cleaned ${status} actions older than 7 days" || true
|
||||
done
|
||||
|
||||
# Deploy logs older than 30 days
|
||||
local deploy_logs="${LOGS_DIR}/deploy"
|
||||
if [[ -d "${deploy_logs}" ]]; then
|
||||
find "${deploy_logs}" -name "*.log" -mtime +30 -delete 2>/dev/null && \
|
||||
log "Cleaned deploy logs older than 30 days" || true
|
||||
fi
|
||||
|
||||
# Event files older than 3 days AND already past the observer checkpoint.
|
||||
# The dual condition ensures we never delete an event the observer hasn't seen.
|
||||
local checkpoint="${STATE_DIR}/observer_checkpoint.json"
|
||||
if [[ -f "${checkpoint}" ]] && command -v python3 &>/dev/null; then
|
||||
local last_processed
|
||||
last_processed=$(python3 -c "
|
||||
import json, sys
|
||||
try:
|
||||
d = json.load(open('${checkpoint}'))
|
||||
print(d.get('last_processed_file', ''))
|
||||
except Exception:
|
||||
print('')
|
||||
" 2>/dev/null || echo "")
|
||||
|
||||
if [[ -n "${last_processed}" ]]; then
|
||||
find "${EVENTS_DIR}" -name "*.json" -mtime +3 | while IFS= read -r f; do
|
||||
# Only delete files that sort before the checkpoint path
|
||||
# (i.e., the observer has already processed them).
|
||||
if [[ "$f" < "${last_processed}" ]]; then
|
||||
rm -f "$f"
|
||||
log "Cleaned old event: $(basename "$f")"
|
||||
fi
|
||||
done
|
||||
else
|
||||
log "No observer checkpoint set; skipping event file cleanup"
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
mkdir -p "${EVENTS_DIR}/${NODE_NAME}" "${STATE_DIR}"
|
||||
|
||||
log "Health check starting on ${NODE_NAME} (type=$(get_node_type))"
|
||||
|
||||
disk_pct=$(check_disk || echo 0)
|
||||
mem_pct=$(check_memory || echo 0)
|
||||
cpu_pct=$(check_cpu || echo 0)
|
||||
check_containers
|
||||
|
||||
run_safe_cleanup
|
||||
|
||||
# VPS: also rotate control-plane filesystem artefacts
|
||||
if [[ "${NODE_NAME}" == "vps" ]]; then
|
||||
cleanup_control_plane_fs
|
||||
fi
|
||||
|
||||
# Emit a node_health heartbeat so the observer can update node status
|
||||
# and the supervisor can see up-to-date resource metrics.
|
||||
emit_event "node_health" "info" "" \
|
||||
"Health check completed on ${NODE_NAME}" \
|
||||
"{\"disk_pct\": ${disk_pct}, \"mem_pct\": ${mem_pct}, \"cpu_pct\": ${cpu_pct}}"
|
||||
|
||||
log "Health check complete (disk=${disk_pct}% mem=${mem_pct}% cpu=${cpu_pct}%)"
|
||||
|
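Review note: check_cpu derives usage from two /proc/stat samples taken one second apart. The sketch below restates that delta calculation on plain strings so it can be checked without a live /proc; `parse_cpu_line` and `cpu_usage_pct` are hypothetical helper names, and the sample lines in the usage note are made up.

```python
def parse_cpu_line(line):
    """Return (idle, total) jiffies from the aggregate 'cpu' line of /proc/stat."""
    fields = line.split()
    if not fields or fields[0] != "cpu":
        raise ValueError("expected the aggregate 'cpu' line")
    values = [int(v) for v in fields[1:]]
    idle = values[3]  # order: user, nice, system, idle, ...
    return idle, sum(values)


def cpu_usage_pct(sample1, sample2):
    """Integer busy percentage between two samples, as in the bash arithmetic."""
    idle1, total1 = parse_cpu_line(sample1)
    idle2, total2 = parse_cpu_line(sample2)
    d_idle = idle2 - idle1
    d_total = total2 - total1
    if d_total <= 0:
        return 0  # no elapsed jiffies: report 0 rather than divide by zero
    return 100 - d_idle * 100 // d_total
```

For example, going from `cpu 100 0 100 800 0 0 0 0 0 0` to `cpu 150 0 150 900 0 0 0 0 0 0` adds 200 jiffies in total, 100 of them idle, which this sketch reports as 50.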
|
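Review note: emit_event builds its JSON with a heredoc, so a double quote or backslash in a message or container name would end up unescaped in the file. A minimal sketch of the same event shape built with `json.dumps` follows; the field names are copied from the heredoc, while `build_event` and `serialize_event` are hypothetical names.

```python
import json
import time
from datetime import datetime, timezone


def build_event(node, etype, severity, message, service="", payload=None, now=None):
    """Assemble an event dict with the fields used by health-monitor.sh."""
    ts = int(time.time() if now is None else now)
    date = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "id": f"evt-{node}-{ts}-{etype}",
        "timestamp": ts,
        "date": date,
        "type": etype,
        "severity": severity,
        "node": node,
        "service": service,
        "message": message,
        "payload": payload if payload is not None else {},
    }


def serialize_event(event):
    """json.dumps escapes quotes and backslashes, unlike string interpolation."""
    return json.dumps(event, indent=2)
```

Because the ID is derived from node, timestamp, and type, two events of the same type emitted within the same second would share a file name; that holds for the bash version as well.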
@@ -159,12 +159,41 @@ class Observer:
|
|||
"roles": self.inventory["nodes"].get(node, {}).get("roles", [])
|
||||
}
|
||||
self.world_state["nodes"][node]["last_seen"] = timestamp
|
||||
|
||||
|
||||
if etype == "node_online":
|
||||
self.world_state["nodes"][node]["status"] = "online"
|
||||
elif etype == "node_offline":
|
||||
self.world_state["nodes"][node]["status"] = "offline"
|
||||
|
||||
elif etype == "node_health":
|
||||
# Regular heartbeat from node-agent; updates resource metrics.
|
||||
# Clears disk_pressure if disk is now healthy (< warn threshold).
|
||||
self.world_state["nodes"][node]["status"] = "online"
|
||||
self.world_state["nodes"][node].update({
|
||||
"disk_usage_pct": payload.get("disk_pct"),
|
||||
"mem_usage_pct": payload.get("mem_pct"),
|
||||
"cpu_usage_pct": payload.get("cpu_pct"),
|
||||
})
|
||||
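# A heartbeat without disk_pct is treated as "unknown", not as 0 %,
# so it does not clear an existing disk_pressure flag.
disk_pct = payload.get("disk_pct")
if disk_pct is not None and disk_pct < 75:
    self.world_state["nodes"][node].pop("disk_pressure", None)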
|
||||
|
||||
elif etype == "disk_pressure":
|
||||
# Emitted when disk usage crosses 75 % (medium) or 85 % (high).
|
||||
# The supervisor reads disk_pressure to generate disk_cleanup actions.
|
||||
self.world_state["nodes"][node]["disk_pressure"] = severity
|
||||
self.world_state["nodes"][node]["disk_usage_pct"] = payload.get("usage_pct")
|
||||
|
||||
elif etype == "high_memory":
|
||||
# Memory pressure observation; recorded on the node for correlation.
|
||||
# No automated action — operator decides if a container restart helps.
|
||||
self.world_state["nodes"][node]["memory_pressure"] = severity
|
||||
self.world_state["nodes"][node]["mem_usage_pct"] = payload.get("usage_pct")
|
||||
|
||||
elif etype == "high_cpu":
|
||||
# CPU pressure observation; recorded for visibility.
|
||||
self.world_state["nodes"][node]["cpu_pressure"] = severity
|
||||
self.world_state["nodes"][node]["cpu_usage_pct"] = payload.get("usage_pct")
|
||||
|
||||
# 2. Update Service State
|
||||
if service and service != "all":
|
||||
svc_key = f"{node}/{service}"
|
||||
|
|
|
|||
|
|
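Review note: the node-level branches in the observer hunk can be exercised in isolation as a pure function over one node's state dict. This is a sketch, not the Observer class itself: `apply_node_event` and `DISK_WARN_PCT` are hypothetical names, and treating a missing `disk_pct` as unknown (rather than as 0) is an assumption of the sketch.

```python
DISK_WARN_PCT = 75  # mirrors the warn threshold used by the health monitor


def apply_node_event(node_state, etype, severity, payload):
    """Update one node's state dict in place for a node-level event."""
    if etype == "node_health":
        node_state["status"] = "online"
        node_state.update({
            "disk_usage_pct": payload.get("disk_pct"),
            "mem_usage_pct": payload.get("mem_pct"),
            "cpu_usage_pct": payload.get("cpu_pct"),
        })
        disk_pct = payload.get("disk_pct")
        # Only a known, healthy reading clears the pressure flag.
        if disk_pct is not None and disk_pct < DISK_WARN_PCT:
            node_state.pop("disk_pressure", None)
    elif etype == "disk_pressure":
        node_state["disk_pressure"] = severity
        node_state["disk_usage_pct"] = payload.get("usage_pct")
    elif etype == "high_memory":
        node_state["memory_pressure"] = severity
        node_state["mem_usage_pct"] = payload.get("usage_pct")
    elif etype == "high_cpu":
        node_state["cpu_pressure"] = severity
        node_state["cpu_usage_pct"] = payload.get("usage_pct")
    return node_state
```

As in the hunk, only disk_pressure has a clearing path; memory_pressure and cpu_pressure stay set once recorded.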
@@ -93,6 +93,14 @@ class Executor:
|
|||
container_name = data.get("container_name") or service
|
||||
success, error_msg = self._execute_container_restart(node, container_name)
|
||||
|
||||
elif action_type == "disk_cleanup":
|
||||
# Operator-approved aggressive Docker cleanup (image prune -a +
|
||||
# volume prune). Commands come from the action payload so the
|
||||
# supervisor controls exactly what runs; the executor adds a
|
||||
# safety check to reject anything touching protected paths.
|
||||
payload = data.get("payload", {})
|
||||
success, error_msg = self._execute_disk_cleanup(node, payload)
|
||||
|
||||
else:
|
||||
success = False
|
||||
error_msg = f"Unknown action type: {action_type}"
|
||||
|
|
@@ -163,6 +171,57 @@ class Executor:
|
|||
)
|
||||
return False, last_error
|
||||
|
||||
def _execute_disk_cleanup(self, node: str, payload: dict):
|
||||
"""
|
||||
SSH to the target node and run the operator-approved disk cleanup
|
||||
commands from the action payload.
|
||||
|
||||
Safety invariants enforced here regardless of payload content:
|
||||
- No command may reference /opt/homelab/data/, /opt/homelab/config/,
|
||||
or /opt/homelab/state/ (application data and configuration).
|
||||
- No command may contain the literal substring "rm -rf /". This is a plain
substring match, so other spellings (extra spaces, "-fr") are not detected.
|
||||
If any command fails the safety check the entire action is rejected
|
||||
(not run at all) and the rejection reason is recorded.
|
||||
|
||||
Returns (success: bool, error_msg: str).
|
||||
"""
|
||||
commands = payload.get("commands", [
|
||||
"docker image prune -a -f",
|
||||
"docker volume prune -f",
|
||||
])
|
||||
|
||||
# Safety gate: reject commands that touch protected paths
|
||||
FORBIDDEN = [
|
||||
"/opt/homelab/data",
|
||||
"/opt/homelab/config",
|
||||
"/opt/homelab/state",
|
||||
"rm -rf /",
|
||||
]
|
||||
for cmd in commands:
|
||||
for forbidden in FORBIDDEN:
|
||||
if forbidden in cmd:
|
||||
msg = f"Rejected: command contains forbidden pattern '{forbidden}': {cmd}"
|
||||
logger.error(msg)
|
||||
return False, msg
|
||||
|
||||
full_command = " && ".join(commands)
|
||||
cmd = [
|
||||
"ssh",
|
||||
*SSH_OPTIONS,
|
||||
f"{SSH_USER}@{node}",
|
||||
full_command,
|
||||
]
|
||||
logger.info(f"Disk cleanup on {node}: {full_command}")
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
logger.info(f"Disk cleanup on {node} succeeded")
|
||||
return True, ""
|
||||
|
||||
error_msg = (result.stderr or result.stdout).strip()
|
||||
logger.error(f"Disk cleanup on {node} failed: {error_msg}")
|
||||
return False, error_msg
|
||||
|
||||
def loop(self, interval=10):
|
||||
logger.info("Starting executor loop")
|
||||
while True:
|
||||
|
|
|
|||
|
|
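Review note: the safety gate in _execute_disk_cleanup is a substring blocklist, so a command such as `rm  -rf /` (two spaces) would pass it. One possible hardening, sketched below under assumptions, is to accept only known prune subcommands with known flags. `ALLOWED_COMMANDS`, `ALLOWED_FLAGS`, and `allowlist_gate` are hypothetical and not part of executor.py; `substring_gate` restates the check from the hunk for comparison.

```python
import shlex

FORBIDDEN = [
    "/opt/homelab/data",
    "/opt/homelab/config",
    "/opt/homelab/state",
    "rm -rf /",
]

# Hypothetical allowlist: the prune subcommands this commit mentions.
ALLOWED_COMMANDS = {
    ("docker", "image", "prune"),
    ("docker", "container", "prune"),
    ("docker", "builder", "prune"),
    ("docker", "volume", "prune"),
}
ALLOWED_FLAGS = {"-a", "-f", "--all", "--force"}


def substring_gate(commands):
    """Blocklist check equivalent to the one in the hunk."""
    for cmd in commands:
        for forbidden in FORBIDDEN:
            if forbidden in cmd:
                return False, f"Rejected: forbidden pattern '{forbidden}': {cmd}"
    return True, ""


def allowlist_gate(commands):
    """Accept only allowlisted subcommands followed by allowlisted flags."""
    for cmd in commands:
        try:
            tokens = shlex.split(cmd)
        except ValueError as exc:
            return False, f"Rejected: unparsable command: {cmd} ({exc})"
        if tuple(tokens[:3]) not in ALLOWED_COMMANDS:
            return False, f"Rejected: command not allowlisted: {cmd}"
        extra = [t for t in tokens[3:] if t not in ALLOWED_FLAGS]
        if extra:
            return False, f"Rejected: unexpected arguments {extra}: {cmd}"
    return True, ""
```

Shell operators such as `;`, `&&`, or `$(...)` show up as unexpected tokens and are rejected, which matters because the executor joins the commands into a single remote shell string.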
@@ -26,6 +26,12 @@ except Exception:
|
|||
# or a dependency (MQTT) is unreachable — a restart is the right first step.
|
||||
CONTAINER_RESTART_TRIGGERS = {"containers_not_running", "mqtt_unreachable"}
|
||||
|
||||
# Nodes where automatic disk_cleanup actions must NOT be generated.
|
||||
# On chelsty nodes disk fullness is overwhelmingly caused by Frigate recordings
|
||||
# or the HA database — Docker cleanup will not help and the operator must
|
||||
# decide explicitly (e.g. adjust Frigate retain policy or purge HA recorder).
|
||||
NO_DISK_CLEANUP_NODES = {"chelsty-infra", "chelsty-ha"}
|
||||
|
||||
# Logging setup
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger("supervisor")
|
||||
|
|
@@ -207,10 +213,17 @@ class Supervisor:
|
|||
"trigger_type": trigger_type,
|
||||
})
|
||||
|
||||
# 2. Generate recommendations
|
||||
# 2. Generate service-level recommendations
|
||||
for drift in drifts:
|
||||
self._generate_recommendation(drift)
|
||||
|
||||
# 3. Generate node-level recommendations (disk pressure)
|
||||
for node_name, node_info in self.actual_state["nodes"].items():
|
||||
if node_name in NO_DISK_CLEANUP_NODES:
|
||||
continue
|
||||
if node_info.get("disk_pressure") == "high":
|
||||
self._generate_disk_cleanup_recommendation(node_name)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Recommendation generation
|
||||
# ------------------------------------------------------------------
|
||||
|
|
@@ -291,6 +304,59 @@ class Supervisor:
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to save recommendation {action_id}: {e}")
|
||||
|
||||
def _generate_disk_cleanup_recommendation(self, node: str):
|
||||
"""
|
||||
Generate a disk_cleanup action when node-agent reports critical disk
pressure (>= 85 %) on a node that is eligible for Docker cleanup.
|
||||
|
||||
This is an OPERATOR-APPROVED action (risk=guarded): it runs
|
||||
`docker image prune -a -f` and `docker volume prune -f`, which are
|
||||
more aggressive than the safe auto-cleanup the node-agent runs itself.
|
||||
|
||||
Nodes in NO_DISK_CLEANUP_NODES never reach this method (filtered in
|
||||
reconcile) because their disk fullness is caused by application data
|
||||
(Frigate, HA) that the operator must handle manually.
|
||||
"""
|
||||
action_id = f"disk-cleanup-{node}"
|
||||
|
||||
for state in ("pending", "approved", "running"):
|
||||
if (ACTIONS_DIR / state / f"{action_id}.json").exists():
|
||||
logger.debug(f"Skipping {action_id}: already in state '{state}'")
|
||||
return
|
||||
|
||||
action = {
|
||||
"action_id": action_id,
|
||||
"timestamp": time.time(),
|
||||
"type": "disk_cleanup",
|
||||
"node": node,
|
||||
"service": "",
|
||||
"risk_level": "guarded",
|
||||
"confidence": 0.85,
|
||||
"description": (
|
||||
f"Aggressive disk cleanup on {node}: docker image prune -a "
|
||||
f"and docker volume prune (requires operator approval)"
|
||||
),
|
||||
"status": "pending",
|
||||
"payload": {
|
||||
"reason": "disk_pressure",
|
||||
"commands": [
|
||||
"docker image prune -a -f",
|
||||
"docker volume prune -f",
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
action_path = ACTIONS_DIR / "pending" / f"{action_id}.json"
|
||||
try:
|
||||
with open(action_path, "w") as f:
|
||||
json.dump(action, f, indent=2)
|
||||
logger.info(
|
||||
f"Generated disk cleanup recommendation: {action_id} "
|
||||
f"(node={node}, risk=guarded)"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save disk cleanup recommendation {action_id}: {e}")
|
||||
|
||||
def loop(self, interval=30):
|
||||
logger.info("Starting supervisor loop")
|
||||
while True:
|
||||
|
|
|
|||
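Review note: the stable ID `disk-cleanup-{node}` is what keeps reconcile() from queueing the same recommendation on every pass. A minimal sketch of that check against a directory tree follows; `ensure_disk_cleanup_action` is a hypothetical name, the action dict is abbreviated, and creating the `pending` directory on demand is an addition the hunk does not make.

```python
import json
import tempfile
import time
from pathlib import Path

ACTIVE_STATES = ("pending", "approved", "running")


def ensure_disk_cleanup_action(actions_dir, node):
    """Write a pending action unless one is already active; return True if written."""
    action_id = f"disk-cleanup-{node}"
    for state in ACTIVE_STATES:
        if (actions_dir / state / f"{action_id}.json").exists():
            return False  # already queued, approved, or running
    pending = actions_dir / "pending"
    pending.mkdir(parents=True, exist_ok=True)  # assumption: created on demand
    action = {
        "action_id": action_id,
        "timestamp": time.time(),
        "type": "disk_cleanup",
        "node": node,
        "risk_level": "guarded",
        "status": "pending",
    }
    (pending / f"{action_id}.json").write_text(json.dumps(action, indent=2))
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        print(ensure_disk_cleanup_action(root, "vps"))  # first call writes the file
        print(ensure_disk_cleanup_action(root, "vps"))  # second call is a no-op
```

Only the three active states are checked, so once an action has moved to completed or failed, a node that still reports high disk pressure would get a fresh recommendation on the next pass.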
21
services/node-agent/Dockerfile
Normal file
|
|
@@ -0,0 +1,21 @@
|
|||
FROM python:3.11-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# openssh-client + rsync: used for optional event shipping to VPS
|
||||
# (only active when VPS_EVENTS_HOST is set in the environment)
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
openssh-client \
|
||||
rsync \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# docker SDK : container health checks and cleanup (dangling images, prune)
|
||||
# psutil : fallback system metrics (not used in main path; /proc is primary)
|
||||
# pyyaml : may be needed for reading host config snippets
|
||||
RUN pip install --no-cache-dir "docker>=6.0" psutil pyyaml
|
||||
|
||||
COPY src/ /app/src/
|
||||
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
|
||||
CMD ["python", "src/node_agent.py"]
|
||||
44
services/node-agent/docker-compose.yml
Normal file
|
|
@@ -0,0 +1,44 @@
|
|||
services:
|
||||
node-agent:
|
||||
build: .
|
||||
container_name: node-agent
|
||||
restart: unless-stopped
|
||||
|
||||
environment:
|
||||
- RUNTIME_PATH=/opt/homelab
|
||||
- REPO_ROOT=/repo
|
||||
# NODE_NAME must be set to the canonical topology node name, e.g.:
|
||||
# NODE_NAME=piha
|
||||
# The agent uses this to determine its cleanup policy (lte_node / sd_card /
|
||||
# ai_node / standard) and to tag emitted events with the correct node name.
|
||||
- NODE_NAME=${NODE_NAME:-}
|
||||
# NODE_TYPE overrides auto-detection if needed:
|
||||
# lte_node | sd_card | ai_node | standard
|
||||
- NODE_TYPE=${NODE_TYPE:-}
|
||||
# VPS event shipping (non-VPS nodes only).
|
||||
# Set VPS_EVENTS_HOST to the VPS Tailscale hostname or IP so that events
|
||||
# emitted on this node are rsynced to the VPS observer.
|
||||
# Also mount an SSH key (see commented volume below).
|
||||
- VPS_EVENTS_HOST=${VPS_EVENTS_HOST:-}
|
||||
- VPS_EVENTS_USER=${VPS_EVENTS_USER:-oskar}
|
||||
- VPS_EVENTS_PATH=${VPS_EVENTS_PATH:-/opt/homelab/events}
|
||||
# How often (seconds) to run a full health check cycle (default: 60)
|
||||
- CHECK_INTERVAL=${CHECK_INTERVAL:-60}
|
||||
|
||||
volumes:
|
||||
# Runtime filesystem — events, state, actions, logs
|
||||
- /opt/homelab:/opt/homelab
|
||||
# Docker socket — required for container health checks and Docker cleanup
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
# Repo (read-only) — scripts and host config accessible to agent
|
||||
- ../..:/repo:ro
|
||||
# SSH key for event shipping to VPS.
|
||||
# Uncomment and set SSH_KEY_PATH on nodes where VPS_EVENTS_HOST is set:
|
||||
# - ${SSH_KEY_PATH:-/home/oskar/.ssh/id_ed25519}:/root/.ssh/id_rsa:ro
|
||||
|
||||
healthcheck:
|
||||
test: ["CMD", "test", "-f", "/opt/homelab/state/node-agent.heartbeat"]
|
||||
interval: 30s
|
||||
timeout: 5s
|
||||
retries: 3
|
||||
start_period: 15s
|
||||
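Review note: the compose healthcheck only tests that the heartbeat file exists, so a stale file left by a hung agent would still count as healthy. A freshness check is one option, sketched below on the assumption (not shown in this excerpt) that the agent rewrites the file every cycle. `heartbeat_is_fresh` and `max_age_s` are hypothetical names.

```python
import tempfile
import time
from pathlib import Path


def heartbeat_is_fresh(path, max_age_s, now=None):
    """True if the heartbeat file exists and was modified within max_age_s seconds."""
    try:
        mtime = path.stat().st_mtime
    except FileNotFoundError:
        return False
    current = time.time() if now is None else now
    return (current - mtime) <= max_age_s


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        hb = Path(tmp) / "node-agent.heartbeat"
        hb.write_text("ok")
        # 180 s would correspond to three cycles at the default CHECK_INTERVAL of 60.
        print(heartbeat_is_fresh(hb, 180))
```

Wiring this into the container would mean replacing the `test -f` command with a small Python invocation; that wiring is left out here because it depends on how node_agent.py writes the heartbeat.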
539
services/node-agent/src/node_agent.py
Normal file
|
|
@@ -0,0 +1,539 @@
|
|||
"""
|
||||
node_agent.py — Homelab node health monitor daemon.
|
||||
|
||||
Runs as a Docker container on every managed node. Each cycle it:
|
||||
1. Collects system metrics (disk, memory, CPU).
|
||||
2. Checks Docker container health.
|
||||
3. Emits structured event JSON files to /opt/homelab/events/<node-name>/.
|
||||
4. Applies safe Docker / filesystem cleanup per the conservative policy.
|
||||
5. Optionally rsyncs events to VPS so the control-plane observer can process them.
|
||||
|
||||
Cleanup policy (matches health-monitor.sh):
|
||||
lte_node (chelsty-infra, chelsty-ha) : NO cleanup, NO image operations
|
||||
sd_card (piha, saturn) : dangling images + stopped containers,
|
||||
max once per 24 h
|
||||
ai_node (solaria) : dangling + containers + build cache,
|
||||
NEVER docker image prune -a
|
||||
standard (vps) : dangling + containers + build cache +
|
||||
control-plane filesystem rotation
|
||||
|
||||
NEVER TOUCHED on any node:
|
||||
/opt/homelab/data/ Frigate recordings, Ollama models, HA db, MQTT state
|
||||
/opt/homelab/config/ All hand-crafted and repo-seeded configuration
|
||||
/opt/homelab/state/ Heartbeat files, observer checkpoint
|
||||
actions/pending|approved|running Live work queue
|
||||
"""
|
||||
|
||||
import json
import logging
import os
import shutil
import socket
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path

try:
    import docker as docker_sdk
    DOCKER_AVAILABLE = True
except ImportError:
    DOCKER_AVAILABLE = False
    docker_sdk = None

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("node-agent")

# ---------------------------------------------------------------------------
# Runtime paths
# ---------------------------------------------------------------------------
RUNTIME_PATH = Path(os.getenv("RUNTIME_PATH", "/opt/homelab"))
REPO_ROOT = Path(os.getenv("REPO_ROOT", "/repo"))
EVENTS_DIR = RUNTIME_PATH / "events"
STATE_DIR = RUNTIME_PATH / "state"
LOGS_DIR = RUNTIME_PATH / "logs"
ACTIONS_DIR = RUNTIME_PATH / "actions"

# ---------------------------------------------------------------------------
# Node identity
# ---------------------------------------------------------------------------
NODE_NAME = os.getenv("NODE_NAME") or socket.gethostname()
NODE_TYPE = os.getenv("NODE_TYPE", "")  # override auto-detection if set

VPS_NODE_NAME = "vps"
LTE_NODES = {"chelsty-infra", "chelsty-ha"}
SD_CARD_NODES = {"piha", "saturn"}
AI_NODES = {"solaria"}

# ---------------------------------------------------------------------------
# Event shipping (optional; requires VPS_EVENTS_HOST + SSH key in container)
# ---------------------------------------------------------------------------
VPS_EVENTS_HOST = os.getenv("VPS_EVENTS_HOST", "")
VPS_EVENTS_USER = os.getenv("VPS_EVENTS_USER", "oskar")
VPS_EVENTS_PATH = os.getenv("VPS_EVENTS_PATH", "/opt/homelab/events")

# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
DISK_WARN_PCT = 75
DISK_CRIT_PCT = 85
MEM_WARN_PCT = 85
MEM_CRIT_PCT = 95

# SD-card nodes: enforce 24-hour gap between Docker cleanup runs
CLEANUP_INTERVAL_SECS = 86_400
LAST_CLEANUP_FILE = STATE_DIR / "last-docker-cleanup"

# How long to wait between full health-check cycles
HEALTH_CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60"))


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _resolve_node_type() -> str:
    if NODE_TYPE:
        return NODE_TYPE
    if NODE_NAME in LTE_NODES:
        return "lte_node"
    if NODE_NAME in SD_CARD_NODES:
        return "sd_card"
    if NODE_NAME in AI_NODES:
        return "ai_node"
    return "standard"


def _utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


# ---------------------------------------------------------------------------
# NodeAgent
# ---------------------------------------------------------------------------

class NodeAgent:
    def __init__(self):
        self.node_name = NODE_NAME
        self.node_type = _resolve_node_type()
        self._ensure_dirs()

        self.docker_client = None
        if DOCKER_AVAILABLE:
            try:
                self.docker_client = docker_sdk.from_env()
                logger.info("Docker SDK connected")
            except Exception as exc:
                logger.warning(f"Docker unavailable: {exc}")

        logger.info(f"node-agent starting: node={self.node_name} type={self.node_type}")

    # ------------------------------------------------------------------
    # Directories
    # ------------------------------------------------------------------

    def _ensure_dirs(self):
        for d in [EVENTS_DIR, STATE_DIR, LOGS_DIR,
                  EVENTS_DIR / self.node_name]:
            d.mkdir(parents=True, exist_ok=True)

    def _node_events_dir(self) -> Path:
        return EVENTS_DIR / self.node_name

    # ------------------------------------------------------------------
    # Event emission
    # ------------------------------------------------------------------

    def emit_event(self, event_type: str, severity: str, service,
                   message: str, payload: dict | None = None):
        ts = int(time.time())
        event_id = f"evt-{self.node_name}-{ts}-{event_type}"
        if service:
            # Two events of the same type in the same second (e.g. two exited
            # containers) would otherwise share an ID and overwrite each other.
            event_id += f"-{service}"
        event = {
            "id": event_id,
            "timestamp": ts,
            "date": _utc_iso(),
            "type": event_type,
            "severity": severity,
            "node": self.node_name,
            "service": service or "",
            "message": message,
            "payload": payload or {},
        }
        path = self._node_events_dir() / f"{event_id}.json"
        try:
            path.write_text(json.dumps(event, indent=2))
        except Exception as exc:
            logger.error(f"Failed to write event {event_id}: {exc}")

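Review note: `emit_event` writes the JSON straight to its final path, so a reader that globs `*.json` at the wrong moment could see a partially written file. A common way to avoid that is to write to a temporary name and rename it into place. The sketch below assumes a hypothetical helper `write_event_atomically`; it is not part of this change, and a shipper that copies the whole directory would additionally need to skip the `.tmp` files (for example with rsync's `--exclude`).

```python
import json
import os
import tempfile
from pathlib import Path


def write_event_atomically(events_dir: Path, event: dict) -> Path:
    """Write event JSON under a temporary name, then rename it into place.

    os.replace() is atomic when source and destination are on the same
    filesystem, so *.json files are always complete when they appear.
    """
    events_dir.mkdir(parents=True, exist_ok=True)
    final_path = events_dir / f"{event['id']}.json"
    tmp_path = events_dir / f".{event['id']}.json.tmp"
    tmp_path.write_text(json.dumps(event, indent=2))
    os.replace(tmp_path, final_path)
    return final_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        sample = {"id": "evt-demo-0-node_health", "type": "node_health", "payload": {}}
        path = write_event_atomically(Path(tmp) / "demo", sample)
        print(path.name, json.loads(path.read_text())["type"])
```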
    # ------------------------------------------------------------------
    # System metrics
    # ------------------------------------------------------------------

    def check_disk(self) -> int:
        """
        Check disk usage of the filesystem that hosts RUNTIME_PATH.
        Using shutil.disk_usage on the mounted runtime path works both
        natively on the host and inside a container that mounts /opt/homelab
        from the host; the reported usage reflects the host partition.
        """
        try:
            usage = shutil.disk_usage(str(RUNTIME_PATH))
            pct = int(usage.used * 100 / usage.total)
            avail_mb = int(usage.free / (1024 * 1024))
            total_mb = int(usage.total / (1024 * 1024))
            payload = {
                "usage_pct": pct, "avail_mb": avail_mb,
                "total_mb": total_mb, "mount": str(RUNTIME_PATH),
            }
            if pct >= DISK_CRIT_PCT:
                logger.warning(f"Disk critical: {pct}% used")
                self.emit_event(
                    "disk_pressure", "high", None,
                    f"Disk usage critical: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)",
                    payload,
                )
            elif pct >= DISK_WARN_PCT:
                logger.info(f"Disk elevated: {pct}% used")
                self.emit_event(
                    "disk_pressure", "medium", None,
                    f"Disk usage elevated: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)",
                    payload,
                )
            return pct
        except Exception as exc:
            logger.error(f"Disk check failed: {exc}")
            return 0

    def check_memory(self) -> int:
        """Read host memory from /proc/meminfo (visible in container without special mounts)."""
        try:
            info: dict = {}
            with open("/proc/meminfo") as fh:
                for line in fh:
                    # Split on the first colon only, so a value that itself
                    # contains a colon cannot break the two-way unpacking.
                    k, v = line.split(":", 1)
                    info[k.strip()] = int(v.strip().split()[0])

            # /proc/meminfo values are in kB
            total = info.get("MemTotal", 1)
            avail = info.get("MemAvailable", total)
            pct = int((total - avail) * 100 / total)
            avail_mb = avail // 1024
            total_mb = total // 1024
            payload = {"usage_pct": pct, "avail_mb": avail_mb, "total_mb": total_mb}

            if pct >= MEM_CRIT_PCT:
                logger.warning(f"Memory critical: {pct}%")
                self.emit_event("high_memory", "high", None,
                                f"Memory usage critical: {pct}% ({avail_mb} MB available)", payload)
            elif pct >= MEM_WARN_PCT:
                self.emit_event("high_memory", "medium", None,
                                f"Memory usage elevated: {pct}% ({avail_mb} MB available)", payload)
            return pct
        except Exception as exc:
            logger.error(f"Memory check failed: {exc}")
            return 0

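Review note: the percentage in `check_memory` is `(MemTotal - MemAvailable) / MemTotal`. The sketch below runs the same arithmetic on a made-up `/proc/meminfo` excerpt so it can be checked off-host. `memory_usage_pct` and the sample values are illustrative assumptions.

```python
def memory_usage_pct(meminfo_text: str) -> int:
    """Compute used-memory percentage from /proc/meminfo-style text."""
    info = {}
    for line in meminfo_text.splitlines():
        key, sep, rest = line.partition(":")
        fields = rest.split()
        if sep and fields:
            info[key.strip()] = int(fields[0])  # values are in kB
    total = info.get("MemTotal", 1)
    avail = info.get("MemAvailable", total)
    return int((total - avail) * 100 / total)


# Made-up sample: 8,000,000 kB total, 2,000,000 kB available.
SAMPLE = """MemTotal:        8000000 kB
MemFree:          500000 kB
MemAvailable:    2000000 kB
"""

if __name__ == "__main__":
    print(memory_usage_pct(SAMPLE))  # (8000000 - 2000000) * 100 / 8000000 = 75
```

Using `MemAvailable` rather than `MemFree` matters: in the sample, `MemFree` alone would suggest about 94% usage, which would trip the 85% warning threshold even though reclaimable cache is plentiful.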
    def check_cpu(self) -> int:
        """Two-sample /proc/stat delta for accurate instantaneous CPU usage."""
        try:
            def _read():
                with open("/proc/stat") as fh:
                    for line in fh:
                        if line.startswith("cpu "):
                            vals = list(map(int, line.split()[1:]))
                            return vals[3], sum(vals)  # idle, total
                return 0, 1

            idle1, total1 = _read()
            time.sleep(0.5)
            idle2, total2 = _read()

            d_total = total2 - total1
            d_idle = idle2 - idle1
            pct = 100 - int(d_idle * 100 / d_total) if d_total else 0

            if pct >= 90:
                self.emit_event("high_cpu", "medium", None,
                                f"CPU usage elevated: {pct}%", {"usage_pct": pct})
            return pct
        except Exception as exc:
            logger.error(f"CPU check failed: {exc}")
            return 0

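Review note: `check_cpu` derives usage from the change in idle time relative to the change in total time between two reads of the aggregate `cpu` line. The sketch below applies the same formula to two made-up sample lines; `cpu_usage_pct` is a hypothetical standalone helper.

```python
def cpu_usage_pct(first_line: str, second_line: str) -> int:
    """Busy percentage between two aggregate 'cpu' lines from /proc/stat."""
    def parse(line: str):
        vals = [int(v) for v in line.split()[1:]]
        return vals[3], sum(vals)  # idle is the 4th counter; total is the sum

    idle1, total1 = parse(first_line)
    idle2, total2 = parse(second_line)
    d_total = total2 - total1
    d_idle = idle2 - idle1
    return 100 - int(d_idle * 100 / d_total) if d_total else 0


# Made-up samples: total goes 1000 -> 1200 ticks, idle goes 800 -> 900 ticks.
T0 = "cpu  100 0 100 800 0 0 0 0 0 0"
T1 = "cpu  150 0 150 900 0 0 0 0 0 0"

if __name__ == "__main__":
    print(cpu_usage_pct(T0, T1))  # 100 of 200 elapsed ticks were idle -> 50
```

Note that, as in the original, only the `idle` column counts as idle; time in `iowait` is treated as busy.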
    # ------------------------------------------------------------------
    # Docker container health
    # ------------------------------------------------------------------

    def check_containers(self):
        if not self.docker_client:
            return
        try:
            containers = self.docker_client.containers.list(all=True)
        except Exception as exc:
            logger.error(f"Docker container list failed: {exc}")
            return

        for c in containers:
            try:
                name = c.name
                status = c.status
                host_config = c.attrs.get("HostConfig", {})
                restart_policy = host_config.get("RestartPolicy", {}).get("Name", "")
                health_status = (c.attrs.get("State", {})
                                 .get("Health", {})
                                 .get("Status", ""))

                # Exited container that carries an auto-restart policy
                if status in ("exited", "dead") and restart_policy in (
                    "unless-stopped", "always", "on-failure"
                ):
                    logger.warning(f"Container exited: {name} (restart={restart_policy})")
                    self.emit_event(
                        "containers_not_running", "high", name,
                        f"Container '{name}' has exited (restart={restart_policy})",
                        {"container": name, "status": status,
                         "restart_policy": restart_policy},
                    )

                # Running container with a failing health check
                elif status == "running" and health_status == "unhealthy":
                    logger.warning(f"Container unhealthy: {name}")
                    self.emit_event(
                        "healthcheck_failed", "high", name,
                        f"Container '{name}' is running but its health check is failing",
                        {"container": name, "health_status": health_status},
                    )
            except Exception as exc:
                logger.error(f"Error checking container {c.name}: {exc}")

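Review note: the decision inside the loop of `check_containers` depends on only three strings, so it can be isolated and tested without a Docker daemon. The sketch below assumes a hypothetical pure function `classify_container`; the event names and conditions are taken from the code above.

```python
from typing import Optional

AUTO_RESTART_POLICIES = {"unless-stopped", "always", "on-failure"}


def classify_container(status: str, restart_policy: str,
                       health_status: str) -> Optional[str]:
    """Return the event type to emit for a container, or None if it is fine."""
    if status in ("exited", "dead") and restart_policy in AUTO_RESTART_POLICIES:
        # Docker was supposed to keep this container running, but it is down.
        return "containers_not_running"
    if status == "running" and health_status == "unhealthy":
        return "healthcheck_failed"
    return None


if __name__ == "__main__":
    print(classify_container("exited", "always", ""))
    print(classify_container("exited", "no", ""))
    print(classify_container("running", "always", "unhealthy"))
```

A container that exited with restart policy `no` (or an empty policy) is deliberately ignored: one-shot jobs are expected to stop.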
    # ------------------------------------------------------------------
    # Safe Docker cleanup
    # ------------------------------------------------------------------

    def _sd_card_rate_ok(self) -> bool:
        """Return True only if 24 hours have elapsed since last cleanup."""
        if LAST_CLEANUP_FILE.exists():
            try:
                last_ts = int(LAST_CLEANUP_FILE.read_text().strip())
                elapsed = time.time() - last_ts
                if elapsed < CLEANUP_INTERVAL_SECS:
                    logger.debug(
                        f"Docker cleanup rate-limited ({elapsed:.0f}s < {CLEANUP_INTERVAL_SECS}s)"
                    )
                    return False
            except Exception:
                pass
        return True

    def _mark_cleanup_done(self):
        try:
            LAST_CLEANUP_FILE.write_text(str(int(time.time())))
        except Exception as exc:
            logger.error(f"Failed to update cleanup timestamp: {exc}")

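Review note: the SD-card rate limit is a timestamp file compared against a fixed interval. The sketch below shows the same check with the clock passed in as a parameter, which makes the 24-hour boundary easy to test. `cleanup_allowed` is a hypothetical helper; like `_sd_card_rate_ok`, it allows cleanup when the stamp file is missing or unreadable.

```python
import tempfile
from pathlib import Path


def cleanup_allowed(stamp_file: Path, now: float,
                    interval_secs: int = 86_400) -> bool:
    """True if no valid timestamp exists or at least interval_secs have passed."""
    try:
        last_ts = int(stamp_file.read_text().strip())
    except (OSError, ValueError):
        # Missing or corrupt stamp: fail open, as the original does.
        return True
    return now - last_ts >= interval_secs


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        stamp = Path(tmp) / "last-docker-cleanup"
        print(cleanup_allowed(stamp, now=1_000_000))            # no stamp yet
        stamp.write_text("1000000")
        print(cleanup_allowed(stamp, now=1_000_000 + 3_600))    # one hour later
        print(cleanup_allowed(stamp, now=1_000_000 + 86_400))   # exactly 24 h later
```

Failing open means a corrupted stamp file costs at most one extra cleanup run; failing closed would silently disable cleanup until someone noticed.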
    def _prune_dangling_images(self):
        if not self.docker_client:
            return
        try:
            result = self.docker_client.images.prune(filters={"dangling": True})
            reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024)
            logger.info(f"Pruned dangling images ({reclaimed} MB reclaimed)")
        except Exception as exc:
            logger.error(f"Image prune failed: {exc}")

    def _prune_stopped_containers(self):
        if not self.docker_client:
            return
        try:
            result = self.docker_client.containers.prune()
            reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024)
            logger.info(f"Pruned stopped containers ({reclaimed} MB reclaimed)")
        except Exception as exc:
            logger.error(f"Container prune failed: {exc}")

    def _prune_build_cache(self):
        if not self.docker_client:
            return
        try:
            result = self.docker_client.api.prune_builds()
            reclaimed = result.get("SpaceReclaimed", 0) // (1024 * 1024)
            logger.info(f"Pruned build cache ({reclaimed} MB reclaimed)")
        except Exception as exc:
            # prune_builds() may be missing on older docker-py releases or
            # unsupported by the daemon; log and continue
            logger.warning(f"Build cache prune unavailable or failed: {exc}")

    def run_safe_cleanup(self):
        """Apply the per-node-type Docker cleanup policy."""
        if self.node_type == "lte_node":
            # No cleanup on LTE nodes: a removed image may later have to be
            # pulled again over a metered/intermittent connection.
            logger.debug("Skipping Docker cleanup: LTE node")
            return

        if self.node_type == "sd_card":
            if not self._sd_card_rate_ok():
                return
            self._prune_dangling_images()
            self._prune_stopped_containers()
            # No builder prune: minimise write cycles on SD card
            self._mark_cleanup_done()
            return

        # ai_node and standard: dangling + containers + build cache
        # ai_node: NEVER -a (would remove Ollama runtime images)
        self._prune_dangling_images()
        self._prune_stopped_containers()
        self._prune_build_cache()

    # ------------------------------------------------------------------
    # VPS-specific: control-plane filesystem rotation
    # ------------------------------------------------------------------

    def _cleanup_control_plane_fs(self):
        """
        Rotate control-plane filesystem artefacts on VPS.
        Safe targets only: never touches data/, config/, state/, or
        live action directories (pending/approved/running).
        """
        now = time.time()
        seven_days = 7 * 86_400
        thirty_days = 30 * 86_400
        three_days = 3 * 86_400

        # 1. Completed / failed actions older than 7 days
        for status in ("completed", "failed"):
            sdir = ACTIONS_DIR / status
            if not sdir.exists():
                continue
            for f in sdir.glob("*.json"):
                try:
                    if now - f.stat().st_mtime > seven_days:
                        f.unlink(missing_ok=True)
                        logger.info(f"Cleaned old {status} action: {f.name}")
                except Exception as exc:
                    logger.error(f"Failed to remove {f}: {exc}")

        # 2. Deploy logs older than 30 days
        deploy_dir = LOGS_DIR / "deploy"
        if deploy_dir.exists():
            for f in deploy_dir.glob("*.log"):
                try:
                    if now - f.stat().st_mtime > thirty_days:
                        f.unlink(missing_ok=True)
                        logger.info(f"Cleaned old deploy log: {f.name}")
                except Exception as exc:
                    logger.error(f"Failed to remove {f}: {exc}")

        # 3. Event files older than 3 days AND already past observer checkpoint.
        #    The dual condition is meant to ensure an unprocessed event is never
        #    deleted. The checkpoint test is a lexicographic comparison of full
        #    path strings, so it assumes last_processed_file is stored as a full
        #    path and that the observer processes files in that sort order.
        checkpoint_file = STATE_DIR / "observer_checkpoint.json"
        last_processed = ""
        if checkpoint_file.exists():
            try:
                cp = json.loads(checkpoint_file.read_text())
                last_processed = cp.get("last_processed_file", "")
            except Exception as exc:
                logger.error(f"Failed to read observer checkpoint: {exc}")

        if last_processed:
            for f in EVENTS_DIR.glob("**/*.json"):
                try:
                    if (now - f.stat().st_mtime > three_days
                            and str(f) < last_processed):
                        f.unlink(missing_ok=True)
                        logger.info(f"Cleaned old event: {f.name}")
                except Exception as exc:
                    logger.error(f"Failed to remove {f}: {exc}")
        else:
            logger.debug("No observer checkpoint present; skipping event cleanup")

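Review note: all three rotation rules share one core test, "file matches a pattern and its mtime is older than N days". The sketch below isolates that test so it can be exercised against a temporary directory with backdated files. `expired_files` is a hypothetical helper that only selects files; deleting them is left to the caller.

```python
import os
import tempfile
from pathlib import Path

DAY = 86_400


def expired_files(directory: Path, pattern: str,
                  max_age_secs: float, now: float) -> list[Path]:
    """Return files matching `pattern` whose mtime is older than max_age_secs."""
    return sorted(
        p for p in directory.glob(pattern)
        if p.is_file() and now - p.stat().st_mtime > max_age_secs
    )


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        now = 2_000_000_000.0
        # Backdate three files: 1, 8 and 40 days old.
        for name, age_days in (("a.json", 1), ("b.json", 8), ("c.log", 40)):
            f = root / name
            f.write_text("{}")
            os.utime(f, (now - age_days * DAY, now - age_days * DAY))
        print([p.name for p in expired_files(root, "*.json", 7 * DAY, now)])
        print([p.name for p in expired_files(root, "*.log", 30 * DAY, now)])
```

Separating selection from deletion also makes a dry-run mode straightforward: log the returned list without unlinking anything.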
    # ------------------------------------------------------------------
    # Optional: ship events to VPS via rsync
    # ------------------------------------------------------------------

    def _ship_events_to_vps(self):
        """
        Rsync local events to VPS so the observer can process them.
        Requires:
          - VPS_EVENTS_HOST env var set to the VPS hostname/IP
          - SSH key accessible inside the container (mount via docker-compose)
          - The node is NOT VPS itself
        """
        if not VPS_EVENTS_HOST or self.node_name == VPS_NODE_NAME:
            return

        local_dir = str(self._node_events_dir()) + "/"
        remote_dir = (f"{VPS_EVENTS_USER}@{VPS_EVENTS_HOST}:"
                      f"{VPS_EVENTS_PATH}/{self.node_name}/")
        cmd = [
            "rsync", "-az", "--remove-source-files",
            "-e", "ssh -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o BatchMode=yes",
            local_dir,
            remote_dir,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                logger.debug(f"Events shipped to {remote_dir}")
            else:
                logger.warning(f"Event shipping failed: {result.stderr.strip()}")
        except Exception as exc:
            logger.warning(f"Event shipping error: {exc}")

    # ------------------------------------------------------------------
    # Heartbeat
    # ------------------------------------------------------------------

    def _update_heartbeat(self):
        try:
            (STATE_DIR / "node-agent.heartbeat").touch()
        except Exception as exc:
            logger.error(f"Failed to update heartbeat: {exc}")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def run_once(self):
        self._update_heartbeat()

        disk_pct = self.check_disk()
        mem_pct = self.check_memory()
        cpu_pct = self.check_cpu()
        self.check_containers()

        self.run_safe_cleanup()

        if self.node_name == VPS_NODE_NAME:
            self._cleanup_control_plane_fs()

        # Emit a node_health heartbeat so the observer can update node status
        # and the supervisor can correlate disk/memory metrics with service issues.
        self.emit_event(
            "node_health", "info", None,
            f"Health check completed on {self.node_name}",
            {"disk_pct": disk_pct, "mem_pct": mem_pct, "cpu_pct": cpu_pct},
        )

        self._ship_events_to_vps()

    def loop(self, interval: int = HEALTH_CHECK_INTERVAL):
        logger.info(
            f"node-agent ready - node={self.node_name} type={self.node_type} "
            f"interval={interval}s"
        )
        while True:
            try:
                self.run_once()
            except Exception as exc:
                logger.error(f"Health cycle error: {exc}")
            time.sleep(interval)


if __name__ == "__main__":
    NodeAgent().loop()