From 495741e7ace172fd0546c5fee71286c27e74ec57 Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Mon, 1 Jun 2026 16:34:52 +0200 Subject: [PATCH] operator-ui: /events bez ladowania calego katalogu + daemon threads; epoch z regexa (fix chelsty-infra) Co-Authored-By: Claude Sonnet 4.6 --- services/control-plane/src/operator_ui.py | 64 +++++++++++++++++------ 1 file changed, 49 insertions(+), 15 deletions(-) diff --git a/services/control-plane/src/operator_ui.py b/services/control-plane/src/operator_ui.py index ce5def3..cb3c322 100644 --- a/services/control-plane/src/operator_ui.py +++ b/services/control-plane/src/operator_ui.py @@ -1,5 +1,7 @@ +import heapq import json import os +import re import time from datetime import datetime from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer @@ -14,6 +16,8 @@ CONFIG_DIR = Path(os.getenv("HOMELAB_CONFIG_ROOT", "/opt/homelab/config")) STATIC_DIR = Path(__file__).parent +_EVENT_TS_RE = re.compile(r"-(\d{9,11})-") + DEFAULT_CONFIG = { "operator_mode": "approval", "auto_mode": True, @@ -48,6 +52,7 @@ def save_config(config): EVENTS_MAX_AGE_HOURS = int(os.getenv("EVENTS_MAX_AGE_HOURS", "24")) +EVENTS_MAX_COUNT = int(os.getenv("EVENTS_MAX_COUNT", "200")) def _node_health(info): @@ -183,22 +188,43 @@ def current_summary(): return summary -def current_events(): - """Return recent events as a list sorted newest-first. +def _event_file_ts(p: Path) -> int: + """Return the 9-11 digit timestamp _EVENT_TS_RE finds in the file stem, or 0 if none.""" + m = _EVENT_TS_RE.search(p.stem) + return int(m.group(1)) if m else 0 - Reads individual *.json event files from EVENTS_DIR. Without a time filter - this would return every event file ever written (including events from ghost - nodes created before NODE_NAME was configured). We cap at EVENTS_MAX_AGE_HOURS - (default 24 h) to keep the Events view responsive and stale-free. + +def current_events(): + """Return up to EVENTS_MAX_COUNT most-recent events, sorted newest-first. 
+ + Event file names embed a hyphen-delimited epoch timestamp. The directory + can contain hundreds of thousands of files (one file per event, written by + node-agent). Loading every file on each request causes catastrophic RSS + growth — 242 k files ≈ 420 MB of Python objects + 100 MB JSON serialisation. + + Fix: use heapq.nlargest to stream through file paths (O(N_files) time, + O(EVENTS_MAX_COUNT) memory), extracting the epoch from the filename without + opening any file. Only the winning EVENTS_MAX_COUNT files are then read. """ - events = [] + if not EVENTS_DIR.exists(): + return [] + cutoff = time.time() - EVENTS_MAX_AGE_HOURS * 3600 - if EVENTS_DIR.exists(): - for f in EVENTS_DIR.glob("**/*.json"): - data = read_json_file(f) - if data and (data.get("timestamp") or 0) > cutoff: - data["_source"] = f.name - events.append(data) + + # heapq.nlargest keeps a bounded heap while streaming — never materialises the full list. + candidates = heapq.nlargest( + EVENTS_MAX_COUNT, + EVENTS_DIR.glob("**/*.json"), + key=_event_file_ts, + ) + + events = [] + for f in candidates: + data = read_json_file(f) + if data and (data.get("timestamp") or 0) > cutoff: + data["_source"] = f.name + events.append(data) + return sorted(events, key=lambda x: x.get("timestamp") or 0, reverse=True) @@ -373,14 +399,22 @@ class Handler(BaseHTTPRequestHandler): return +class OperatorHTTPServer(ThreadingHTTPServer): + # Daemon request threads are not tracked in ThreadingMixIn's _threads list + # (it only records non-daemon threads to join at server_close), so it cannot + # grow with dead Thread objects. NOTE(review): stdlib ThreadingHTTPServer + # appears to set daemon_threads = True already — confirm override is needed. 
+ daemon_threads = True + + if __name__ == "__main__": # Ensure directories exist for d in [STATE_DIR, EVENTS_DIR, WORLD_DIR, ACTIONS_DIR, CONFIG_DIR]: d.mkdir(parents=True, exist_ok=True) for s in ["pending", "approved", "running", "completed", "failed", "rejected"]: (ACTIONS_DIR / s).mkdir(parents=True, exist_ok=True) - + port = int(os.getenv("PORT", "8080")) print(f"Operator Control Plane starting on 0.0.0.0:{port}") - server = ThreadingHTTPServer(("0.0.0.0", port), Handler) + server = OperatorHTTPServer(("0.0.0.0", port), Handler) server.serve_forever()