homelab-codex-ws/services/agent-system/runtime-materializer/materializer.py

182 lines
6.5 KiB
Python
Raw Permalink Normal View History

import redis
import json
import os
import time
import argparse
from datetime import datetime
# Configuration from environment variables
REDIS_HOST = os.environ.get("REDIS_HOST", "redis")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
WORLD_DIR = os.environ.get("WORLD_DIR", "/opt/homelab/world")
def get_redis_client():
"""Returns a Redis client with decoding enabled."""
return redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
decode_responses=True,
socket_timeout=5
)
def safe_json_loads(data, default=None):
"""Safely loads JSON from a string."""
if not data:
return default
try:
if isinstance(data, (dict, list)):
return data
return json.loads(data)
except (json.JSONDecodeError, TypeError):
return data
def normalize_health(health):
"""Normalizes health values for the UI."""
if not health:
return "nominal"
h = str(health).lower()
if h in ["healthy", "ok", "running", "nominal"]:
return "nominal"
if h in ["degraded", "warning"]:
return "degraded"
return "error"
def materialize():
"""Reads state from Redis and writes JSON files to the world directory."""
print(f"[{datetime.now().isoformat()}] Materializing world state...")
try:
r = get_redis_client()
# 1. Nodes
nodes = []
node_keys = r.keys("homelab:nodes:*")
for key in node_keys:
node_data = r.hgetall(key)
if node_data:
# Normalize health
if "health" in node_data:
node_data["health"] = normalize_health(node_data["health"])
# Parse JSON fields if they exist
if "capabilities" in node_data:
node_data["capabilities"] = safe_json_loads(node_data["capabilities"], [])
if "checks" in node_data:
node_data["checks"] = safe_json_loads(node_data["checks"], {})
nodes.append(node_data)
# 2. Services
services = []
service_keys = r.keys("homelab:services:*")
for key in service_keys:
svc_data = r.hgetall(key)
if svc_data:
# Normalize health
if "health" in svc_data:
svc_data["health"] = normalize_health(svc_data["health"])
if "dependencies" in svc_data:
svc_data["dependencies"] = safe_json_loads(svc_data["dependencies"], [])
if "recommendations" in svc_data:
svc_data["recommendations"] = safe_json_loads(svc_data["recommendations"], [])
services.append(svc_data)
# 3. Events (Stream)
events = []
try:
# Get last 100 events from the stream
raw_events = r.xrevrange("homelab:events", count=100)
for event_id, data in raw_events:
event = data.copy()
event["id"] = event_id
if "details" in event:
event["details"] = safe_json_loads(event["details"], {})
events.append(event)
except redis.exceptions.ResponseError:
# homelab:events might not be a stream or doesn't exist
pass
# 4. Incidents (Hash)
incidents = []
incident_keys = r.keys("homelab:incidents:*")
for key in incident_keys:
incident_data = r.hgetall(key)
if incident_data:
# Normalize health if present
if "health" in incident_data:
incident_data["health"] = normalize_health(incident_data["health"])
incidents.append(incident_data)
# 5. Deployments (Hash)
deployments = []
deployment_keys = r.keys("homelab:deployments:*")
for key in deployment_keys:
dep_data = r.hgetall(key)
if dep_data:
deployments.append(dep_data)
# 6. Recommendations (Hash)
recommendations = []
recommendation_keys = r.keys("homelab:recommendations:*")
for key in recommendation_keys:
rec_data = r.hgetall(key)
if rec_data:
recommendations.append(rec_data)
# 7. Runtime Summary
unhealthy_services = [s for s in services if s.get("health") != "nominal"]
active_incidents = [i for i in incidents if i.get("status") not in ["resolved", "closed"]]
status = "nominal"
if len(active_incidents) > 0 or len(unhealthy_services) > 5:
status = "error"
elif len(unhealthy_services) > 0:
status = "degraded"
summary = {
"status": status,
"timestamp": datetime.utcnow().isoformat() + "Z",
"last_update": int(time.time()),
"node_count": len(nodes),
"service_count": len(services),
"active_incidents_count": len(active_incidents),
"unhealthy_services_count": len(unhealthy_services),
"incident_count": len(incidents),
"recent_events_count": len(events),
"stale": False
}
# Ensure directory exists
os.makedirs(WORLD_DIR, exist_ok=True)
def write_json(filename, data):
path = os.path.join(WORLD_DIR, filename)
with open(path, "w") as f:
json.dump(data, f, indent=2)
write_json("runtime-summary.json", summary)
write_json("nodes.json", nodes)
write_json("services.json", services)
write_json("incidents.json", incidents)
write_json("events.json", events)
write_json("deployments.json", deployments)
write_json("recommendations.json", recommendations)
print(f"[{datetime.now().isoformat()}] Successfully materialized to {WORLD_DIR}")
except redis.exceptions.ConnectionError as e:
print(f"Redis connection error: {e}")
except Exception as e:
print(f"Unexpected error during materialization: {e}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Homelab Runtime Materializer")
parser.add_argument("--once", action="store_true", help="Run once and exit")
parser.add_argument("--interval", type=int, default=30, help="Sleep interval between runs (seconds)")
args = parser.parse_args()
if args.once:
materialize()
else:
print(f"Starting materializer loop (interval: {args.interval}s)...")
while True:
materialize()
time.sleep(args.interval)