fix(observer+operator-ui): fix stale world state, dict→list API, event time filter
Root cause of stale data: - node_agent.py falls back to socket.gethostname() when NODE_NAME is unset. Inside a Docker container this returns the 12-char container ID (e.g. 'be17cb6eb0f6'), not the host name. Observer ingested those events and created ghost entries in world/nodes.json that never expired. observer.py: - _prune_stale_world(): removes node/service/incident entries for nodes absent from topology inventory; called on every run_once() cycle (both new-events and idle paths). Resolved incidents older than 7 days are also aged out. - _save_world(): now writes node_count and service_count to runtime-summary.json so the Dashboard's System Overview cards show real numbers instead of undefined. operator_ui.py: - current_nodes/services/deployments/incidents(): the observer stores world state as keyed dicts; the frontend calls .map() which requires an array. All four functions now convert the dict to a properly-shaped list. Each item has the fields the Nodes, Services, Topology, Deployments, and Correlation views expect (hostname, health, capabilities, desired_state, dependencies, etc.). - current_incidents(): synthesises a human-readable 'message' field from node + service + trigger_type (observer does not store one; dashboard showed undefined). - current_events(): adds a 24 h time filter (EVENTS_MAX_AGE_HOURS env var, default 24). Without this, every event file ever written was returned, including events from ghost-node deploys. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
ae33cce889
commit
96bf32614f
|
|
@ -115,12 +115,52 @@ class Observer:
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to save checkpoint: {e}")
|
||||
|
||||
def _prune_stale_world(self):
|
||||
"""Remove world-state entries for nodes absent from the topology inventory.
|
||||
|
||||
Root cause this guards against: when NODE_NAME env var is unset, node_agent.py
|
||||
falls back to socket.gethostname(), which inside a Docker container returns the
|
||||
12-char hex container ID (e.g. 'be17cb6eb0f6') instead of the canonical host name
|
||||
('vps'). The observer ingests those events and creates ghost entries that never
|
||||
expire on their own.
|
||||
|
||||
Also ages out resolved incidents older than 7 days to keep world state lean.
|
||||
"""
|
||||
known_nodes = set(self.inventory["nodes"].keys())
|
||||
if not known_nodes:
|
||||
# Inventory failed to load — don't prune to avoid wiping valid state.
|
||||
return
|
||||
|
||||
stale_nodes = [n for n in list(self.world_state["nodes"].keys())
|
||||
if n not in known_nodes]
|
||||
for n in stale_nodes:
|
||||
logger.info(f"Pruning stale node from world state: {n}")
|
||||
del self.world_state["nodes"][n]
|
||||
|
||||
stale_svcs = [k for k in list(self.world_state["services"].keys())
|
||||
if k.split("/")[0] in stale_nodes]
|
||||
for k in stale_svcs:
|
||||
logger.info(f"Pruning stale service from world state: {k}")
|
||||
del self.world_state["services"][k]
|
||||
|
||||
# Remove resolved incidents older than 7 days.
|
||||
now = time.time()
|
||||
stale_incidents = [
|
||||
k for k, v in self.world_state["incidents"].items()
|
||||
if v.get("status") == "resolved"
|
||||
and (now - (v.get("resolved_at") or now)) > 7 * 86400
|
||||
]
|
||||
for k in stale_incidents:
|
||||
del self.world_state["incidents"][k]
|
||||
|
||||
def _save_world(self):
|
||||
self.world_state["summary"]["last_update"] = datetime.now(timezone.utc).isoformat()
|
||||
active_incidents = [
|
||||
k for k, v in self.world_state["incidents"].items() if v.get("status") == "active"
|
||||
]
|
||||
self.world_state["summary"]["active_incidents_count"] = len(active_incidents)
|
||||
self.world_state["summary"]["node_count"] = len(self.world_state["nodes"])
|
||||
self.world_state["summary"]["service_count"] = len(self.world_state["services"])
|
||||
|
||||
if active_incidents:
|
||||
self.world_state["summary"]["status"] = "degraded"
|
||||
|
|
@ -320,7 +360,8 @@ class Observer:
|
|||
new_files = event_files
|
||||
|
||||
if not new_files:
|
||||
# Even if no new events, we update freshness of summary
|
||||
# Even if no new events, prune stale entries and refresh summary freshness.
|
||||
self._prune_stale_world()
|
||||
self._save_world()
|
||||
return
|
||||
|
||||
|
|
@ -335,6 +376,7 @@ class Observer:
|
|||
logger.error(f"Error processing {file_path}: {e}")
|
||||
|
||||
self._save_checkpoint()
|
||||
self._prune_stale_world()
|
||||
self._save_world()
|
||||
|
||||
def loop(self, interval=5):
|
||||
|
|
|
|||
|
|
@ -47,20 +47,116 @@ def save_config(config):
|
|||
(STATE_DIR / "operator-config.json").write_text(json.dumps(config, indent=2))
|
||||
|
||||
|
||||
EVENTS_MAX_AGE_HOURS = int(os.getenv("EVENTS_MAX_AGE_HOURS", "24"))
|
||||
|
||||
|
||||
def _node_health(info):
|
||||
status = info.get("status", "unknown")
|
||||
if status == "offline":
|
||||
return "error"
|
||||
if info.get("disk_pressure") == "high":
|
||||
return "degraded"
|
||||
if status == "online":
|
||||
return "nominal"
|
||||
return status
|
||||
|
||||
|
||||
def current_nodes():
|
||||
return read_json_file(WORLD_DIR / "nodes.json")
|
||||
"""Return nodes as a list of dicts shaped for the UI.
|
||||
|
||||
The observer stores nodes as a keyed dict {node_name: {...}}. The frontend
|
||||
calls .map() which requires an array, so we convert here rather than change
|
||||
the on-disk format (which the supervisor also reads).
|
||||
"""
|
||||
raw = read_json_file(WORLD_DIR / "nodes.json", default={})
|
||||
if isinstance(raw, list):
|
||||
return raw
|
||||
result = []
|
||||
for name, info in raw.items():
|
||||
result.append({
|
||||
"id": name,
|
||||
"hostname": name,
|
||||
"health": _node_health(info),
|
||||
"status": info.get("status", "unknown"),
|
||||
"capabilities": info.get("roles", []),
|
||||
"connectivity": "tailscale",
|
||||
"incidents": 0,
|
||||
"last_seen": info.get("last_seen"),
|
||||
"disk_usage_pct": info.get("disk_usage_pct"),
|
||||
"mem_usage_pct": info.get("mem_usage_pct"),
|
||||
"cpu_usage_pct": info.get("cpu_usage_pct"),
|
||||
"disk_pressure": info.get("disk_pressure"),
|
||||
})
|
||||
return result
|
||||
|
||||
|
||||
def current_services():
|
||||
return read_json_file(WORLD_DIR / "services.json")
|
||||
"""Return services as a list of dicts shaped for the UI.
|
||||
|
||||
Observer stores services as {"node/service": {...}}. Converted to a list
|
||||
with the fields the services and topology views expect.
|
||||
"""
|
||||
raw = read_json_file(WORLD_DIR / "services.json", default={})
|
||||
if isinstance(raw, list):
|
||||
return raw
|
||||
result = []
|
||||
for key, info in raw.items():
|
||||
svc_status = info.get("status", "unknown")
|
||||
result.append({
|
||||
"id": key,
|
||||
"name": info.get("service", key),
|
||||
"node": info.get("node", ""),
|
||||
"health": ("nominal" if svc_status == "healthy"
|
||||
else ("error" if svc_status == "unhealthy"
|
||||
else svc_status)),
|
||||
"desired_state": "running",
|
||||
"actual_state": svc_status,
|
||||
"deployment_state": "deployed",
|
||||
"dependencies": [],
|
||||
"recommendations": [],
|
||||
"last_check": info.get("last_check"),
|
||||
"incident_id": info.get("incident_id"),
|
||||
})
|
||||
return result
|
||||
|
||||
|
||||
def current_deployments():
|
||||
return read_json_file(WORLD_DIR / "deployments.json")
|
||||
"""Return deployments as a list sorted newest-first."""
|
||||
raw = read_json_file(WORLD_DIR / "deployments.json", default={})
|
||||
if isinstance(raw, list):
|
||||
return raw
|
||||
result = []
|
||||
for dep_id, info in raw.items():
|
||||
result.append({
|
||||
"id": dep_id,
|
||||
"service": info.get("service", ""),
|
||||
"node": info.get("node", ""),
|
||||
"status": info.get("status", "unknown"),
|
||||
"stage": info.get("status", "unknown"),
|
||||
"diagnostics": info.get("last_error", ""),
|
||||
"resumable": info.get("status") == "failed",
|
||||
"started_at": info.get("started_at"),
|
||||
"finished_at": info.get("finished_at"),
|
||||
})
|
||||
return sorted(result, key=lambda x: x.get("started_at") or 0, reverse=True)
|
||||
|
||||
|
||||
def current_incidents():
|
||||
return read_json_file(WORLD_DIR / "incidents.json")
|
||||
"""Return incidents as a list sorted most-recent-first."""
|
||||
raw = read_json_file(WORLD_DIR / "incidents.json", default={})
|
||||
if isinstance(raw, list):
|
||||
return raw
|
||||
result = []
|
||||
for inc in raw.values():
|
||||
# Synthesise a human-readable message if not stored (observer doesn't set one).
|
||||
if "message" not in inc:
|
||||
inc = dict(inc)
|
||||
inc["message"] = (
|
||||
f"{inc.get('service', '?')} on {inc.get('node', '?')} "
|
||||
f"is {inc.get('trigger_type', 'unhealthy')}"
|
||||
)
|
||||
result.append(inc)
|
||||
return sorted(result, key=lambda x: x.get("last_occurrence") or 0, reverse=True)
|
||||
|
||||
|
||||
def current_recommendations():
|
||||
|
|
@ -88,15 +184,22 @@ def current_summary():
|
|||
|
||||
|
||||
def current_events():
|
||||
"""Return recent events as a list sorted newest-first.
|
||||
|
||||
Reads individual *.json event files from EVENTS_DIR. Without a time filter
|
||||
this would return every event file ever written (including events from ghost
|
||||
nodes created before NODE_NAME was configured). We cap at EVENTS_MAX_AGE_HOURS
|
||||
(default 24 h) to keep the Events view responsive and stale-free.
|
||||
"""
|
||||
events = []
|
||||
cutoff = time.time() - EVENTS_MAX_AGE_HOURS * 3600
|
||||
if EVENTS_DIR.exists():
|
||||
for f in EVENTS_DIR.glob("**/*.json"):
|
||||
data = read_json_file(f)
|
||||
if data:
|
||||
# Add source file for traceability
|
||||
if data and (data.get("timestamp") or 0) > cutoff:
|
||||
data["_source"] = f.name
|
||||
events.append(data)
|
||||
return sorted(events, key=lambda x: x.get("timestamp", ""), reverse=True)
|
||||
return sorted(events, key=lambda x: x.get("timestamp") or 0, reverse=True)
|
||||
|
||||
|
||||
def current_actions():
|
||||
|
|
|
|||
Loading…
Reference in a new issue