"""
node_agent.py — Homelab node health monitor daemon.

Runs as a Docker container on every managed node. Each cycle it:
  1. Collects system metrics (disk, memory, CPU).
  2. Checks Docker container health.
  3. Emits structured event JSON files to /opt/homelab/events/{node_name}/.
  4. Applies safe Docker / filesystem cleanup per the conservative policy.
  5. Optionally rsyncs events to VPS so the control-plane observer can
     process them.

Cleanup policy (matches health-monitor.sh):
  lte_node (chelsty-infra, chelsty-ha) : NO cleanup, NO image operations
  sd_card  (piha, saturn)              : dangling images + stopped containers,
                                         max once per 24 h
  ai_node  (solaria)                   : dangling + containers + build cache,
                                         NEVER docker image prune -a
  standard (vps)                       : dangling + containers + build cache
                                         + control-plane filesystem rotation

NEVER TOUCHED on any node:
  /opt/homelab/data/    Frigate recordings, Ollama models, HA db, MQTT state
  /opt/homelab/config/  All hand-crafted and repo-seeded configuration
  /opt/homelab/state/   Heartbeat files, observer checkpoint
  actions/pending|approved|running   Live work queue
"""

import json
import logging
import os
import shutil
import socket
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path

# The Docker SDK is optional: without it the agent still reports system
# metrics but skips container checks and Docker cleanup.
try:
    import docker as docker_sdk
    DOCKER_AVAILABLE = True
except ImportError:
    DOCKER_AVAILABLE = False
    docker_sdk = None

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("node-agent")

# ---------------------------------------------------------------------------
# Runtime paths
# ---------------------------------------------------------------------------
RUNTIME_PATH = Path(os.getenv("RUNTIME_PATH", "/opt/homelab"))
REPO_ROOT = Path(os.getenv("REPO_ROOT", "/repo"))

EVENTS_DIR = RUNTIME_PATH / "events"
STATE_DIR = RUNTIME_PATH / "state"
LOGS_DIR = RUNTIME_PATH / "logs"
ACTIONS_DIR = RUNTIME_PATH / "actions"

# ---------------------------------------------------------------------------
# Node identity
# ---------------------------------------------------------------------------
NODE_NAME = os.getenv("NODE_NAME") or socket.gethostname()
NODE_TYPE = os.getenv("NODE_TYPE", "")  # override auto-detection if set

VPS_NODE_NAME = "vps"
LTE_NODES = {"chelsty-infra", "chelsty-ha"}
SD_CARD_NODES = {"piha", "saturn"}
AI_NODES = {"solaria"}

# The only node types the cleanup policy knows about.
VALID_NODE_TYPES = frozenset({"lte_node", "sd_card", "ai_node", "standard"})

# ---------------------------------------------------------------------------
# Event shipping (optional — requires VPS_EVENTS_HOST + SSH key in container)
# ---------------------------------------------------------------------------
VPS_EVENTS_HOST = os.getenv("VPS_EVENTS_HOST", "")
VPS_EVENTS_USER = os.getenv("VPS_EVENTS_USER", "oskar")
VPS_EVENTS_PATH = os.getenv("VPS_EVENTS_PATH", "/opt/homelab/events")

# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
DISK_WARN_PCT = 75
DISK_CRIT_PCT = 85
MEM_WARN_PCT = 85
MEM_CRIT_PCT = 95

# SD-card nodes: enforce 24-hour gap between Docker cleanup runs
CLEANUP_INTERVAL_SECS = 86_400
LAST_CLEANUP_FILE = STATE_DIR / "last-docker-cleanup"


