"""Operator control-plane HTTP API.

Serves JSON views of the files kept under the state, world, events and
actions directories, plus a static ``index.html``, and accepts a small set
of POST endpoints that update the operator config or move an action file
between status directories.
"""

import copy
import heapq
import json
import os
import re
import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import urlsplit

STATE_DIR = Path(os.getenv("HOMELAB_STATE_ROOT", "/opt/homelab/state"))
EVENTS_DIR = Path(os.getenv("HOMELAB_EVENTS_ROOT", "/opt/homelab/events"))
WORLD_DIR = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
ACTIONS_DIR = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
CONFIG_DIR = Path(os.getenv("HOMELAB_CONFIG_ROOT", "/opt/homelab/config"))
STATIC_DIR = Path(__file__).parent

# A 9-11 digit run delimited by hyphens in an event file stem (the epoch).
_EVENT_TS_RE = re.compile(r"-(\d{9,11})-")

EVENTS_MAX_AGE_HOURS = int(os.getenv("EVENTS_MAX_AGE_HOURS", "24"))
EVENTS_MAX_COUNT = int(os.getenv("EVENTS_MAX_COUNT", "200"))

# Every status an action can be in; each one is a sub-directory of ACTIONS_DIR.
ACTION_STATUSES = (
    "pending",
    "approved",
    "running",
    "completed",
    "failed",
    "rejected",
)

DEFAULT_CONFIG = {
    "operator_mode": "approval",
    "auto_mode": True,
    "action_thresholds": {
        "restart_ha": 0.8,
        "check_network": 0.9,
    },
    "default_threshold": 0.9,
    "allowed_auto_actions": ["restart_ha"],
}


def read_json_file(path, default=None):
    """Parse *path* as JSON, returning a fallback on any read/parse failure.

    Args:
        path: ``pathlib.Path`` of the file to read.
        default: Value returned when the file is missing, unreadable or not
            valid JSON.  ``None`` means "use a fresh empty list".

    Returns:
        The decoded JSON value, or the fallback.  Note that *default* is
        returned as-is (not copied), so callers must not mutate it.
    """
    fallback = default if default is not None else []
    try:
        # Reading bytes lets json.loads detect UTF-8/16/32 itself instead of
        # depending on the process locale.
        return json.loads(path.read_bytes())
    except (OSError, ValueError, RecursionError):
        # Missing/unreadable file, bad encoding, malformed or absurdly nested
        # JSON: this reader is deliberately best-effort.
        return fallback


def get_config():
    """Return the operator config as a dict that is safe to mutate.

    Falls back to a deep copy of ``DEFAULT_CONFIG`` when the config file is
    missing, unreadable, or does not contain a JSON object.  A copy is
    essential: callers update the returned dict in place, and handing out
    the module-level default would silently change it for the whole process.
    """
    stored = read_json_file(STATE_DIR / "operator-config.json")
    if isinstance(stored, dict):
        return stored
    return copy.deepcopy(DEFAULT_CONFIG)


