import json import os import socket from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path STATE_DIR = Path("/opt/homelab/state") EVENTS_DIR = Path("/opt/homelab/events") WORLD_DIR = Path("/opt/homelab/world") EVENT_LOG = Path("/tmp/agent-events.log") STATIC_DIR = Path(__file__).parent REDIS_HOST = os.getenv("REDIS_HOST", "redis") REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) DEFAULT_CONFIG = { "auto_mode": True, "action_thresholds": { "restart_ha": 0.8, "check_network": 0.9, }, "default_threshold": 0.9, "allowed_auto_actions": ["restart_ha"], } def tail_lines(path, limit): if not path.exists(): path.touch() with path.open("r", encoding="utf-8", errors="replace") as handle: lines = handle.readlines() print(f"Read {len(lines)} lines", flush=True) return [line.rstrip("\n") for line in lines[-limit:]] def redis_command(*parts): payload = f"*{len(parts)}\r\n".encode("utf-8") for part in parts: data = str(part).encode("utf-8") payload += f"${len(data)}\r\n".encode("utf-8") + data + b"\r\n" with socket.create_connection((REDIS_HOST, REDIS_PORT), timeout=3) as client: client.sendall(payload) return client.recv(4096) def send_command(command): task = { "target": "orchestrator", "action": "input", "params": { "command": command, }, } redis_command("LPUSH", "tasks", json.dumps(task)) def send_stop_run(run_id): task = { "target": "orchestrator", "action": "stop_run", "params": { "run_id": run_id, }, } redis_command("LPUSH", "tasks", json.dumps(task)) def send_run_action(run_id, command): task = { "target": "orchestrator", "action": "run_action", "params": { "run_id": run_id, "command": command, }, } redis_command("LPUSH", "tasks", json.dumps(task)) def send_auto_mode(auto_mode): task = { "target": "orchestrator", "action": "set_auto_mode", "params": { "auto_mode": auto_mode, }, } redis_command("LPUSH", "tasks", json.dumps(task)) def send_auto_config(config): task = { "target": "orchestrator", "action": "set_auto_config", "params": { "config": config, }, } redis_command("LPUSH", "tasks", json.dumps(task)) def send_event(event): redis_command("LPUSH", "events", json.dumps(event)) def read_json_file(path, default=None): if not path.exists(): return default if default is not None else [] try: return json.loads(path.read_text()) except Exception: return default if default is not None else [] def current_config(): config = json.loads(json.dumps(DEFAULT_CONFIG)) # We still keep reading from EVENT_LOG for auto_config if it's there for line in tail_lines(EVENT_LOG, 500): try: event = json.loads(line) except json.JSONDecodeError: continue if event.get("type") != "auto_config": continue for key in config: if key in event: config[key] = event[key] return config def current_nodes(): return read_json_file(STATE_DIR / "nodes.json") def current_services(): return read_json_file(STATE_DIR / "services.json") def current_deployments(): return read_json_file(STATE_DIR / "deployments.json") def current_incidents(): return read_json_file(STATE_DIR / "incidents.json") def current_recommendations(): return read_json_file(STATE_DIR / "recommendations.json") def current_summary(): return read_json_file(STATE_DIR / "runtime-summary.json", default={}) def current_events(): events = [] if EVENTS_DIR.exists(): for f in EVENTS_DIR.glob("*.json"): data = read_json_file(f) if data: events.append(data) return sorted(events, key=lambda x: x.get("timestamp", 0), reverse=True) def send_json(status, payload, handler): body = (json.dumps(payload) + "\n").encode("utf-8") handler.send_response(status) handler.send_header("Content-Type", "application/json") handler.send_header("Content-Length", str(len(body))) handler.end_headers() handler.wfile.write(body) class Handler(BaseHTTPRequestHandler): def do_GET(self): if self.path == "/config": send_json(200, current_config(), self) return if self.path == "/nodes": send_json(200, current_nodes(), self) return if self.path == "/services": send_json(200, current_services(), self) return if self.path == "/deployments": send_json(200, current_deployments(), self) return if self.path == "/incidents": send_json(200, current_incidents(), self) return if self.path == "/recommendations": send_json(200, current_recommendations(), self) return if self.path == "/summary": send_json(200, current_summary(), self) return if self.path == "/events": send_json(200, current_events(), self) return if self.path == "/logs": print("LOGS endpoint called", flush=True) body = ("\n".join(tail_lines(EVENT_LOG, 200)) + "\n").encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/plain; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) return if self.path in ("/", "/index.html"): body = (STATIC_DIR / "index.html").read_bytes() self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) return self.send_error(404) def do_POST(self): if self.path not in ( "/command", "/stop", "/action", "/auto-mode", "/config", "/events", ): self.send_error(404) return length = int(self.headers.get("Content-Length", "0")) raw_body = self.rfile.read(length).decode("utf-8") try: payload = json.loads(raw_body) command = str(payload.get("command", "")).strip() except json.JSONDecodeError: payload = {} command = raw_body.strip() if self.path == "/events": if not isinstance(payload, dict): self.send_error(400, "event object is required") return send_event(payload) send_json(200, {"status": "sent"}, self) return if self.path == "/stop": run_id = str(payload.get("run_id", "")).strip() if not run_id: self.send_error(400, "run_id is required") return send_stop_run(run_id) send_json(200, {"status": "sent"}, self) return if self.path == "/action": run_id = str(payload.get("run_id", "")).strip() action_command = str(payload.get("command", "")).strip() if not run_id: self.send_error(400, "run_id is required") return if not action_command: self.send_error(400, "command is required") return send_run_action(run_id, action_command) send_json(200, {"status": "sent"}, self) return if self.path == "/auto-mode": send_auto_mode(bool(payload.get("auto_mode"))) send_json(200, {"status": "sent"}, self) return if self.path == "/config": send_auto_config(payload) send_json(200, {"status": "sent"}, self) return if not command: self.send_error(400, "command is required") return send_command(command) send_json(200, {"status": "sent"}, self) def log_message(self, format, *args): return if __name__ == "__main__": server = ThreadingHTTPServer(("0.0.0.0", 8080), Handler) server.serve_forever()