def _env_int(name: str, default: int, minimum: int = 1) -> int:
    """Read an integer environment variable, falling back to *default*.

    An unset, blank, non-numeric or below-*minimum* value yields *default*
    (with a warning for the last two) instead of crashing the daemon at
    import time or producing a zero/negative sleep interval.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        value = int(raw.strip())
    except ValueError:
        logger.warning(f"Ignoring non-integer {name}={raw!r}; using {default}")
        return default
    if value < minimum:
        logger.warning(f"Ignoring {name}={value} (minimum {minimum}); using {default}")
        return default
    return value


# How long to wait between full health-check cycles
HEALTH_CHECK_INTERVAL = _env_int("CHECK_INTERVAL", 60)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resolve_node_type() -> str:
    """Return the cleanup-policy type for this node.

    A recognised ``NODE_TYPE`` override wins (case and surrounding whitespace
    are ignored).  An unrecognised override is reported and ignored, so a typo
    cannot silently push an LTE or SD-card node onto the full "standard"
    cleanup path; detection then falls back to the node-name tables.
    """
    override = (NODE_TYPE or "").strip().lower()
    if override in VALID_NODE_TYPES:
        return override
    if override:
        logger.warning(
            f"Unknown NODE_TYPE {NODE_TYPE!r}; falling back to name-based detection"
        )

    if NODE_NAME in LTE_NODES:
        return "lte_node"
    if NODE_NAME in SD_CARD_NODES:
        return "sd_card"
    if NODE_NAME in AI_NODES:
        return "ai_node"
    return "standard"


def _utc_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


# ---------------------------------------------------------------------------
# NodeAgent
# ---------------------------------------------------------------------------


class NodeAgent:
    """Health monitor for a single homelab node.

    ``loop()`` repeatedly calls ``run_once()``, which samples disk, memory and
    CPU, inspects Docker containers, writes event JSON files under
    ``EVENTS_DIR / node_name``, applies the per-node-type Docker cleanup
    policy and, when configured, ships the events to the VPS with rsync.
    """

    # CPU usage (percent) at or above which a high_cpu event is emitted.
    CPU_WARN_PCT = 90
    # Seconds between the two /proc/stat samples taken by check_cpu().
    CPU_SAMPLE_SECS = 0.5
    # Restart policies under which an exited container is considered a fault.
    _AUTO_RESTART_POLICIES = ("unless-stopped", "always", "on-failure")

    def __init__(self):
        """Resolve node identity, create runtime dirs and connect to Docker."""
        self.node_name = NODE_NAME
        self.node_type = _resolve_node_type()
        self._ensure_dirs()

        # Stays None when the SDK is missing or the daemon is unreachable;
        # every Docker-dependent method checks for that and becomes a no-op.
        self.docker_client = None
        if DOCKER_AVAILABLE:
            try:
                self.docker_client = docker_sdk.from_env()
                logger.info("Docker SDK connected")
            except Exception as exc:
                logger.warning(f"Docker unavailable: {exc}")

        logger.info(f"node-agent starting: node={self.node_name} type={self.node_type}")

    # ------------------------------------------------------------------
    # Directories
    # ------------------------------------------------------------------

    def _ensure_dirs(self):
        """Create the events, state, logs and per-node events directories."""
        for d in [EVENTS_DIR, STATE_DIR, LOGS_DIR, EVENTS_DIR / self.node_name]:
            d.mkdir(parents=True, exist_ok=True)

    def _node_events_dir(self) -> Path:
        """Return the directory this node's event files are written to."""
        return EVENTS_DIR / self.node_name

    # ------------------------------------------------------------------
    # Event emission
    # ------------------------------------------------------------------

    def emit_event(self, event_type: str, severity: str, service,
                   message: str, payload: "dict | None" = None):
        """Write one event as a JSON file in this node's events directory.

        The file is named after the event id,
        ``evt-{node}-{unix_ts}-{event_type}``.  Because the timestamp has
        one-second resolution, several events of the same type raised in the
        same second (e.g. two exited containers in one cycle) would share an
        id and overwrite each other; in that case a numeric suffix
        (``-1``, ``-2``, ...) is appended so no event is lost.  The first
        event of a given second keeps the unsuffixed id.

        Args:
            event_type: Machine-readable event kind, e.g. ``"disk_pressure"``.
            severity: Severity label stored verbatim in the event.
            service: Affected service/container name, or None (stored as "").
            message: Human-readable description.
            payload: Optional extra data; stored as ``{}`` when omitted.

        Failures are logged and swallowed so a full or read-only disk cannot
        abort the health cycle.
        """
        ts = int(time.time())
        base_id = f"evt-{self.node_name}-{ts}-{event_type}"
        event = {
            "id": base_id,
            "timestamp": ts,
            "date": _utc_iso(),
            "type": event_type,
            "severity": severity,
            "node": self.node_name,
            "service": service or "",
            "message": message,
            "payload": payload or {},
        }

        event_id = base_id
        try:
            events_dir = self._node_events_dir()
            suffix = 0
            while True:
                if suffix:
                    event_id = f"{base_id}-{suffix}"
                    event["id"] = event_id
                path = events_dir / f"{event_id}.json"
                try:
                    # "x" fails if the file already exists, so an earlier
                    # event with the same id is never overwritten.
                    with open(path, "x", encoding="utf-8") as fh:
                        fh.write(json.dumps(event, indent=2))
                    return
                except FileExistsError:
                    suffix += 1
        except Exception as exc:
            logger.error(f"Failed to write event {event_id}: {exc}")

    # ------------------------------------------------------------------
    # System metrics
    # ------------------------------------------------------------------

    def check_disk(self) -> int:
        """
        Check disk usage of the filesystem that hosts RUNTIME_PATH.

        Using shutil.disk_usage on the mounted runtime path works both
        natively on the host and inside a container that mounts /opt/homelab
        from the host — the reported usage reflects the host partition.

        Emits a ``disk_pressure`` event at or above DISK_WARN_PCT (medium) or
        DISK_CRIT_PCT (high).  Returns the usage percentage, or 0 if the
        check itself failed.
        """
        try:
            usage = shutil.disk_usage(str(RUNTIME_PATH))
            pct = int(usage.used * 100 / usage.total)
            avail_mb = int(usage.free / (1024 * 1024))
            total_mb = int(usage.total / (1024 * 1024))
            payload = {
                "usage_pct": pct,
                "avail_mb": avail_mb,
                "total_mb": total_mb,
                "mount": str(RUNTIME_PATH),
            }

            if pct >= DISK_CRIT_PCT:
                logger.warning(f"Disk critical: {pct}% used")
                self.emit_event(
                    "disk_pressure", "high", None,
                    f"Disk usage critical: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)",
                    payload,
                )
            elif pct >= DISK_WARN_PCT:
                logger.info(f"Disk elevated: {pct}% used")
                self.emit_event(
                    "disk_pressure", "medium", None,
                    f"Disk usage elevated: {pct}% on {RUNTIME_PATH} ({avail_mb} MB free)",
                    payload,
                )
            return pct
        except Exception as exc:
            logger.error(f"Disk check failed: {exc}")
            return 0

    def check_memory(self) -> int:
        """Read host memory from /proc/meminfo (visible in container without special mounts).

        Usage is ``(MemTotal - MemAvailable) / MemTotal``.  Emits a
        ``high_memory`` event at or above MEM_WARN_PCT (medium) or
        MEM_CRIT_PCT (high).  Returns the usage percentage, or 0 if the check
        itself failed.
        """
        try:
            info: dict = {}
            with open("/proc/meminfo") as fh:
                for line in fh:
                    # Lines look like "MemTotal:  16316412 kB".  Skip anything
                    # that does not, rather than failing the whole check.
                    key, sep, rest = line.partition(":")
                    fields = rest.split()
                    if not sep or not fields:
                        continue
                    try:
                        info[key.strip()] = int(fields[0])
                    except ValueError:
                        continue

            total = info.get("MemTotal", 1)
            avail = info.get("MemAvailable", total)
            pct = int((total - avail) * 100 / total)
            avail_mb = avail // 1024  # /proc/meminfo values are in kB
            total_mb = total // 1024
            payload = {"usage_pct": pct, "avail_mb": avail_mb, "total_mb": total_mb}

            if pct >= MEM_CRIT_PCT:
                logger.warning(f"Memory critical: {pct}%")
                self.emit_event(
                    "high_memory", "high", None,
                    f"Memory usage critical: {pct}% ({avail_mb} MB available)",
                    payload,
                )
            elif pct >= MEM_WARN_PCT:
                self.emit_event(
                    "high_memory", "medium", None,
                    f"Memory usage elevated: {pct}% ({avail_mb} MB available)",
                    payload,
                )
            return pct
        except Exception as exc:
            logger.error(f"Memory check failed: {exc}")
            return 0

    @staticmethod
    def _read_cpu_times():
        """Return ``(idle, total)`` jiffies from the aggregate line of /proc/stat.

        Returns ``(0, 1)`` when no aggregate ``cpu`` line is present.
        """
        with open("/proc/stat") as fh:
            for line in fh:
                if line.startswith("cpu "):
                    vals = list(map(int, line.split()[1:]))
                    return vals[3], sum(vals)  # idle, total
        return 0, 1

    def check_cpu(self) -> int:
        """Two-sample /proc/stat delta for accurate instantaneous CPU usage.

        Blocks for CPU_SAMPLE_SECS between the samples.  Emits a ``high_cpu``
        event at or above CPU_WARN_PCT.  Returns the usage percentage, or 0
        if the check itself failed.
        """
        try:
            idle1, total1 = self._read_cpu_times()
            time.sleep(self.CPU_SAMPLE_SECS)
            idle2, total2 = self._read_cpu_times()

            d_total = total2 - total1
            d_idle = idle2 - idle1
            # No elapsed jiffies means nothing to measure; report 0.
            pct = 100 - int(d_idle * 100 / d_total) if d_total else 0

            if pct >= self.CPU_WARN_PCT:
                self.emit_event(
                    "high_cpu", "medium", None,
                    f"CPU usage elevated: {pct}%",
                    {"usage_pct": pct},
                )
            return pct
        except Exception as exc:
            logger.error(f"CPU check failed: {exc}")
            return 0

    # ------------------------------------------------------------------
    # Docker container health
    # ------------------------------------------------------------------

    def check_containers(self):
        """Emit events for exited auto-restart containers and failing health checks.

        No-op when no Docker client is available.  A failure while inspecting
        one container is logged and does not stop the remaining checks.
        """
        if not self.docker_client:
            return
        try:
            containers = self.docker_client.containers.list(all=True)
        except Exception as exc:
            logger.error(f"Docker container list failed: {exc}")
            return

        for c in containers:
            # Bound before the try so the error handler never has to touch
            # the (possibly broken) container object again.
            name = "unknown"
            try:
                name = c.name
                status = c.status
                attrs = c.attrs or {}
                host_config = attrs.get("HostConfig") or {}
                restart_policy = (host_config.get("RestartPolicy") or {}).get("Name", "")
                state = attrs.get("State") or {}
                # "or {}" also covers an explicit null, not just a missing key.
                health_status = (state.get("Health") or {}).get("Status", "")

                # Exited container that carries an auto-restart policy
                if (status in ("exited", "dead")
                        and restart_policy in self._AUTO_RESTART_POLICIES):
                    logger.warning(f"Container exited: {name} (restart={restart_policy})")
                    self.emit_event(
                        "containers_not_running", "high", name,
                        f"Container '{name}' has exited (restart={restart_policy})",
                        {"container": name, "status": status,
                         "restart_policy": restart_policy},
                    )

                # Running container with a failing health check
                elif status == "running" and health_status == "unhealthy":
                    logger.warning(f"Container unhealthy: {name}")
                    self.emit_event(
                        "healthcheck_failed", "high", name,
                        f"Container '{name}' is running but its health check is failing",
                        {"container": name, "health_status": health_status},
                    )
            except Exception as exc:
                logger.error(f"Error checking container {name}: {exc}")

    # ------------------------------------------------------------------
    # Safe Docker cleanup
    # ------------------------------------------------------------------

    def _sd_card_rate_ok(self) -> bool:
        """Return True only if 24 hours have elapsed since last cleanup.

        A missing or unreadable timestamp file counts as "never cleaned" and
        returns True; the unreadable case is logged instead of being silently
        ignored.
        """
        if not LAST_CLEANUP_FILE.exists():
            return True
        try:
            last_ts = int(LAST_CLEANUP_FILE.read_text().strip())
        except (OSError, ValueError) as exc:
            logger.warning(
                f"Ignoring unreadable cleanup timestamp {LAST_CLEANUP_FILE}: {exc}"
            )
            return True

        elapsed = time.time() - last_ts
        if elapsed < CLEANUP_INTERVAL_SECS:
            logger.debug(
                f"Docker cleanup rate-limited ({elapsed:.0f}s < {CLEANUP_INTERVAL_SECS}s)"
            )
            return False
        return True

    def _mark_cleanup_done(self):
        """Record the current time as the moment of the last Docker cleanup."""
        try:
            LAST_CLEANUP_FILE.write_text(str(int(time.time())))
        except Exception as exc:
            logger.error(f"Failed to update cleanup timestamp: {exc}")

    @staticmethod
    def _reclaimed_mb(result) -> int:
        """Return the ``SpaceReclaimed`` figure of a prune response in whole MB.

        A missing or null figure is treated as 0 so that a successful prune is
        not misreported as a failure.
        """
        reclaimed = (result or {}).get("SpaceReclaimed") or 0
        return reclaimed // (1024 * 1024)

    def _prune_dangling_images(self):
        """Remove dangling (untagged) images only — never tagged images."""
        if not self.docker_client:
            return
        try:
            result = self.docker_client.images.prune(filters={"dangling": True})
            reclaimed = self._reclaimed_mb(result)
            logger.info(f"Pruned dangling images ({reclaimed} MB reclaimed)")
        except Exception as exc:
            logger.error(f"Image prune failed: {exc}")

    def _prune_stopped_containers(self):
        """Remove stopped containers."""
        if not self.docker_client:
            return
        try:
            result = self.docker_client.containers.prune()
            reclaimed = self._reclaimed_mb(result)
            logger.info(f"Pruned stopped containers ({reclaimed} MB reclaimed)")
        except Exception as exc:
            logger.error(f"Container prune failed: {exc}")

    def _prune_build_cache(self):
        """Remove the Docker build cache; failure is only a warning."""
        if not self.docker_client:
            return
        try:
            result = self.docker_client.api.prune_builds()
            reclaimed = self._reclaimed_mb(result)
            logger.info(f"Pruned build cache ({reclaimed} MB reclaimed)")
        except Exception as exc:
            # Older docker-py releases / daemons lack prune_builds();
            # log and continue.
            logger.warning(f"Build cache prune unavailable or failed: {exc}")

    def run_safe_cleanup(self):
        """Apply the per-node-type Docker cleanup policy."""
        if self.node_type == "lte_node":
            # No cleanup on LTE nodes — any Docker op risks a pull over a
            # metered/intermittent connection.
            logger.debug("Skipping Docker cleanup: LTE node")
            return

        if not self.docker_client:
            # Nothing can be pruned.  Returning here also keeps an sd_card
            # node from recording a cleanup that never ran, which would
            # suppress the real one for the next 24 hours.
            logger.debug("Skipping Docker cleanup: no Docker client")
            return

        if self.node_type == "sd_card":
            if not self._sd_card_rate_ok():
                return
            self._prune_dangling_images()
            self._prune_stopped_containers()
            # No builder prune: minimise write cycles on SD card
            self._mark_cleanup_done()
            return

        # ai_node and standard: dangling + containers + build cache
        # ai_node: NEVER -a (would remove Ollama runtime images)
        self._prune_dangling_images()
        self._prune_stopped_containers()
        self._prune_build_cache()

    # ------------------------------------------------------------------
    # VPS-specific: control-plane filesystem rotation
    # ------------------------------------------------------------------

    @staticmethod
    def _remove_stale(files, max_age_secs, now, label, keep=None):
        """Delete every path in *files* whose mtime is older than *max_age_secs*.

        *keep*, when given, is called with each stale path; a true result
        spares the file.  Each deletion is logged using *label*; a failure on
        one file is logged and does not stop the rest.
        """
        for f in files:
            try:
                if now - f.stat().st_mtime <= max_age_secs:
                    continue
                if keep is not None and keep(f):
                    continue
                f.unlink(missing_ok=True)
                logger.info(f"Cleaned old {label}: {f.name}")
            except Exception as exc:
                logger.error(f"Failed to remove {f}: {exc}")

    def _cleanup_control_plane_fs(self):
        """
        Rotate control-plane filesystem artefacts on VPS.

        Safe targets only — never touches data/, config/, state/, or live
        action directories (pending/approved/running).
        """
        now = time.time()
        seven_days = 7 * 86_400
        thirty_days = 30 * 86_400
        three_days = 3 * 86_400

        # 1. Completed / failed actions older than 7 days
        for status in ("completed", "failed"):
            sdir = ACTIONS_DIR / status
            if sdir.exists():
                self._remove_stale(sdir.glob("*.json"), seven_days, now,
                                   f"{status} action")

        # 2. Deploy logs older than 30 days
        deploy_dir = LOGS_DIR / "deploy"
        if deploy_dir.exists():
            self._remove_stale(deploy_dir.glob("*.log"), thirty_days, now,
                               "deploy log")

        # 3. Event files older than 3 days AND already past observer checkpoint.
        #    The dual condition guarantees we never delete an unprocessed event.
        checkpoint_file = STATE_DIR / "observer_checkpoint.json"
        last_processed = ""
        if checkpoint_file.exists():
            try:
                cp = json.loads(checkpoint_file.read_text())
                last_processed = cp.get("last_processed_file", "")
            except Exception as exc:
                logger.error(f"Failed to read observer checkpoint: {exc}")
        if not isinstance(last_processed, str):
            # A non-string value cannot be compared against paths; treat it
            # as "no checkpoint" rather than erroring on every event file.
            last_processed = ""

        if last_processed:
            # NOTE(review): this is a lexicographic comparison of the full
            # path string against the checkpoint value.  It is only a valid
            # "already processed" test if the observer stores a full path in
            # the same form and processes files in path-sorted order —
            # confirm against the observer.
            self._remove_stale(
                EVENTS_DIR.glob("**/*.json"), three_days, now, "event",
                keep=lambda f: not (str(f) < last_processed),
            )
        else:
            logger.debug("No observer checkpoint present; skipping event cleanup")

    # ------------------------------------------------------------------
    # Optional: ship events to VPS via rsync
    # ------------------------------------------------------------------

    def _ship_events_to_vps(self):
        """
        Rsync local events to VPS so the observer can process them.

        Requires:
          - VPS_EVENTS_HOST env var set to the VPS hostname/IP
          - SSH key accessible inside the container (mount via docker-compose)
          - The node is NOT VPS itself

        Successfully transferred files are removed locally
        (``--remove-source-files``).  Any failure is logged as a warning and
        the events stay in place for the next cycle.
        """
        if not VPS_EVENTS_HOST or self.node_name == VPS_NODE_NAME:
            return

        # Trailing slash: sync the directory's contents, not the directory.
        local_dir = str(self._node_events_dir()) + "/"
        remote_dir = (f"{VPS_EVENTS_USER}@{VPS_EVENTS_HOST}:"
                      f"{VPS_EVENTS_PATH}/{self.node_name}/")
        cmd = [
            "rsync", "-az", "--remove-source-files",
            # NOTE(review): StrictHostKeyChecking=no disables host-key
            # verification, so a spoofed VPS host would be accepted.
            # Consider pinning the key via a mounted known_hosts file.
            "-e", "ssh -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o BatchMode=yes",
            local_dir,
            remote_dir,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                logger.debug(f"Events shipped to {remote_dir}")
            else:
                logger.warning(f"Event shipping failed: {result.stderr.strip()}")
        except Exception as exc:
            # Covers a missing rsync binary and the 30 s timeout.
            logger.warning(f"Event shipping error: {exc}")

    # ------------------------------------------------------------------
    # Heartbeat
    # ------------------------------------------------------------------

    def _update_heartbeat(self):
        """Touch the heartbeat file so its mtime marks the latest cycle."""
        try:
            (STATE_DIR / "node-agent.heartbeat").touch()
        except Exception as exc:
            logger.error(f"Failed to update heartbeat: {exc}")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def run_once(self):
        """Run one full health-check cycle."""
        self._update_heartbeat()
        disk_pct = self.check_disk()
        mem_pct = self.check_memory()
        cpu_pct = self.check_cpu()
        self.check_containers()
        self.run_safe_cleanup()

        if self.node_name == VPS_NODE_NAME:
            self._cleanup_control_plane_fs()

        # Emit a node_health heartbeat so the observer can update node status
        # and the supervisor can correlate disk/memory metrics with service issues.
        self.emit_event(
            "node_health", "info", None,
            f"Health check completed on {self.node_name}",
            {"disk_pct": disk_pct, "mem_pct": mem_pct, "cpu_pct": cpu_pct},
        )

        # Ship last so this cycle's events, including node_health, go out.
        self._ship_events_to_vps()

    def loop(self, interval: int = HEALTH_CHECK_INTERVAL):
        """Run health cycles forever, sleeping *interval* seconds between them.

        An exception in a cycle is logged with its traceback and the loop
        continues.  A negative *interval* is treated as 0 instead of letting
        ``time.sleep`` raise and kill the daemon.
        """
        logger.info(
            f"node-agent ready — node={self.node_name} type={self.node_type} "
            f"interval={interval}s"
        )
        while True:
            try:
                self.run_once()
            except Exception as exc:
                logger.exception(f"Health cycle error: {exc}")
            time.sleep(max(interval, 0))


if __name__ == "__main__":
    NodeAgent().loop()