# homelab-codex-ws/services/agent-system/runtime-materializer/materializer.py

import redis
import json
import os
import time
import argparse
import urllib.request
import urllib.error
from datetime import datetime
# --- Configuration (every value can be overridden via environment variables) ---
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
WORLD_DIR = os.getenv("WORLD_DIR", "/opt/homelab/world")

# Setting CONTROL_PLANE_URL switches the materializer from Redis to the
# control-plane HTTP API. That API is the authoritative source of truth: the
# observer writes clean world state to it, and the materializer mirrors that
# state locally so the webui's /snapshot (and every other endpoint) reports the
# same data. A trailing "/" is stripped so endpoint paths can be appended.
#
# Example: CONTROL_PLANE_URL=http://100.95.58.48:18180
CONTROL_PLANE_URL = os.getenv("CONTROL_PLANE_URL", "").rstrip("/")
def get_redis_client():
    """Build a Redis client for REDIS_HOST:REDIS_PORT.

    Replies are decoded to ``str`` (``decode_responses=True``), and socket
    operations time out after 5 seconds.
    """
    client_options = dict(
        host=REDIS_HOST,
        port=REDIS_PORT,
        decode_responses=True,
        socket_timeout=5,
    )
    return redis.Redis(**client_options)
def safe_json_loads(data, default=None):
    """Decode *data* as JSON, tolerating values that are already decoded.

    - Falsy input (None, "", empty containers, 0, False) yields *default*.
    - A dict or list is returned as-is.
    - Anything that fails to decode (invalid JSON, non-string types) is
      returned unchanged rather than raising.
    """
    if not data:
        return default
    if isinstance(data, (dict, list)):
        return data
    try:
        return json.loads(data)
    except (json.JSONDecodeError, TypeError):
        # Keep the raw value so callers still see what was stored.
        return data
def normalize_health(health):
    """Map a raw health value onto "nominal", "degraded" or "error" for the UI.

    Missing or falsy values count as "nominal". Matching is case-insensitive on
    ``str(health)``, and anything unrecognised is reported as "error".
    """
    if not health:
        return "nominal"
    level = str(health).lower()
    if level in {"healthy", "ok", "running", "nominal"}:
        return "nominal"
    return "degraded" if level in {"degraded", "warning"} else "error"
def _fetch_json(url):
    """GET *url* (10 s timeout) and decode the response body as JSON.

    Any failure (connection, HTTP status, decoding) is logged to stdout and
    reported as ``None``, so callers treat the fetch as best-effort.
    """
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            payload = response.read()
        return json.loads(payload)
    except Exception as exc:
        print(f"[{datetime.now().isoformat()}] Error fetching {url}: {exc}")
        return None
def write_json(filename, data):
    """Write *data* as indented JSON to ``WORLD_DIR/filename``, atomically.

    The JSON is written to a temporary sibling file first and then moved over
    the destination with ``os.replace``. Readers of the world directory (e.g.
    the webui serving /snapshot) therefore see either the previous complete
    file or the new complete file, never a truncated one mid-write.

    Raises:
        OSError: if WORLD_DIR does not exist or is not writable.
        TypeError / ValueError: if *data* is not JSON-serialisable. In that
            case the existing destination file is left untouched.
    """
    path = os.path.join(WORLD_DIR, filename)
    # Same directory => same filesystem, so the rename below is atomic. The PID
    # suffix keeps concurrently running writers from sharing a temp file.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Never leave a half-written temp file behind; re-raise the original error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def materialize_from_api():
    """Mirror world state from the control-plane API into local world files.

    The control-plane observer on the VPS is the single authoritative writer
    of world state. Fetching from its HTTP API yields the same clean, pruned
    data that its /summary endpoint serves — no stale Redis artefacts.

    Every endpoint is fetched before anything is written, so a failed fetch
    leaves the previously written world files untouched.

    Returns:
        True if all fetches succeeded and all files were written, False
        otherwise. Fetch failures and filesystem (OSError) failures are logged
        and reported through the return value instead of being raised.
    """
    print(f"[{datetime.now().isoformat()}] Materializing from control-plane API: {CONTROL_PLANE_URL}")
    # Local world file -> control-plane API path.
    routes = {
        "nodes.json": "/nodes",
        "services.json": "/services",
        "incidents.json": "/incidents",
        "deployments.json": "/deployments",
        "recommendations.json": "/recommendations",
        "runtime-summary.json": "/summary",
        "events.json": "/events",
    }

    fetched = {}
    for filename, route in routes.items():
        url = f"{CONTROL_PLANE_URL}{route}"
        data = _fetch_json(url)
        if data is None:
            print(f"[{datetime.now().isoformat()}] Aborting: failed to fetch {url}")
            return False
        fetched[filename] = data

    try:
        os.makedirs(WORLD_DIR, exist_ok=True)
        for filename, data in fetched.items():
            write_json(filename, data)
    except OSError as e:
        # Report disk/permission problems instead of letting them escape and
        # terminate the polling loop in __main__.
        print(f"[{datetime.now().isoformat()}] Error writing world files to {WORLD_DIR}: {e}")
        return False

    svc_count = len(fetched.get("services.json") or [])
    print(f"[{datetime.now().isoformat()}] Materialized from API: {svc_count} services → {WORLD_DIR}")
    return True
