diff --git a/scripts/observer/observer.py b/scripts/observer/observer.py
index 1b1b750..f4bcbe3 100644
--- a/scripts/observer/observer.py
+++ b/scripts/observer/observer.py
@@ -115,13 +115,79 @@ class Observer:
         except Exception as e:
             logger.error(f"Failed to save checkpoint: {e}")
 
+    @staticmethod
+    def _epoch_seconds(value):
+        """Best-effort conversion of a stored timestamp to epoch seconds.
+
+        Accepts epoch numbers and ISO-8601 strings. Returns None when the value is
+        missing or cannot be parsed, so callers can leave such entries alone.
+        """
+        if isinstance(value, bool):
+            return None
+        if isinstance(value, (int, float)):
+            return float(value)
+        if isinstance(value, str):
+            try:
+                parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
+            except ValueError:
+                return None
+            if parsed.tzinfo is None:
+                # NOTE(review): naive stamps are treated as UTC — confirm against the writers.
+                parsed = parsed.replace(tzinfo=timezone.utc)
+            return parsed.timestamp()
+        return None
+
+    def _prune_stale_world(self):
+        """Remove world-state entries for nodes absent from the topology inventory.
+
+        Root cause this guards against: when NODE_NAME env var is unset, node_agent.py
+        falls back to socket.gethostname(), which inside a Docker container returns the
+        12-char hex container ID (e.g. 'be17cb6eb0f6') instead of the canonical host name
+        ('vps'). The observer ingests those events and creates ghost entries that never
+        expire on their own.
+
+        Also ages out resolved incidents older than 7 days to keep world state lean.
+        Incidents whose resolved_at is missing or unparseable are kept.
+        """
+        known_nodes = set((self.inventory or {}).get("nodes") or {})
+        if not known_nodes:
+            # Inventory failed to load — don't prune to avoid wiping valid state.
+            return
+
+        stale_nodes = [n for n in list(self.world_state["nodes"].keys())
+                       if n not in known_nodes]
+        for n in stale_nodes:
+            logger.info(f"Pruning stale node from world state: {n}")
+            del self.world_state["nodes"][n]
+
+        stale_svcs = [k for k in list(self.world_state["services"].keys())
+                      if k.split("/")[0] in stale_nodes]
+        for k in stale_svcs:
+            logger.info(f"Pruning stale service from world state: {k}")
+            del self.world_state["services"][k]
+
+        # Remove resolved incidents older than 7 days. resolved_at is normalised first
+        # because subtracting an ISO-8601 string from time.time() raises TypeError.
+        cutoff = time.time() - 7 * 86400
+        stale_incidents = []
+        for k, v in self.world_state["incidents"].items():
+            if v.get("status") != "resolved":
+                continue
+            resolved_at = self._epoch_seconds(v.get("resolved_at"))
+            if resolved_at and resolved_at < cutoff:
+                stale_incidents.append(k)
+        for k in stale_incidents:
+            del self.world_state["incidents"][k]
+
     def _save_world(self):
         self.world_state["summary"]["last_update"] = datetime.now(timezone.utc).isoformat()
         active_incidents = [
             k for k, v in self.world_state["incidents"].items() if v.get("status") == "active"
         ]
         self.world_state["summary"]["active_incidents_count"] = len(active_incidents)
-        
+        self.world_state["summary"]["node_count"] = len(self.world_state["nodes"])
+        self.world_state["summary"]["service_count"] = len(self.world_state["services"])
+
         if active_incidents:
             self.world_state["summary"]["status"] = "degraded"
         else:
@@ -320,7 +386,8 @@ class Observer:
             new_files = event_files
 
         if not new_files:
-            # Even if no new events, we update freshness of summary
+            # Even if no new events, prune stale entries and refresh summary freshness.
+            self._prune_stale_world()
             self._save_world()
             return
 
@@ -335,6 +402,7 @@ class Observer:
                 logger.error(f"Error processing {file_path}: {e}")
 
         self._save_checkpoint()
+        self._prune_stale_world()
         self._save_world()
 
     def loop(self, interval=5):
diff --git a/services/control-plane/src/operator_ui.py b/services/control-plane/src/operator_ui.py
index e1bdb7e..67ced90 100644
--- a/services/control-plane/src/operator_ui.py
+++ b/services/control-plane/src/operator_ui.py
@@ -47,20 +47,142 @@ def save_config(config):
     (STATE_DIR / "operator-config.json").write_text(json.dumps(config, indent=2))
 
 
+EVENTS_MAX_AGE_HOURS = int(os.getenv("EVENTS_MAX_AGE_HOURS", "24"))
+
+
+def _node_health(info):
+    status = info.get("status", "unknown")
+    if status == "offline":
+        return "error"
+    if info.get("disk_pressure") == "high":
+        return "degraded"
+    if status == "online":
+        return "nominal"
+    return status
+
+
+def _to_epoch(value):
+    """Best-effort conversion of a stored timestamp to epoch seconds.
+
+    Accepts epoch numbers and ISO-8601 strings so both can share one sort key or
+    cutoff comparison; mixing str and int there raises TypeError on Python 3.
+    Missing or unparseable values map to 0.0, i.e. they sort last and count as old.
+    """
+    # Local import: keeps the helper independent of this module's import block.
+    from datetime import datetime, timezone
+
+    if isinstance(value, bool):
+        return 0.0
+    if isinstance(value, (int, float)):
+        return float(value)
+    if isinstance(value, str):
+        try:
+            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
+        except ValueError:
+            return 0.0
+        if parsed.tzinfo is None:
+            # NOTE(review): naive stamps are treated as UTC — confirm against the writers.
+            parsed = parsed.replace(tzinfo=timezone.utc)
+        return parsed.timestamp()
+    return 0.0
+
+
 def current_nodes():
