operator_ui.py called .replace() on last_update without checking type — an integer value (written by the materializer) raised AttributeError and silently fell back to os.path.getmtime(), which was stuck at 5/29 after a deploy with preserved timestamps. web.py had the same class of bug but worse: it unconditionally replaced last_update with mtime, ignoring the JSON field entirely. Both now branch on isinstance(str) and cast numeric values directly to float, with mtime only as a last-resort fallback. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
302 lines
9 KiB
Python
302 lines
9 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
|
|
|
|
# Data roots for the control plane. Each root can be relocated through its
# environment variable; the /opt/homelab/... paths are the defaults.
STATE_DIR = Path(os.environ.get("HOMELAB_STATE_ROOT", "/opt/homelab/state"))
EVENTS_DIR = Path(os.environ.get("HOMELAB_EVENTS_ROOT", "/opt/homelab/events"))
WORLD_DIR = Path(os.environ.get("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
ACTIONS_DIR = Path(os.environ.get("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
CONFIG_DIR = Path(os.environ.get("HOMELAB_CONFIG_ROOT", "/opt/homelab/config"))

# Directory that contains this module; index.html is served from here.
STATIC_DIR = Path(__file__).parent
|
|
|
|
# Fallback configuration returned by get_config() when no saved
# operator-config.json can be read.
DEFAULT_CONFIG = dict(
    operator_mode="approval",
    auto_mode=True,
    # Per-action thresholds; default_threshold below is the generic value.
    action_thresholds=dict(restart_ha=0.8, check_network=0.9),
    default_threshold=0.9,
    allowed_auto_actions=["restart_ha"],
)
|
|
|
|
|
|
def read_json_file(path, default=None):
    """Load and parse the JSON document stored at *path*.

    Args:
        path: ``pathlib.Path`` of the file to read.
        default: Value returned when the file is missing, unreadable, or does
            not contain valid JSON. ``None`` means "use a new empty list".
            The object is returned as-is (not copied).

    Returns:
        The decoded JSON value, or the fallback described above. A missing or
        malformed file never raises.
    """
    fallback = default if default is not None else []
    try:
        # Read directly instead of checking exists() first: the file can
        # disappear between the check and the read. JSON is UTF-8, so do not
        # depend on the process locale for decoding.
        text = path.read_text(encoding="utf-8")
        return json.loads(text)
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable file. ValueError: invalid JSON or bad
        # UTF-8 (JSONDecodeError and UnicodeDecodeError are both subclasses).
        # RecursionError: pathologically nested document.
        return fallback
|
|
|
|
|
|
def get_config():
    """Return the operator configuration as a dict the caller may mutate.

    The configuration is read from ``operator-config.json`` under STATE_DIR.
    When that file is missing, unreadable, or does not hold a JSON object, a
    deep copy of DEFAULT_CONFIG is returned instead.

    Returns:
        A dict. It is never the DEFAULT_CONFIG object itself, so callers that
        update the result in place cannot alter the module-level defaults.
    """
    import copy  # local import: only this function needs it

    loaded = read_json_file(STATE_DIR / "operator-config.json")
    if isinstance(loaded, dict):
        return loaded
    # read_json_file yields [] on failure, and a stored non-object value is
    # unusable by callers that treat the config as a mapping.
    return copy.deepcopy(DEFAULT_CONFIG)
|
|
|
|
|
|
def save_config(config):
    """Write *config* to ``operator-config.json`` under STATE_DIR.

    The state directory is created first when it does not exist; the file is
    overwritten with 2-space-indented JSON.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    destination = STATE_DIR / "operator-config.json"
    serialized = json.dumps(config, indent=2)
    destination.write_text(serialized)
|
|
|
|
|
|
def current_nodes():
    """Return the parsed contents of ``nodes.json`` from WORLD_DIR.

    An empty list is returned when the file is missing or cannot be parsed.
    """
    nodes_file = WORLD_DIR / "nodes.json"
    return read_json_file(nodes_file)
|
|
|
|
|
|
def current_services():
    """Return the parsed contents of ``services.json`` from WORLD_DIR.

    An empty list is returned when the file is missing or cannot be parsed.
    """
    services_file = WORLD_DIR / "services.json"
    return read_json_file(services_file)
|
|
|
|
|
|
def current_deployments():
    """Return the parsed contents of ``deployments.json`` from WORLD_DIR.

    An empty list is returned when the file is missing or cannot be parsed.
    """
    deployments_file = WORLD_DIR / "deployments.json"
    return read_json_file(deployments_file)
|
|
|
|
|
|
def current_incidents():
    """Return the parsed contents of ``incidents.json`` from WORLD_DIR.

    An empty list is returned when the file is missing or cannot be parsed.
    """
    incidents_file = WORLD_DIR / "incidents.json"
    return read_json_file(incidents_file)
|
|
|
|
|
|
def current_recommendations():
    """Return the parsed contents of ``recommendations.json`` from WORLD_DIR.

    An empty list is returned when the file is missing or cannot be parsed.
    """
    recommendations_file = WORLD_DIR / "recommendations.json"
    return read_json_file(recommendations_file)
|
|
|
|
|
|
def _parse_last_update(value):
    """Convert a raw ``last_update`` field to epoch seconds.

    Accepts ISO-8601 strings (a trailing ``Z`` is read as UTC), numeric
    strings, and JSON numbers. Returns ``None`` for anything else, including
    booleans and non-finite numbers.
    """
    # bool is a subclass of int; True/False are never timestamps.
    if isinstance(value, bool):
        return None
    if isinstance(value, str):
        text = value.strip()
        try:
            # Rewrite "Z" as an explicit offset; older fromisoformat()
            # implementations reject the "Z" suffix.
            # NOTE(review): a string without any offset is interpreted as
            # local time by timestamp() - confirm writers always include one.
            seconds = datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp()
        except (ValueError, OverflowError, OSError):
            try:
                # Epoch seconds that were serialized as a string.
                seconds = float(text)
            except ValueError:
                return None
    else:
        try:
            seconds = float(value)
        except (TypeError, ValueError, OverflowError):
            return None
    # Reject NaN/Infinity: they break the staleness comparison and are not
    # valid JSON when the summary is sent back out.
    if not float("-inf") < seconds < float("inf"):
        return None
    return seconds


def current_summary(stale_after=60):
    """Return the runtime summary with a normalized timestamp and stale flag.

    Reads ``runtime-summary.json`` from WORLD_DIR. For a non-empty summary,
    ``last_update`` is rewritten as epoch seconds (float) and ``stale`` is set
    to whether that timestamp is more than *stale_after* seconds old.

    The timestamp comes from the file's own ``last_update`` field when it can
    be parsed; the file's modification time is used only as a fallback.

    Args:
        stale_after: Age in seconds beyond which the summary counts as stale.

    Returns:
        The summary dict, or an empty/falsy value when there is no summary.
        A file holding a non-object JSON value yields ``{}``.
    """
    path = WORLD_DIR / "runtime-summary.json"
    summary = read_json_file(path, default={})
    if not isinstance(summary, dict):
        return {}
    if not summary:
        return summary

    raw = summary.get("last_update")
    # A missing/empty/zero field is treated as "not provided".
    last_update = _parse_last_update(raw) if raw else None
    if last_update is None:
        try:
            last_update = os.path.getmtime(path)
        except OSError:
            # The file vanished after it was read, so its age is unknown.
            # Epoch 0 makes the summary report as stale.
            last_update = 0.0

    summary["last_update"] = last_update
    summary["stale"] = (time.time() - last_update) > stale_after
    return summary
|
|
|
|
|
|
def current_events():
    """Return the parsed contents of ``events.json`` from WORLD_DIR.

    An empty list is returned when the file is missing or cannot be parsed.
    """
    events_file = WORLD_DIR / "events.json"
    return read_json_file(events_file, default=[])
|
|
|
|
|
|
def current_actions():
    """Collect every stored action, grouped by lifecycle status.

    Each status has its own sub-directory of ACTIONS_DIR holding one
    ``*.json`` file per action.

    Returns:
        A dict with one key per status ("pending", "approved", "running",
        "completed", "failed", "rejected"), each mapping to a list of action
        dicts. Every action gets two fields set for the UI: ``id`` (its
        ``action_id`` field, or the file name without extension) and
        ``status`` (the directory it was found in).
    """
    statuses = ("pending", "approved", "running", "completed", "failed", "rejected")
    actions = {status: [] for status in statuses}

    for status in statuses:
        status_dir = ACTIONS_DIR / status
        if not status_dir.is_dir():
            continue
        # glob() order depends on the filesystem; sort for stable output.
        for action_file in sorted(status_dir.glob("*.json")):
            data = read_json_file(action_file)
            # read_json_file returns [] on failure, and a file may hold any
            # JSON value; only non-empty objects can carry the metadata.
            if not isinstance(data, dict) or not data:
                continue
            data["id"] = data.get("action_id") or action_file.stem
            data["status"] = status
            actions[status].append(data)

    return actions
|
|
|
|
|
|
def mutate_action(action_id, target_status):
    """Move an action to *target_status* and record the transition.

    The action's JSON file is located in one of the status sub-directories of
    ACTIONS_DIR, rewritten into the target status directory with updated
    ``status``, ``updated_at`` and ``transition_history`` fields, and the
    original file is removed when the location changed.

    Args:
        action_id: Identifier of the action; it is the file name without the
            ``.json`` extension. Must not contain path separators.
        target_status: One of "pending", "approved", "running", "completed",
            "failed", "rejected".

    Returns:
        ``(True, "Success")`` on success, otherwise ``(False, message)``.
    """
    statuses = ("pending", "approved", "running", "completed", "failed", "rejected")
    if target_status not in statuses:
        return False, f"Invalid target status: {target_status}"

    # The id comes from a request body and is used as a file name. Refuse
    # anything that could escape the status directories (separators, "..")
    # or that carries control characters.
    name = str(action_id)
    if (
        not name
        or name in (".", "..")
        or "/" in name
        or "\\" in name
        or not name.isprintable()
    ):
        return False, f"Invalid action id: {name!r}"

    # Find where the action currently lives.
    source_path = None
    current_status = None
    for status in statuses:
        candidate = ACTIONS_DIR / status / f"{name}.json"
        if candidate.exists():
            source_path = candidate
            current_status = status
            break

    if source_path is None:
        return False, f"Action {name} not found"

    target_dir = ACTIONS_DIR / target_status
    target_path = target_dir / f"{name}.json"

    try:
        target_dir.mkdir(parents=True, exist_ok=True)

        data = json.loads(source_path.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            return False, f"Action {name} is not a JSON object"

        history = data.get("transition_history")
        if history is None:
            history = []
        elif not isinstance(history, list):
            return False, f"Action {name} has a malformed transition_history"

        # One clock reading so updated_at matches the history entry.
        now = time.time()
        data["status"] = target_status
        data["updated_at"] = now
        history.append({
            "from": current_status,
            "to": target_status,
            "timestamp": now,
        })
        data["transition_history"] = history

        # Write the new copy before removing the old one so the action is
        # never absent from disk.
        target_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        if source_path != target_path:
            source_path.unlink()
        return True, "Success"
    except (OSError, ValueError) as exc:
        # OSError: filesystem failure. ValueError: invalid JSON or bad UTF-8.
        return False, str(exc)
|
|
|
|
|
|
def get_snapshot():
    """Assemble a single overview document from the world-state files.

    Returns:
        A dict with the current UTC time (``timestamp``, ISO-8601), the
        runtime ``summary``, ``nodes``, ``incidents``, the services whose
        ``health`` is not "nominal" (``non_nominal_services``) together with
        ``nominal_service_count`` and ``total_service_count``, and the first
        ten entries of the events list (``events``).
    """
    nodes = current_nodes()
    services = current_services()
    incidents = current_incidents()
    events = current_events()
    summary = current_summary()

    # The files may hold any JSON value. Only a list of objects can be
    # inspected for health, and only a list can be sliced.
    if not isinstance(services, list):
        services = []
    service_records = [entry for entry in services if isinstance(entry, dict)]
    non_nominal = [
        entry for entry in service_records if entry.get("health") != "nominal"
    ]
    recent_events = events[:10] if isinstance(events, list) else []

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "summary": summary,
        "nodes": nodes,
        "non_nominal_services": non_nominal,
        "nominal_service_count": len(service_records) - len(non_nominal),
        "total_service_count": len(service_records),
        "incidents": incidents,
        "events": recent_events,
    }
|
|
|
|
|
|
def send_json(status, payload, handler):
    """Serialize *payload* as JSON and send it as a complete HTTP response.

    Args:
        status: HTTP status code for the response line.
        payload: JSON-serializable value.
        handler: Request handler used to emit the response; its
            ``send_response``, ``send_header``, ``end_headers`` and ``wfile``
            are used.

    The body is the UTF-8 encoded JSON text followed by a newline, and
    Content-Length is the byte length of that body.
    """
    encoded = f"{json.dumps(payload)}\n".encode("utf-8")
    headers = (
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(encoded))),
    )

    handler.send_response(status)
    for header_name, header_value in headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """HTTP API of the operator control plane.

    GET serves the JSON read endpoints and the static index page. POST
    accepts a JSON object on ``/config``, ``/mode`` and ``/action/mutate``.
    """

    def _route(self):
        """Return the request path without its query string or fragment."""
        return self.path.split("?", 1)[0].split("#", 1)[0]

    def _fail(self, status, message):
        """Send an error response whose reason phrase is safe to emit.

        send_error() places the message on the HTTP status line. Whitespace
        (including CR/LF) is collapsed so request-derived text cannot add
        header lines, and characters that latin-1 cannot encode are replaced.
        """
        safe = " ".join(str(message).split())
        safe = safe.encode("latin-1", "replace").decode("latin-1")
        self.send_error(status, safe or None)

    def do_GET(self):
        """Serve a JSON endpoint, the index page, or 404."""
        route = self._route()

        # Built per request so the producers are looked up lazily.
        json_routes = {
            "/config": get_config,
            "/nodes": current_nodes,
            "/services": current_services,
            "/deployments": current_deployments,
            "/incidents": current_incidents,
            "/recommendations": current_recommendations,
            "/summary": current_summary,
            "/events": current_events,
            "/actions": current_actions,
            "/snapshot": get_snapshot,
        }
        producer = json_routes.get(route)
        if producer is not None:
            send_json(200, producer(), self)
            return

        if route in ("/", "/index.html"):
            try:
                body = (STATIC_DIR / "index.html").read_bytes()
            except OSError:
                # No readable index page next to this module.
                self.send_error(404)
                return
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return

        self.send_error(404)

    def do_POST(self):
        """Handle ``/config``, ``/mode`` and ``/action/mutate``.

        The request body must be a UTF-8 encoded JSON object; anything else
        is answered with 400.
        """
        route = self._route()
        if route not in ("/config", "/action/mutate", "/mode"):
            self.send_error(404)
            return

        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            length = -1
        if length < 0:
            # A negative length would make read() block until EOF.
            self.send_error(400, "Invalid Content-Length")
            return

        raw_body = self.rfile.read(length)
        try:
            # ValueError covers both invalid UTF-8 and invalid JSON.
            payload = json.loads(raw_body.decode("utf-8"))
        except ValueError:
            self.send_error(400, "Invalid JSON")
            return
        if not isinstance(payload, dict):
            self.send_error(400, "JSON object required")
            return

        if route == "/config":
            # Work on a copy so a shared default config is never mutated.
            config = dict(get_config())
            config.update(payload)
            try:
                save_config(config)
            except OSError as exc:
                self._fail(500, f"Could not save config: {exc}")
                return
            send_json(200, {"status": "ok"}, self)
            return

        if route == "/mode":
            mode = payload.get("mode")
            if not mode:
                self.send_error(400, "mode is required")
                return
            config = dict(get_config())
            config["operator_mode"] = mode
            try:
                save_config(config)
            except OSError as exc:
                self._fail(500, f"Could not save config: {exc}")
                return
            send_json(200, {"status": "ok"}, self)
            return

        if route == "/action/mutate":
            action_id = payload.get("id")
            target = payload.get("status")
            if not action_id or not target:
                self.send_error(400, "id and status are required")
                return
            success, msg = mutate_action(action_id, target)
            if success:
                send_json(200, {"status": "ok"}, self)
            else:
                # msg may echo request data; sanitize before it reaches the
                # status line.
                self._fail(500, msg)
            return

    def log_message(self, format, *args):
        """Discard request log lines instead of emitting them."""
        return
|
|
|
|
|
|
if __name__ == "__main__":
    # Create the data roots first, then one sub-directory per action status.
    action_states = ("pending", "approved", "running", "completed", "failed", "rejected")
    required_dirs = [STATE_DIR, EVENTS_DIR, WORLD_DIR, ACTIONS_DIR, CONFIG_DIR]
    required_dirs.extend(ACTIONS_DIR / state for state in action_states)
    for directory in required_dirs:
        directory.mkdir(parents=True, exist_ok=True)

    # Listen on every interface; PORT overrides the default of 8080.
    port = int(os.getenv("PORT", "8080"))
    print(f"Operator Control Plane starting on 0.0.0.0:{port}")
    server = ThreadingHTTPServer(("0.0.0.0", port), Handler)
    server.serve_forever()
|