diff --git a/services/stability-agent/README.md b/services/stability-agent/README.md index 621c635..8fdbb6c 100644 --- a/services/stability-agent/README.md +++ b/services/stability-agent/README.md @@ -10,6 +10,7 @@ A lightweight filesystem-first watchdog and observer agent for CHELSTY. * **Tailscale Check**: Verifies Tailscale availability. * **MQTT Reachability**: Checks connectivity to the local MQTT broker. * **Zigbee2MQTT Monitoring**: Specifically monitors the Zigbee2MQTT container. +* **Redis Publishing**: (Optional) Publishes runtime state and events to a central Redis server. * **Event Logging**: Writes append-only JSON events to `/opt/homelab/events/YYYY-MM-DD/chelsty/`. * **State Reporting**: Writes heartbeat and status summary to `/opt/homelab/state/`. @@ -21,6 +22,25 @@ Environment variables: * `DISK_THRESHOLD_PCT`: Disk usage percentage to trigger warning (default: 90). * `MQTT_HOST`: Hostname or IP of the MQTT broker to check. * `MQTT_PORT`: Port of the MQTT broker (default: 1883). +* `REDIS_HOST`: Hostname or IP of the Redis server (e.g., PIHA at 100.108.208.3). +* `REDIS_PORT`: Port of the Redis server (default: 6379). +* `REDIS_ENABLED`: Whether to enable Redis publishing (default: true if REDIS_HOST is set). +* `NODE_NAME`: Name of the current node (default: chelsty). 
+ +#### Verification + +You can verify the Redis publishing using `redis-cli`: + +```bash +# Check node state +redis-cli -h 100.108.208.3 HGETALL homelab:nodes:chelsty + +# Check service discovery +redis-cli -h 100.108.208.3 HGETALL homelab:services:chelsty:stability-agent + +# Check event stream +redis-cli -h 100.108.208.3 XRANGE homelab:events - + +``` #### Safety diff --git a/services/stability-agent/docker-compose.yml b/services/stability-agent/docker-compose.yml index 4d0d848..4a09e98 100644 --- a/services/stability-agent/docker-compose.yml +++ b/services/stability-agent/docker-compose.yml @@ -12,7 +12,10 @@ services: - DISK_THRESHOLD_PCT=${DISK_THRESHOLD_PCT:-90} - MQTT_HOST=${MQTT_HOST} - MQTT_PORT=${MQTT_PORT:-1883} - - NODE_NAME=chelsty + - REDIS_HOST=${REDIS_HOST:-100.108.208.3} + - REDIS_PORT=${REDIS_PORT:-6379} + - REDIS_ENABLED=${REDIS_ENABLED:-true} + - NODE_NAME=${NODE_NAME:-chelsty} healthcheck: test: ["CMD", "/bin/sh", "/app/healthcheck.sh"] interval: 1m diff --git a/services/stability-agent/env.example b/services/stability-agent/env.example index 07edb37..787559b 100644 --- a/services/stability-agent/env.example +++ b/services/stability-agent/env.example @@ -2,3 +2,7 @@ STABILITY_CHECK_INTERVAL=60 DISK_THRESHOLD_PCT=90 MQTT_HOST=mosquitto MQTT_PORT=1883 +REDIS_HOST=100.108.208.3 +REDIS_PORT=6379 +REDIS_ENABLED=true +NODE_NAME=chelsty diff --git a/services/stability-agent/src/stability_agent.py b/services/stability-agent/src/stability_agent.py index 2e9caf7..c775fc7 100644 --- a/services/stability-agent/src/stability_agent.py +++ b/services/stability-agent/src/stability_agent.py @@ -13,6 +13,9 @@ DISK_THRESHOLD_PCT = float(os.environ.get("DISK_THRESHOLD_PCT", "90.0")) MQTT_HOST = os.environ.get("MQTT_HOST") MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883")) NODE_NAME = os.environ.get("NODE_NAME", "chelsty") +REDIS_HOST = os.environ.get("REDIS_HOST") +REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379")) +REDIS_ENABLED = 
class RedisClient:
    """Minimal, dependency-free Redis client speaking RESP over one TCP socket.

    Only fire-and-forget writes are supported (``HSET`` and ``XADD``). Every
    public method returns ``True`` when the server acknowledged the command
    and ``False`` otherwise. Network failures are printed and swallowed so a
    Redis outage cannot take the calling agent down.

    Args:
        host: Hostname or IP address of the Redis server.
        port: TCP port of the Redis server.
        timeout: Seconds to wait for connect, send, and receive operations.
    """

    def __init__(self, host, port=6379, timeout=2.0):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sock = None

    def close(self):
        """Close the current connection, if any. Safe to call repeatedly."""
        sock, self.sock = self.sock, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                # The descriptor is gone either way; nothing useful to report.
                pass

    def _is_stale(self):
        """Return True when the cached socket can no longer be trusted.

        An idle, healthy connection has nothing to read. If a non-blocking
        peek yields EOF (peer closed) or any bytes (an unread reply left over
        from an earlier timeout), the stream is dead or out of sync and the
        caller must reconnect.
        """
        try:
            # Non-blocking mode so the peek returns immediately instead of
            # waiting for the socket timeout on a healthy idle connection.
            self.sock.setblocking(False)
            try:
                self.sock.recv(1, socket.MSG_PEEK)
            finally:
                self.sock.settimeout(self.timeout)
        except (BlockingIOError, InterruptedError):
            return False
        except OSError:
            return True
        return True

    def _connect(self):
        """Ensure ``self.sock`` is a usable connection; return success."""
        if self.sock is not None:
            if not self._is_stale():
                return True
            # Close explicitly so the stale descriptor is not leaked.
            self.close()

        try:
            # create_connection() leaves ``timeout`` set on the new socket.
            self.sock = socket.create_connection(
                (self.host, self.port), timeout=self.timeout
            )
            return True
        except (OSError, OverflowError, TypeError, ValueError) as e:
            self.sock = None
            print(f"Redis connection error: {e}")
            return False

    @staticmethod
    def _encode_command(args):
        """Serialize ``args`` as a RESP array of bulk strings (bytes)."""
        parts = [b"*%d\r\n" % len(args)]
        for arg in args:
            # Pass raw bytes through; str() would send their repr (b'...').
            if isinstance(arg, (bytes, bytearray)):
                data = bytes(arg)
            else:
                data = str(arg).encode("utf-8")
            # RESP lengths count bytes, not characters.
            parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
        return b"".join(parts)

    def _send_command(self, *args):
        """Send one command and report whether the server accepted it.

        Returns:
            True if a non-error reply was received. False if the connection
            could not be established, the socket failed, the server closed
            the connection, or the server answered with a RESP error.
        """
        if not self._connect():
            return False

        payload = self._encode_command(args)
        try:
            self.sock.sendall(payload)
            # HSET/XADD replies are a few dozen bytes, so one read is enough
            # in practice. Anything left unread is caught by _is_stale() on
            # the next call and forces a reconnect.
            resp = self.sock.recv(4096)
        except OSError as e:
            print(f"Redis send error: {e}")
            self.close()
            return False

        if not resp:
            # EOF: the server went away before acknowledging the command.
            print("Redis send error: connection closed by server")
            self.close()
            return False
        if resp.startswith(b"-"):
            message = resp.decode("utf-8", errors="replace").strip()
            print(f"Redis error response: {message}")
            return False
        return True

    def hset(self, key, mapping):
        """Set the fields in ``mapping`` on the hash at ``key`` (HSET)."""
        if not mapping:
            # HSET without field/value pairs is always rejected by the server.
            return False
        args = ["HSET", key]
        for field, value in mapping.items():
            args.extend((field, value))
        return self._send_command(*args)

    def xadd(self, key, fields):
        """Append an entry with an auto-generated ID to stream ``key`` (XADD)."""
        if not fields:
            # XADD requires at least one field/value pair.
            return False
        args = ["XADD", key, "*"]
        for field, value in fields.items():
            args.extend((field, value))
        return self._send_command(*args)
c["state"], + "deployment_state": "deployed", + "updated_at": status["timestamp"], + "dependencies": json.dumps([]), + "recommendations": json.dumps([]) + }) + except Exception as e: + print(f"Failed to publish to Redis: {e}") + # Local event for Redis error + emit_event("redis_publish_error", "warning", f"Failed to publish to Redis: {e}", details={"error": str(e)}) + except Exception as e: print(f"Error in main loop: {e}") emit_event("agent_error", "error", f"Internal agent error: {e}", details={"error": str(e)})