-    return read_json_file(WORLD_DIR / "nodes.json")
+    """Return nodes as a list of dicts shaped for the UI.
+
+    The observer stores nodes as a keyed dict {node_name: {...}}. The frontend
+    calls .map() which requires an array, so we convert here rather than change
+    the on-disk format (which the supervisor also reads).
+    """
+    raw = read_json_file(WORLD_DIR / "nodes.json", default={})
+    if isinstance(raw, list):
+        return raw
+    result = []
+    for name, info in raw.items():
+        result.append({
+            "id": name,
+            "hostname": name,
+            "health": _node_health(info),
+            "status": info.get("status", "unknown"),
+            "capabilities": info.get("roles", []),
+            "connectivity": "tailscale",
+            "incidents": 0,
+            "last_seen": info.get("last_seen"),
+            "disk_usage_pct": info.get("disk_usage_pct"),
+            "mem_usage_pct": info.get("mem_usage_pct"),
+            "cpu_usage_pct": info.get("cpu_usage_pct"),
+            "disk_pressure": info.get("disk_pressure"),
+        })
+    return result
 
 
 def current_services():
-    return read_json_file(WORLD_DIR / "services.json")
+    """Return services as a list of dicts shaped for the UI.
+
+    Observer stores services as {"node/service": {...}}. Converted to a list
+    with the fields the services and topology views expect.
+    """
+    raw = read_json_file(WORLD_DIR / "services.json", default={})
+    if isinstance(raw, list):
+        return raw
+    result = []
+    for key, info in raw.items():
+        svc_status = info.get("status", "unknown")
+        result.append({
+            "id": key,
+            "name": info.get("service", key),
+            "node": info.get("node", ""),
+            "health": ("nominal" if svc_status == "healthy"
+                       else ("error" if svc_status == "unhealthy"
+                             else svc_status)),
+            "desired_state": "running",
+            "actual_state": svc_status,
+            "deployment_state": "deployed",
+            "dependencies": [],
+            "recommendations": [],
+            "last_check": info.get("last_check"),
+            "incident_id": info.get("incident_id"),
+        })
+    return result
 
 
 def current_deployments():
-    return read_json_file(WORLD_DIR / "deployments.json")
+    """Return deployments as a list sorted newest-first."""
+    raw = read_json_file(WORLD_DIR / "deployments.json", default={})
+    if isinstance(raw, list):
+        return raw
+    result = []
+    for dep_id, info in raw.items():
+        result.append({
+            "id": dep_id,
+            "service": info.get("service", ""),
+            "node": info.get("node", ""),
+            "status": info.get("status", "unknown"),
+            "stage": info.get("status", "unknown"),
+            "diagnostics": info.get("last_error", ""),
+            "resumable": info.get("status") == "failed",
+            "started_at": info.get("started_at"),
+            "finished_at": info.get("finished_at"),
+        })
+    return sorted(result, key=lambda x: _to_epoch(x.get("started_at")), reverse=True)
 
 
 def current_incidents():
-    return read_json_file(WORLD_DIR / "incidents.json")
+    """Return incidents as a list sorted most-recent-first."""
+    raw = read_json_file(WORLD_DIR / "incidents.json", default={})
+    if isinstance(raw, list):
+        return raw
+    result = []
+    for inc in raw.values():
+        # Synthesise a human-readable message if not stored (observer doesn't set one).
+        if "message" not in inc:
+            inc = dict(inc)
+            inc["message"] = (
+                f"{inc.get('service', '?')} on {inc.get('node', '?')} "
+                f"is {inc.get('trigger_type', 'unhealthy')}"
+            )
+        result.append(inc)
+    return sorted(result, key=lambda x: _to_epoch(x.get("last_occurrence")), reverse=True)
 
 
 def current_recommendations():
@@ -88,15 +210,24 @@ def current_summary():
 
 
 def current_events():
+    """Return recent events as a list sorted newest-first.
+
+    Reads individual *.json event files from EVENTS_DIR. Without a time filter
+    this would return every event file ever written (including events from ghost
+    nodes created before NODE_NAME was configured). We cap at EVENTS_MAX_AGE_HOURS
+    (default 24 h) to keep the Events view responsive and stale-free.
+    Events without a usable timestamp are dropped.
+    """
     events = []
+    cutoff = time.time() - EVENTS_MAX_AGE_HOURS * 3600
     if EVENTS_DIR.exists():
         for f in EVENTS_DIR.glob("**/*.json"):
             data = read_json_file(f)
-            if data:
-                # Add source file for traceability
+            # Only dict payloads can carry a timestamp; anything else is skipped.
+            if isinstance(data, dict) and _to_epoch(data.get("timestamp")) > cutoff:
                 data["_source"] = f.name
                 events.append(data)
-    return sorted(events, key=lambda x: x.get("timestamp", ""), reverse=True)
+    return sorted(events, key=lambda x: _to_epoch(x.get("timestamp")), reverse=True)
 
 
 def current_actions():