def _collect_hashes(r, pattern, json_fields=(), normalize=False):
    """Return every non-empty Redis hash whose key matches *pattern*.

    Args:
        r: Redis client, as returned by get_redis_client().
        pattern: glob-style key pattern, e.g. "homelab:nodes:*".
        json_fields: (field, default_factory) pairs. Each field that is
            present is decoded with safe_json_loads, with a fresh
            ``default_factory()`` used when the stored value is empty.
        normalize: if True, a "health" field is passed through normalize_health.

    Returns:
        A list of field dicts, ordered by key so the output files are stable
        between runs.
    """
    records = []
    # SCAN walks the keyspace incrementally instead of blocking the server the
    # way KEYS does. SCAN may yield a key more than once, hence set().
    for key in sorted(set(r.scan_iter(match=pattern))):
        record = r.hgetall(key)
        if not record:
            # Key vanished between SCAN and HGETALL, or the hash is empty.
            continue
        if normalize and "health" in record:
            record["health"] = normalize_health(record["health"])
        for field, default_factory in json_fields:
            if field in record:
                record[field] = safe_json_loads(record[field], default_factory())
        records.append(record)
    return records


def _collect_events(r, limit=100):
    """Return up to *limit* entries of the homelab:events stream via XREVRANGE.

    Each entry's fields come back with its stream ID under "id". A "details"
    field, if present, is decoded with safe_json_loads. A ResponseError from
    Redis yields an empty list.
    """
    try:
        raw_events = r.xrevrange("homelab:events", count=limit)
    except redis.exceptions.ResponseError:
        # homelab:events might not be a stream or doesn't exist
        return []
    events = []
    for event_id, fields in raw_events:
        event = dict(fields)
        event["id"] = event_id
        if "details" in event:
            event["details"] = safe_json_loads(event["details"], {})
        events.append(event)
    return events


def _build_summary(nodes, services, incidents, events):
    """Build the runtime-summary.json payload from the collected world state.

    Status is "error" if any incident is not resolved/closed or more than five
    services are not "nominal". It is "degraded" if any service is not
    "nominal", and "nominal" otherwise.
    """
    from datetime import timezone

    unhealthy_services = [s for s in services if s.get("health") != "nominal"]
    active_incidents = [i for i in incidents if i.get("status") not in ("resolved", "closed")]
    if active_incidents or len(unhealthy_services) > 5:
        status = "error"
    elif unhealthy_services:
        status = "degraded"
    else:
        status = "nominal"
    # datetime.utcnow() is deprecated since Python 3.12. This yields the same
    # naive-ISO + "Z" string from an aware UTC timestamp.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "status": status,
        "timestamp": now_utc.isoformat() + "Z",
        "last_update": int(time.time()),
        "node_count": len(nodes),
        "service_count": len(services),
        "active_incidents_count": len(active_incidents),
        "unhealthy_services_count": len(unhealthy_services),
        "incident_count": len(incidents),
        "recent_events_count": len(events),
        "stale": False,
    }


def materialize():
    """Read world state from Redis and write it as JSON files into WORLD_DIR.

    Produces nodes.json, services.json, incidents.json, events.json,
    deployments.json, recommendations.json and runtime-summary.json.

    Returns:
        True on success. False if reading Redis or writing files failed (the
        error is logged). This matches materialize_from_api, so either can
        serve as the polling loop's run function.
    """
    print(f"[{datetime.now().isoformat()}] Materializing world state...")
    try:
        r = get_redis_client()
        nodes = _collect_hashes(
            r, "homelab:nodes:*",
            json_fields=(("capabilities", list), ("checks", dict)),
            normalize=True,
        )
        services = _collect_hashes(
            r, "homelab:services:*",
            json_fields=(("dependencies", list), ("recommendations", list)),
            normalize=True,
        )
        events = _collect_events(r)
        incidents = _collect_hashes(r, "homelab:incidents:*", normalize=True)
        deployments = _collect_hashes(r, "homelab:deployments:*")
        recommendations = _collect_hashes(r, "homelab:recommendations:*")
        summary = _build_summary(nodes, services, incidents, events)

        os.makedirs(WORLD_DIR, exist_ok=True)
        write_json("nodes.json", nodes)
        write_json("services.json", services)
        write_json("incidents.json", incidents)
        write_json("events.json", events)
        write_json("deployments.json", deployments)
        write_json("recommendations.json", recommendations)
        # Summary last, so its last_update never advertises data not yet written.
        write_json("runtime-summary.json", summary)
        print(f"[{datetime.now().isoformat()}] Successfully materialized to {WORLD_DIR}")
        return True
    except redis.exceptions.ConnectionError as e:
        print(f"[{datetime.now().isoformat()}] Redis connection error: {e}")
    except Exception as e:
        print(f"[{datetime.now().isoformat()}] Unexpected error during materialization: {e}")
    return False
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Homelab Runtime Materializer")
    parser.add_argument("--once", action="store_true", help="Run once and exit")
    parser.add_argument("--interval", type=int, default=30, help="Sleep interval between runs (seconds)")
    args = parser.parse_args()

    # The control-plane API is used whenever CONTROL_PLANE_URL is configured;
    # otherwise fall back to reading Redis directly.
    if CONTROL_PLANE_URL:
        print(f"Mode: control-plane API ({CONTROL_PLANE_URL})")
        run_fn = materialize_from_api
    else:
        print(f"Mode: Redis ({REDIS_HOST}:{REDIS_PORT})")
        run_fn = materialize

    # NOTE: MATERIALIZE_INTERVAL, when set, takes precedence over --interval.
    interval = int(os.environ.get("MATERIALIZE_INTERVAL", args.interval))

    if args.once:
        run_fn()
    else:
        if interval <= 0:
            # time.sleep() rejects negative values, and 0 would poll the source
            # in a tight loop.
            parser.error(f"interval must be a positive number of seconds, got {interval}")
        print(f"Starting materializer loop (interval: {interval}s)...")
        while True:
            try:
                run_fn()
            except Exception as e:
                # One failed cycle must not terminate the long-running loop;
                # log it and retry on the next tick.
                print(f"[{datetime.now().isoformat()}] Materialization cycle failed: {e}")
            time.sleep(interval)