diff --git a/hosts/piha/runtime/agent-system/docker-compose.override.yml b/hosts/piha/runtime/agent-system/docker-compose.override.yml new file mode 100644 index 0000000..58ae42a --- /dev/null +++ b/hosts/piha/runtime/agent-system/docker-compose.override.yml @@ -0,0 +1,8 @@ +services: + runtime-materializer: + environment: + # Pull world state from the VPS control-plane API instead of local Redis. + # The observer on VPS is the authoritative writer; mirroring its API output + # here ensures the webui /snapshot matches the clean 97-service state that + # the control-plane /summary endpoint serves. + CONTROL_PLANE_URL: "http://100.95.58.48:18180" diff --git a/services/agent-system/runtime-materializer/materializer.py b/services/agent-system/runtime-materializer/materializer.py index cbf22ea..8b42b91 100644 --- a/services/agent-system/runtime-materializer/materializer.py +++ b/services/agent-system/runtime-materializer/materializer.py @@ -3,6 +3,8 @@ import json import os import time import argparse +import urllib.request +import urllib.error from datetime import datetime # Configuration from environment variables @@ -10,6 +12,15 @@ REDIS_HOST = os.environ.get("REDIS_HOST", "redis") REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379)) WORLD_DIR = os.environ.get("WORLD_DIR", "/opt/homelab/world") +# When set, materialize from the control-plane HTTP API instead of Redis. +# This is the authoritative source of truth: the observer writes clean world +# state to the control-plane API, which the materializer mirrors locally so +# the webui's /snapshot (and all other endpoints) reflect the same data. 
#
# Example: CONTROL_PLANE_URL=http://100.95.58.48:18180
CONTROL_PLANE_URL = os.environ.get("CONTROL_PLANE_URL", "").rstrip("/")


def _fetch_json(url):
    """Fetch *url* and return the decoded JSON payload, or None on failure.

    Any error (connection, timeout, HTTP status, malformed JSON) is logged
    and reported as None so the polling loop keeps running. A body that is
    literally JSON ``null`` also decodes to None and is therefore
    indistinguishable from a failure for callers.
    """
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read())
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary of a
        # long-running loop, and the failure is logged rather than hidden.
        print(f"[{datetime.now().isoformat()}] Error fetching {url}: {e}")
        return None


def write_json(filename, data, world_dir=None):
    """Atomically write *data* as indented JSON to ``<world_dir>/<filename>``.

    The payload is serialized to a temporary sibling file and then moved
    over the destination with ``os.replace``, so a concurrent reader sees
    either the previous complete file or the new complete file, never a
    truncated one. If serialization or the write fails, the previous file
    is left untouched and the exception propagates.

    Args:
        filename: File name relative to the world directory.
        data: JSON-serializable object.
        world_dir: Target directory; defaults to the module-level WORLD_DIR.

    Raises:
        OSError: If the file cannot be written or moved into place.
        TypeError / ValueError: If *data* is not JSON-serializable.
    """
    target_dir = WORLD_DIR if world_dir is None else world_dir
    path = os.path.join(target_dir, filename)
    # Same directory as the destination so os.replace stays on one
    # filesystem; the pid keeps concurrent writers from sharing a temp file.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except Exception:
        # Best-effort cleanup of the partial temp file; the original error
        # is the one worth reporting, so it is re-raised unchanged.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def materialize_from_api(base_url=None, world_dir=None):
    """Mirror world state from the control-plane API to local world files.

    Every endpoint is fetched first; files are written only once all
    fetches have succeeded, so a failed fetch never leaves a mix of old and
    new files on disk.

    Args:
        base_url: Control-plane base URL; defaults to CONTROL_PLANE_URL.
            A trailing slash is ignored.
        world_dir: Output directory; defaults to WORLD_DIR. Created if
            missing.

    Returns:
        True if all fetches succeeded and all files were written, False if
        any fetch failed or the output directory/files could not be written.
    """
    base = CONTROL_PLANE_URL if base_url is None else base_url.rstrip("/")
    target_dir = WORLD_DIR if world_dir is None else world_dir
    print(f"[{datetime.now().isoformat()}] Materializing from control-plane API: {base}")

    endpoints = {
        "nodes.json": f"{base}/nodes",
        "services.json": f"{base}/services",
        "incidents.json": f"{base}/incidents",
        "deployments.json": f"{base}/deployments",
        "recommendations.json": f"{base}/recommendations",
        "runtime-summary.json": f"{base}/summary",
        "events.json": f"{base}/events",
    }

    fetched = {}
    for filename, url in endpoints.items():
        data = _fetch_json(url)
        if data is None:
            print(f"[{datetime.now().isoformat()}] Aborting: failed to fetch {url}")
            return False
        fetched[filename] = data

    try:
        os.makedirs(target_dir, exist_ok=True)
        for filename, data in fetched.items():
            write_json(filename, data, world_dir=target_dir)
    except OSError as e:
        # Honour the documented contract: a disk problem is a failed run,
        # not a reason to terminate the polling loop.
        print(f"[{datetime.now().isoformat()}] Error writing world state to {target_dir}: {e}")
        return False

    services = fetched["services.json"]
    # Only sized containers are counted; any other payload type is logged as 0.
    svc_count = len(services) if isinstance(services, (list, dict)) else 0
    print(f"[{datetime.now().isoformat()}] Materialized from API: {svc_count} services → {target_dir}")
    return True
Events (Stream) events = [] try: @@ -146,11 +212,6 @@ def materialize(): # Ensure directory exists os.makedirs(WORLD_DIR, exist_ok=True) - def write_json(filename, data): - path = os.path.join(WORLD_DIR, filename) - with open(path, "w") as f: - json.dump(data, f, indent=2) - write_json("runtime-summary.json", summary) write_json("nodes.json", nodes) write_json("services.json", services) @@ -158,7 +219,7 @@ def materialize(): write_json("events.json", events) write_json("deployments.json", deployments) write_json("recommendations.json", recommendations) - + print(f"[{datetime.now().isoformat()}] Successfully materialized to {WORLD_DIR}") except redis.exceptions.ConnectionError as e: @@ -172,10 +233,19 @@ if __name__ == "__main__": parser.add_argument("--interval", type=int, default=30, help="Sleep interval between runs (seconds)") args = parser.parse_args() - if args.once: - materialize() + if CONTROL_PLANE_URL: + print(f"Mode: control-plane API ({CONTROL_PLANE_URL})") + run_fn = materialize_from_api else: - print(f"Starting materializer loop (interval: {args.interval}s)...") + print(f"Mode: Redis ({REDIS_HOST}:{REDIS_PORT})") + run_fn = materialize + + interval = int(os.environ.get("MATERIALIZE_INTERVAL", args.interval)) + + if args.once: + run_fn() + else: + print(f"Starting materializer loop (interval: {interval}s)...") while True: - materialize() - time.sleep(args.interval) + run_fn() + time.sleep(interval)