def save_config(config):
    """Write *config* to ``operator-config.json`` under ``STATE_DIR``."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    (STATE_DIR / "operator-config.json").write_text(json.dumps(config, indent=2))


def _to_epoch(value):
    """Best-effort conversion of a stored timestamp to epoch seconds.

    Accepts ints/floats, ISO-8601 strings (a trailing ``Z`` is treated as
    UTC) and numeric strings.  Returns ``None`` when *value* cannot be
    interpreted, so callers choose their own fallback.
    """
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        text = value.strip()
        try:
            return datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp()
        except (ValueError, OverflowError, OSError):
            pass
        try:
            return float(text)
        except ValueError:
            return None
    return None


def _node_health(info):
    """Map a node record to the health label shown in the UI."""
    status = info.get("status", "unknown")
    if status == "offline":
        return "error"
    if info.get("disk_pressure") == "high":
        return "degraded"
    if status == "online":
        return "nominal"
    return status


def current_nodes():
    """Return nodes as a list of dicts shaped for the UI.

    The observer stores nodes as a keyed dict {node_name: {...}}.  The
    frontend calls .map(), which requires an array, so we convert here rather
    than change the on-disk format (which the supervisor also reads).
    """
    raw = read_json_file(WORLD_DIR / "nodes.json", default={})
    if isinstance(raw, list):
        return raw
    return [
        {
            "id": name,
            "hostname": name,
            "health": _node_health(info),
            "status": info.get("status", "unknown"),
            "capabilities": info.get("roles", []),
            "connectivity": "tailscale",
            "incidents": 0,
            "last_seen": info.get("last_seen"),
            "disk_usage_pct": info.get("disk_usage_pct"),
            "mem_usage_pct": info.get("mem_usage_pct"),
            "cpu_usage_pct": info.get("cpu_usage_pct"),
            "disk_pressure": info.get("disk_pressure"),
        }
        for name, info in raw.items()
    ]


def _service_health(svc_status):
    """Translate an observer service status into the UI health label."""
    if svc_status == "healthy":
        return "nominal"
    if svc_status == "unhealthy":
        return "error"
    return svc_status


def current_services():
    """Return services as a list of dicts shaped for the UI.

    The observer stores services as {"node/service": {...}}.  They are
    converted to a list with the fields the services and topology views
    expect.
    """
    raw = read_json_file(WORLD_DIR / "services.json", default={})
    if isinstance(raw, list):
        return raw
    result = []
    for key, info in raw.items():
        svc_status = info.get("status", "unknown")
        result.append({
            "id": key,
            "name": info.get("service", key),
            "node": info.get("node", ""),
            "health": _service_health(svc_status),
            "desired_state": "running",
            "actual_state": svc_status,
            "deployment_state": "deployed",
            "dependencies": [],
            "recommendations": [],
            "last_check": info.get("last_check"),
            "incident_id": info.get("incident_id"),
        })
    return result


def current_deployments():
    """Return deployments as a list sorted newest-first by ``started_at``.

    Timestamps are normalised with ``_to_epoch`` before sorting, so a mix of
    numeric, ISO-string and missing values cannot raise ``TypeError``;
    entries without a usable timestamp sort as epoch 0.
    """
    raw = read_json_file(WORLD_DIR / "deployments.json", default={})
    if isinstance(raw, list):
        return raw
    result = []
    for dep_id, info in raw.items():
        status = info.get("status", "unknown")
        result.append({
            "id": dep_id,
            "service": info.get("service", ""),
            "node": info.get("node", ""),
            "status": status,
            "stage": status,
            "diagnostics": info.get("last_error", ""),
            "resumable": info.get("status") == "failed",
            "started_at": info.get("started_at"),
            "finished_at": info.get("finished_at"),
        })
    return sorted(
        result,
        key=lambda dep: _to_epoch(dep.get("started_at")) or 0.0,
        reverse=True,
    )


def current_incidents():
    """Return incidents as a list sorted most-recent-first."""
    raw = read_json_file(WORLD_DIR / "incidents.json", default={})
    if isinstance(raw, list):
        return raw
    result = []
    for inc in raw.values():
        # Synthesise a human-readable message if none is stored (the
        # observer doesn't set one).  Copy first so the parsed record itself
        # is left untouched.
        if "message" not in inc:
            inc = dict(inc)
            inc["message"] = (
                f"{inc.get('service', '?')} on {inc.get('node', '?')} "
                f"is {inc.get('trigger_type', 'unhealthy')}"
            )
        result.append(inc)
    return sorted(
        result,
        key=lambda item: _to_epoch(item.get("last_occurrence")) or 0.0,
        reverse=True,
    )


def current_recommendations():
    """Return the stored recommendations, or an empty list if unavailable."""
    return read_json_file(WORLD_DIR / "recommendations.json")


def current_summary():
    """Return the runtime summary with ``last_update`` and ``stale`` set.

    ``last_update`` is normalised to epoch seconds; when the stored value is
    missing or unparseable the file's mtime is used instead.  ``stale`` is
    true when that moment is more than 60 seconds in the past.
    """
    path = WORLD_DIR / "runtime-summary.json"
    summary = read_json_file(path, default={})
    if isinstance(summary, dict) and summary:
        last_update = _to_epoch(summary.get("last_update"))
        if not last_update:
            try:
                last_update = os.path.getmtime(path)
            except OSError:
                # File vanished between the read and the stat: report it as
                # maximally old so the summary is flagged stale.
                last_update = 0.0
        summary["last_update"] = last_update
        summary["stale"] = (time.time() - last_update) > 60
    return summary


def _event_file_ts(p: Path) -> int:
    """Extract the epoch timestamp embedded in an event file name.

    The epoch is taken to be the first hyphen-delimited run of 9-11 digits
    in the file stem (names look like ``evt-...-<epoch>-....json``).
    Returns 0 when no such run exists, so unrecognised names sort last.
    """
    m = _EVENT_TS_RE.search(p.stem)
    return int(m.group(1)) if m else 0


def current_events():
    """Return the EVENTS_MAX_COUNT most-recent events, sorted newest-first.

    The events directory can contain hundreds of thousands of files (one
    file per event, written by node-agent).  Loading every file on each
    request causes catastrophic RSS growth — 242 k files ≈ 420 MB of Python
    objects + 100 MB JSON serialisation.

    Instead, heapq.nlargest streams through the file paths (O(N_files) time,
    O(EVENTS_MAX_COUNT) memory), ranking them by the epoch embedded in the
    file name without opening any file.  Only the winning EVENTS_MAX_COUNT
    files are then read, and those older than EVENTS_MAX_AGE_HOURS (by
    their ``timestamp`` field) are dropped.
    """
    if not EVENTS_DIR.exists():
        return []
    cutoff = time.time() - EVENTS_MAX_AGE_HOURS * 3600
    # nlargest consumes the glob generator lazily and keeps only a bounded
    # heap of the best EVENTS_MAX_COUNT paths — the full listing is never
    # materialised.
    candidates = heapq.nlargest(
        EVENTS_MAX_COUNT,
        EVENTS_DIR.glob("**/*.json"),
        key=_event_file_ts,
    )
    events = []
    for f in candidates:
        data = read_json_file(f)
        # Skip unreadable/empty files and any JSON that is not an object.
        if not isinstance(data, dict) or not data:
            continue
        if (_to_epoch(data.get("timestamp")) or 0.0) > cutoff:
            data["_source"] = f.name
            events.append(data)
    return sorted(
        events,
        key=lambda evt: _to_epoch(evt.get("timestamp")) or 0.0,
        reverse=True,
    )


def current_actions():
    """Return all actions grouped by status.

    Returns:
        A dict with one key per entry of ``ACTION_STATUSES``; each value is
        the list of action records found in that status directory, with
        ``id`` and ``status`` injected for the UI.
    """
    actions = {}
    for status in ACTION_STATUSES:
        bucket = actions[status] = []
        status_dir = ACTIONS_DIR / status
        if not status_dir.exists():
            continue
        for f in status_dir.glob("*.json"):
            data = read_json_file(f)
            if isinstance(data, dict) and data:
                # Inject some metadata for the UI.
                data["id"] = data.get("action_id") or f.stem
                data["status"] = status
                bucket.append(data)
    return actions


def _is_safe_action_id(name):
    """Return True when *name* can be used as a bare file name component."""
    if not name or name in (".", ".."):
        return False
    return not any(ch in name for ch in ("/", "\\", "\x00"))


def mutate_action(action_id, target_status):
    """Move an action file to the *target_status* directory.

    The action is looked up as ``<action_id>.json`` in each status directory
    in ``ACTION_STATUSES`` order.  Its ``status`` and ``updated_at`` fields
    are rewritten and a ``transition_history`` entry is appended before the
    file is written to the target directory and removed from the source.

    Args:
        action_id: Identifier of the action; must not contain path
            separators, because it comes from the request body and is used
            to build a file path.
        target_status: One of ``ACTION_STATUSES``.

    Returns:
        ``(True, "Success")`` on success, otherwise ``(False, reason)``.
    """
    if target_status not in ACTION_STATUSES:
        return False, f"Invalid target status: {target_status}"

    file_stem = str(action_id)
    if not _is_safe_action_id(file_stem):
        # Reject traversal attempts such as "../../etc/x" outright.
        return False, f"Invalid action id: {action_id}"
    file_name = f"{file_stem}.json"

    # Find where the action currently lives.
    source_path = None
    current_status = None
    for status in ACTION_STATUSES:
        candidate = ACTIONS_DIR / status / file_name
        if candidate.exists():
            source_path = candidate
            current_status = status
            break

    if not source_path:
        return False, f"Action {action_id} not found"

    target_dir = ACTIONS_DIR / target_status
    target_dir.mkdir(parents=True, exist_ok=True)
    target_path = target_dir / file_name

    try:
        data = json.loads(source_path.read_bytes())
        now = time.time()
        data["status"] = target_status
        data["updated_at"] = now

        # Keep a history of transitions.
        history = data.get("transition_history", [])
        history.append({
            "from": current_status,
            "to": target_status,
            "timestamp": now,
        })
        data["transition_history"] = history

        # Write the new copy first so a failure never loses the action.
        target_path.write_text(json.dumps(data, indent=2))
        if source_path != target_path:
            source_path.unlink()
        return True, "Success"
    except Exception as e:  # reported to the caller, not swallowed
        return False, str(e)


def send_json(status, payload, handler):
    """Send *payload* as a JSON response with HTTP *status* via *handler*."""
    body = (json.dumps(payload) + "\n").encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json")
    handler.send_header("Content-Length", str(len(body)))
    handler.end_headers()
    handler.wfile.write(body)


class Handler(BaseHTTPRequestHandler):
    """HTTP request handler for the operator API."""

    # GET path -> zero-argument callable producing the JSON payload.
    _GET_ROUTES = {
        "/config": get_config,
        "/nodes": current_nodes,
        "/services": current_services,
        "/deployments": current_deployments,
        "/incidents": current_incidents,
        "/recommendations": current_recommendations,
        "/summary": current_summary,
        "/events": current_events,
        "/actions": current_actions,
    }

    _POST_ROUTES = ("/config", "/action/mutate", "/mode")

    def do_GET(self):
        """Serve a JSON view or the static index page; 404 otherwise."""
        # Ignore any query string / fragment so cache-busting parameters do
        # not turn a known route into a 404.
        route = urlsplit(self.path).path

        provider = self._GET_ROUTES.get(route)
        if provider is not None:
            send_json(200, provider(), self)
            return

        if route in ("/", "/index.html"):
            try:
                body = (STATIC_DIR / "index.html").read_bytes()
            except OSError:
                self.send_error(404)
                return
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return

        self.send_error(404)

    def do_POST(self):
        """Handle ``/config``, ``/mode`` and ``/action/mutate``.

        The body must be a JSON object.  Malformed requests get a 400, an
        unknown path a 404, and a failed action mutation a 500 carrying the
        failure reason.
        """
        route = urlsplit(self.path).path
        if route not in self._POST_ROUTES:
            self.send_error(404)
            return

        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            self.send_error(400, "Invalid Content-Length")
            return
        if length < 0:
            # rfile.read(-1) would block until the client closes the socket.
            self.send_error(400, "Invalid Content-Length")
            return

        raw_body = self.rfile.read(length)
        try:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            payload = json.loads(raw_body.decode("utf-8"))
        except ValueError:
            self.send_error(400, "Invalid JSON")
            return
        if not isinstance(payload, dict):
            self.send_error(400, "JSON object required")
            return

        if route == "/config":
            config = get_config()
            config.update(payload)
            save_config(config)
            send_json(200, {"status": "ok"}, self)
            return

        if route == "/mode":
            mode = payload.get("mode")
            if not mode:
                self.send_error(400, "mode is required")
                return
            config = get_config()
            config["operator_mode"] = mode
            save_config(config)
            send_json(200, {"status": "ok"}, self)
            return

        if route == "/action/mutate":
            action_id = payload.get("id")
            target = payload.get("status")
            if not action_id or not target:
                self.send_error(400, "id and status are required")
                return
            success, msg = mutate_action(action_id, target)
            if success:
                send_json(200, {"status": "ok"}, self)
            else:
                self.send_error(500, msg)
            return

    def log_message(self, format, *args):
        """Suppress the default per-request logging to stderr."""
        return


class OperatorHTTPServer(ThreadingHTTPServer):
    """Threaded HTTP server whose request threads are daemon threads."""

    # Use daemon threads so finished request threads do not accumulate in the
    # internal _threads list.  ThreadingMixIn only tracks non-daemon threads
    # (for joining at server_close); with daemon_threads=True that list stays
    # empty, preventing unbounded growth of dead Thread objects over time.
    # ThreadingHTTPServer already sets this; it is repeated here so the
    # requirement stays explicit.
    daemon_threads = True


def main():
    """Create the working directories and serve requests forever."""
    # Ensure directories exist.
    for d in [STATE_DIR, EVENTS_DIR, WORLD_DIR, ACTIONS_DIR, CONFIG_DIR]:
        d.mkdir(parents=True, exist_ok=True)
    for s in ACTION_STATUSES:
        (ACTIONS_DIR / s).mkdir(parents=True, exist_ok=True)

    port = int(os.getenv("PORT", "8080"))
    print(f"Operator Control Plane starting on 0.0.0.0:{port}")
    server = OperatorHTTPServer(("0.0.0.0", port), Handler)
    server.serve_forever()


if __name__ == "__main__":
    main()