Publish stability agent state to Redis
This commit is contained in:
parent
8d0f2379ba
commit
e8d6d6d473
|
|
@ -10,6 +10,7 @@ A lightweight filesystem-first watchdog and observer agent for CHELSTY.
|
||||||
* **Tailscale Check**: Verifies Tailscale availability.
|
* **Tailscale Check**: Verifies Tailscale availability.
|
||||||
* **MQTT Reachability**: Checks connectivity to the local MQTT broker.
|
* **MQTT Reachability**: Checks connectivity to the local MQTT broker.
|
||||||
* **Zigbee2MQTT Monitoring**: Specifically monitors the Zigbee2MQTT container.
|
* **Zigbee2MQTT Monitoring**: Specifically monitors the Zigbee2MQTT container.
|
||||||
|
* **Redis Publishing**: (Optional) Publishes runtime state and events to a central Redis server.
|
||||||
* **Event Logging**: Writes append-only JSON events to `/opt/homelab/events/YYYY-MM-DD/chelsty/`.
|
* **Event Logging**: Writes append-only JSON events to `/opt/homelab/events/YYYY-MM-DD/chelsty/`.
|
||||||
* **State Reporting**: Writes heartbeat and status summary to `/opt/homelab/state/`.
|
* **State Reporting**: Writes heartbeat and status summary to `/opt/homelab/state/`.
|
||||||
|
|
||||||
|
|
@ -21,6 +22,25 @@ Environment variables:
|
||||||
* `DISK_THRESHOLD_PCT`: Disk usage percentage to trigger warning (default: 90).
|
* `DISK_THRESHOLD_PCT`: Disk usage percentage to trigger warning (default: 90).
|
||||||
* `MQTT_HOST`: Hostname or IP of the MQTT broker to check.
|
* `MQTT_HOST`: Hostname or IP of the MQTT broker to check.
|
||||||
* `MQTT_PORT`: Port of the MQTT broker (default: 1883).
|
* `MQTT_PORT`: Port of the MQTT broker (default: 1883).
|
||||||
|
* `REDIS_HOST`: Hostname or IP of the Redis server (e.g., PIHA at 100.108.208.3).
|
||||||
|
* `REDIS_PORT`: Port of the Redis server (default: 6379).
|
||||||
|
* `REDIS_ENABLED`: Whether to enable Redis publishing (default: true if REDIS_HOST is set).
|
||||||
|
* `NODE_NAME`: Name of the current node (default: chelsty).
|
||||||
|
|
||||||
|
#### Verification
|
||||||
|
|
||||||
|
You can verify the Redis publishing using `redis-cli`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Check node state
|
||||||
|
redis-cli -h 100.108.208.3 HGETALL homelab:nodes:chelsty
|
||||||
|
|
||||||
|
# Check service discovery
|
||||||
|
redis-cli -h 100.108.208.3 HGETALL homelab:services:chelsty:stability-agent
|
||||||
|
|
||||||
|
# Check event stream
|
||||||
|
redis-cli -h 100.108.208.3 XRANGE homelab:events - +
|
||||||
|
```
|
||||||
|
|
||||||
#### Safety
|
#### Safety
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,10 @@ services:
|
||||||
- DISK_THRESHOLD_PCT=${DISK_THRESHOLD_PCT:-90}
|
- DISK_THRESHOLD_PCT=${DISK_THRESHOLD_PCT:-90}
|
||||||
- MQTT_HOST=${MQTT_HOST}
|
- MQTT_HOST=${MQTT_HOST}
|
||||||
- MQTT_PORT=${MQTT_PORT:-1883}
|
- MQTT_PORT=${MQTT_PORT:-1883}
|
||||||
- NODE_NAME=chelsty
|
- REDIS_HOST=${REDIS_HOST:-100.108.208.3}
|
||||||
|
- REDIS_PORT=${REDIS_PORT:-6379}
|
||||||
|
- REDIS_ENABLED=${REDIS_ENABLED:-true}
|
||||||
|
- NODE_NAME=${NODE_NAME:-chelsty}
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "/bin/sh", "/app/healthcheck.sh"]
|
test: ["CMD", "/bin/sh", "/app/healthcheck.sh"]
|
||||||
interval: 1m
|
interval: 1m
|
||||||
|
|
|
||||||
|
|
@ -2,3 +2,7 @@ STABILITY_CHECK_INTERVAL=60
|
||||||
DISK_THRESHOLD_PCT=90
|
DISK_THRESHOLD_PCT=90
|
||||||
MQTT_HOST=mosquitto
|
MQTT_HOST=mosquitto
|
||||||
MQTT_PORT=1883
|
MQTT_PORT=1883
|
||||||
|
REDIS_HOST=100.108.208.3
|
||||||
|
REDIS_PORT=6379
|
||||||
|
REDIS_ENABLED=true
|
||||||
|
NODE_NAME=chelsty
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,9 @@ DISK_THRESHOLD_PCT = float(os.environ.get("DISK_THRESHOLD_PCT", "90.0"))
|
||||||
MQTT_HOST = os.environ.get("MQTT_HOST")
|
MQTT_HOST = os.environ.get("MQTT_HOST")
|
||||||
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
|
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
|
||||||
NODE_NAME = os.environ.get("NODE_NAME", "chelsty")
|
NODE_NAME = os.environ.get("NODE_NAME", "chelsty")
|
||||||
|
REDIS_HOST = os.environ.get("REDIS_HOST")
|
||||||
|
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
|
||||||
|
REDIS_ENABLED = os.environ.get("REDIS_ENABLED", "true").lower() == "true" if REDIS_HOST else False
|
||||||
SOURCE = "stability-agent"
|
SOURCE = "stability-agent"
|
||||||
|
|
||||||
STATE_DIR = "/opt/homelab/state"
|
STATE_DIR = "/opt/homelab/state"
|
||||||
|
|
@ -27,9 +30,10 @@ def get_datestamp():
|
||||||
return datetime.datetime.utcnow().strftime("%Y-%m-%d")
|
return datetime.datetime.utcnow().strftime("%Y-%m-%d")
|
||||||
|
|
||||||
def emit_event(event_type, severity, message, service=None, details=None):
|
def emit_event(event_type, severity, message, service=None, details=None):
|
||||||
|
timestamp = get_timestamp()
|
||||||
event = {
|
event = {
|
||||||
"id": str(uuid.uuid4()),
|
"id": str(uuid.uuid4()),
|
||||||
"timestamp": get_timestamp(),
|
"timestamp": timestamp,
|
||||||
"node": NODE_NAME,
|
"node": NODE_NAME,
|
||||||
"source": SOURCE,
|
"source": SOURCE,
|
||||||
"type": event_type,
|
"type": event_type,
|
||||||
|
|
@ -50,6 +54,21 @@ def emit_event(event_type, severity, message, service=None, details=None):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Failed to write event to filesystem: {e}")
|
print(f"Failed to write event to filesystem: {e}")
|
||||||
|
|
||||||
|
# Redis publishing
|
||||||
|
if REDIS_ENABLED and redis_client:
|
||||||
|
try:
|
||||||
|
redis_client.xadd("homelab:events", {
|
||||||
|
"node": NODE_NAME,
|
||||||
|
"type": event_type,
|
||||||
|
"severity": severity,
|
||||||
|
"timestamp": str(int(time.time())),
|
||||||
|
"message": message,
|
||||||
|
"details": json.dumps(details or {})
|
||||||
|
})
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to publish event to Redis: {e}")
|
||||||
|
# Do not crash, already logged to filesystem
|
||||||
|
|
||||||
print(f"[{severity}] {message}")
|
print(f"[{severity}] {message}")
|
||||||
|
|
||||||
def check_disk():
|
def check_disk():
|
||||||
|
|
@ -157,6 +176,69 @@ def check_mqtt():
|
||||||
emit_event("mqtt_unreachable", "error", f"MQTT broker at {MQTT_HOST}:{MQTT_PORT} is unreachable", details={"error": str(e)})
|
emit_event("mqtt_unreachable", "error", f"MQTT broker at {MQTT_HOST}:{MQTT_PORT} is unreachable", details={"error": str(e)})
|
||||||
return {"configured": True, "reachable": False, "error": str(e)}
|
return {"configured": True, "reachable": False, "error": str(e)}
|
||||||
|
|
||||||
|
class RedisClient:
|
||||||
|
def __init__(self, host, port=6379):
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.sock = None
|
||||||
|
|
||||||
|
def _connect(self):
|
||||||
|
if self.sock:
|
||||||
|
try:
|
||||||
|
# Check if socket is still alive
|
||||||
|
self.sock.send(b"", socket.MSG_DONTWAIT)
|
||||||
|
return True
|
||||||
|
except (socket.error, AttributeError):
|
||||||
|
self.sock = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.sock = socket.create_connection((self.host, self.port), timeout=2)
|
||||||
|
self.sock.settimeout(2.0)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
self.sock = None
|
||||||
|
print(f"Redis connection error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _send_command(self, *args):
|
||||||
|
if not self._connect():
|
||||||
|
return False
|
||||||
|
|
||||||
|
# RESP array
|
||||||
|
cmd = f"*{len(args)}\r\n"
|
||||||
|
for arg in args:
|
||||||
|
s_arg = str(arg)
|
||||||
|
cmd += f"${len(s_arg.encode('utf-8'))}\r\n{s_arg}\r\n"
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.sock.sendall(cmd.encode('utf-8'))
|
||||||
|
# Basic response reading
|
||||||
|
resp = self.sock.recv(4096)
|
||||||
|
if resp.startswith(b"-"):
|
||||||
|
print(f"Redis error response: {resp.decode().strip()}")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Redis send error: {e}")
|
||||||
|
if self.sock:
|
||||||
|
self.sock.close()
|
||||||
|
self.sock = None
|
||||||
|
return False
|
||||||
|
|
||||||
|
def hset(self, key, mapping):
|
||||||
|
args = ["HSET", key]
|
||||||
|
for k, v in mapping.items():
|
||||||
|
args.extend([k, v])
|
||||||
|
return self._send_command(*args)
|
||||||
|
|
||||||
|
def xadd(self, key, fields):
|
||||||
|
args = ["XADD", key, "*"]
|
||||||
|
for k, v in fields.items():
|
||||||
|
args.extend([k, v])
|
||||||
|
return self._send_command(*args)
|
||||||
|
|
||||||
|
redis_client = RedisClient(REDIS_HOST, REDIS_PORT) if REDIS_ENABLED else None
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
print(f"Starting stability-agent on {NODE_NAME}...")
|
print(f"Starting stability-agent on {NODE_NAME}...")
|
||||||
|
|
||||||
|
|
@ -200,6 +282,47 @@ def main():
|
||||||
with open(STATUS_FILE, "w") as f:
|
with open(STATUS_FILE, "w") as f:
|
||||||
json.dump(status, f, indent=2)
|
json.dump(status, f, indent=2)
|
||||||
|
|
||||||
|
# Redis publishing
|
||||||
|
if REDIS_ENABLED and redis_client:
|
||||||
|
try:
|
||||||
|
# Node state
|
||||||
|
node_health = "healthy"
|
||||||
|
for check in status["checks"].values():
|
||||||
|
if isinstance(check, dict) and check.get("status") == "error":
|
||||||
|
node_health = "unhealthy"
|
||||||
|
|
||||||
|
redis_client.hset(f"homelab:nodes:{NODE_NAME}", {
|
||||||
|
"id": NODE_NAME,
|
||||||
|
"hostname": socket.gethostname(),
|
||||||
|
"health": node_health,
|
||||||
|
"status": "online",
|
||||||
|
"last_seen": status["timestamp"],
|
||||||
|
"capabilities": json.dumps(["docker", "tailscale", "mqtt", "disk"]),
|
||||||
|
"checks": json.dumps(status["checks"])
|
||||||
|
})
|
||||||
|
|
||||||
|
# Services discovered from Docker
|
||||||
|
if status["checks"]["docker"]["status"] == "ok":
|
||||||
|
for c in status["checks"]["docker"]["containers"]:
|
||||||
|
service_name = c["name"]
|
||||||
|
service_health = "healthy" if c["state"] == "running" else "unhealthy"
|
||||||
|
|
||||||
|
redis_client.hset(f"homelab:services:{NODE_NAME}:{service_name}", {
|
||||||
|
"name": service_name,
|
||||||
|
"node": NODE_NAME,
|
||||||
|
"health": service_health,
|
||||||
|
"desired_state": "running",
|
||||||
|
"actual_state": c["state"],
|
||||||
|
"deployment_state": "deployed",
|
||||||
|
"updated_at": status["timestamp"],
|
||||||
|
"dependencies": json.dumps([]),
|
||||||
|
"recommendations": json.dumps([])
|
||||||
|
})
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to publish to Redis: {e}")
|
||||||
|
# Local event for Redis error
|
||||||
|
emit_event("redis_publish_error", "warning", f"Failed to publish to Redis: {e}", details={"error": str(e)})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error in main loop: {e}")
|
print(f"Error in main loop: {e}")
|
||||||
emit_event("agent_error", "error", f"Internal agent error: {e}", details={"error": str(e)})
|
emit_event("agent_error", "error", f"Internal agent error: {e}", details={"error": str(e)})
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue