import json import os import time from datetime import datetime from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path STATE_DIR = Path(os.getenv("HOMELAB_STATE_ROOT", "/opt/homelab/state")) EVENTS_DIR = Path(os.getenv("HOMELAB_EVENTS_ROOT", "/opt/homelab/events")) WORLD_DIR = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world")) ACTIONS_DIR = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions")) CONFIG_DIR = Path(os.getenv("HOMELAB_CONFIG_ROOT", "/opt/homelab/config")) STATIC_DIR = Path(__file__).parent DEFAULT_CONFIG = { "operator_mode": "approval", "auto_mode": True, "action_thresholds": { "restart_ha": 0.8, "check_network": 0.9, }, "default_threshold": 0.9, "allowed_auto_actions": ["restart_ha"], } def read_json_file(path, default=None): if not path.exists(): return default if default is not None else [] try: return json.loads(path.read_text()) except Exception: return default if default is not None else [] def get_config(): config_path = STATE_DIR / "operator-config.json" if config_path.exists(): return read_json_file(config_path, DEFAULT_CONFIG) return DEFAULT_CONFIG def save_config(config): STATE_DIR.mkdir(parents=True, exist_ok=True) (STATE_DIR / "operator-config.json").write_text(json.dumps(config, indent=2)) def current_nodes(): return read_json_file(WORLD_DIR / "nodes.json") def current_services(): return read_json_file(WORLD_DIR / "services.json") def current_deployments(): return read_json_file(WORLD_DIR / "deployments.json") def current_incidents(): return read_json_file(WORLD_DIR / "incidents.json") def current_recommendations(): return read_json_file(WORLD_DIR / "recommendations.json") def current_summary(): summary = read_json_file(WORLD_DIR / "runtime-summary.json", default={}) if summary: # Check for staleness from the summary's own timestamp if available # otherwise use file mtime last_update_str = summary.get("last_update") if last_update_str: try: # Assuming ISO format from observer.py last_update = datetime.fromisoformat(last_update_str.replace('Z', '+00:00')).timestamp() except Exception: last_update = os.path.getmtime(WORLD_DIR / "runtime-summary.json") else: last_update = os.path.getmtime(WORLD_DIR / "runtime-summary.json") summary["last_update"] = last_update summary["stale"] = (time.time() - last_update) > 60 # Stale if older than 60s return summary def current_events(): events = [] if EVENTS_DIR.exists(): for f in EVENTS_DIR.glob("**/*.json"): data = read_json_file(f) if data: # Add source file for traceability data["_source"] = f.name events.append(data) return sorted(events, key=lambda x: x.get("timestamp", ""), reverse=True) def current_actions(): actions = {} statuses = ["pending", "approved", "running", "completed", "failed", "rejected"] for status in statuses: actions[status] = [] status_dir = ACTIONS_DIR / status if status_dir.exists(): for f in status_dir.glob("*.json"): data = read_json_file(f) if data: # Injects some metadata for UI data["id"] = data.get("action_id") or f.stem data["status"] = status actions[status].append(data) return actions def mutate_action(action_id, target_status): statuses = ["pending", "approved", "running", "completed", "failed", "rejected"] if target_status not in statuses: return False, f"Invalid target status: {target_status}" # Find where the action is source_path = None current_status = None for status in statuses: p = ACTIONS_DIR / status / f"{action_id}.json" if p.exists(): source_path = p current_status = status break if not source_path: return False, f"Action {action_id} not found" target_dir = ACTIONS_DIR / target_status target_dir.mkdir(parents=True, exist_ok=True) target_path = target_dir / f"{action_id}.json" try: data = json.loads(source_path.read_text()) data["status"] = target_status data["updated_at"] = time.time() # Keep history of transitions history = data.get("transition_history", []) history.append({ "from": current_status, "to": target_status, "timestamp": time.time() }) data["transition_history"] = history target_path.write_text(json.dumps(data, indent=2)) if source_path != target_path: source_path.unlink() return True, "Success" except Exception as e: return False, str(e) def send_json(status, payload, handler): body = (json.dumps(payload) + "\n").encode("utf-8") handler.send_response(status) handler.send_header("Content-Type", "application/json") handler.send_header("Content-Length", str(len(body))) handler.end_headers() handler.wfile.write(body) class Handler(BaseHTTPRequestHandler): def do_GET(self): if self.path == "/config": send_json(200, get_config(), self) return if self.path == "/nodes": send_json(200, current_nodes(), self) return if self.path == "/services": send_json(200, current_services(), self) return if self.path == "/deployments": send_json(200, current_deployments(), self) return if self.path == "/incidents": send_json(200, current_incidents(), self) return if self.path == "/recommendations": send_json(200, current_recommendations(), self) return if self.path == "/summary": send_json(200, current_summary(), self) return if self.path == "/events": send_json(200, current_events(), self) return if self.path == "/actions": send_json(200, current_actions(), self) return if self.path in ("/", "/index.html"): body = (STATIC_DIR / "index.html").read_bytes() self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) return self.send_error(404) def do_POST(self): if self.path not in ( "/config", "/action/mutate", "/mode", ): self.send_error(404) return length = int(self.headers.get("Content-Length", "0")) raw_body = self.rfile.read(length).decode("utf-8") try: payload = json.loads(raw_body) except json.JSONDecodeError: self.send_error(400, "Invalid JSON") return if self.path == "/config": config = get_config() config.update(payload) save_config(config) send_json(200, {"status": "ok"}, self) return if self.path == "/mode": mode = payload.get("mode") if not mode: self.send_error(400, "mode is required") return config = get_config() config["operator_mode"] = mode save_config(config) send_json(200, {"status": "ok"}, self) return if self.path == "/action/mutate": action_id = payload.get("id") target = payload.get("status") if not action_id or not target: self.send_error(400, "id and status are required") return success, msg = mutate_action(action_id, target) if success: send_json(200, {"status": "ok"}, self) else: self.send_error(500, msg) return def log_message(self, format, *args): return if __name__ == "__main__": # Ensure directories exist for d in [STATE_DIR, EVENTS_DIR, WORLD_DIR, ACTIONS_DIR, CONFIG_DIR]: d.mkdir(parents=True, exist_ok=True) for s in ["pending", "approved", "running", "completed", "failed", "rejected"]: (ACTIONS_DIR / s).mkdir(parents=True, exist_ok=True) port = int(os.getenv("PORT", "8080")) print(f"Operator Control Plane starting on 0.0.0.0:{port}") server = ThreadingHTTPServer(("0.0.0.0", port), Handler) server.serve_forever()