import glob
import json
import logging
import os
import time
from datetime import datetime, timezone
from pathlib import Path

import yaml

# Constants and Paths
# Root of the runtime tree; overridable through the RUNTIME_PATH environment variable.
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
EVENTS_DIR = Path(RUNTIME_PATH) / "events"   # incoming event JSON files (scanned recursively)
STATE_DIR = Path(RUNTIME_PATH) / "state"     # observer checkpoint + heartbeat file
LOGS_DIR = Path(RUNTIME_PATH) / "logs"
WORLD_DIR = Path(RUNTIME_PATH) / "world"     # world-state JSON files written by the observer
OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"

# Three directory levels above this file; the inventory and hosts/ trees are
# looked up relative to it.
REPO_ROOT = Path(__file__).parent.parent.parent
INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("observer")


class Observer:
    """Fold event files into a persisted "world state".

    Each cycle (``run_once``) scans ``EVENTS_DIR`` for JSON event files that are
    newer than the per-node-directory checkpoint, applies them to the in-memory
    world state (nodes, services, deployments, incidents, summary), prunes
    stale entries and writes the result to ``WORLD_DIR``.
    """

    # Resolved incidents older than this many seconds are dropped when pruning.
    RESOLVED_INCIDENT_MAX_AGE_S = 7 * 86400
    # A node_health report below this disk percentage clears "disk_pressure".
    DISK_PRESSURE_CLEAR_PCT = 75

    def __init__(self):
        """Build the initial state, load the inventory and resume from disk."""
        # Per-node-directory checkpoint: {"vps": "last/file/path", "piha": "last/file/path"}
        # Replaces the old single last_processed_file which silently skipped event dirs
        # that sort alphabetically before the checkpoint (e.g. piha/ < vps/).
        self.node_checkpoints: dict = {}
        self.world_state = {
            "nodes": {},
            "services": {},
            "deployments": {},
            "incidents": {},
            "summary": {
                "last_update": datetime.now(timezone.utc).isoformat(),
                "status": "initializing",
                "active_incidents_count": 0,
            },
        }
        self.inventory = self._load_inventory()
        self._ensure_dirs()
        self._load_checkpoint()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _timestamp_to_epoch(value):
        """Convert an event timestamp to epoch seconds.

        Accepts an int/float (taken as epoch seconds), a numeric string, or an
        ISO-8601 string (a trailing "Z" is accepted; naive values are assumed
        to be UTC). Returns None when the value cannot be interpreted.
        """
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            return float(value)
        if not isinstance(value, str):
            return None
        text = value.strip()
        try:
            return float(text)
        except ValueError:
            pass
        if text.endswith("Z"):
            # datetime.fromisoformat() only understands "Z" from Python 3.11 on.
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.timestamp()

    @staticmethod
    def _is_ghost_service_key(key):
        """Return True for keys shaped like ``<node>/<12 hex chars>_<name>``.

        These are Docker stale-state artifacts (e.g.
        "vps/9e36297651e7_control-plane-observer") created when node-agent uses
        the container name instead of the compose label.
        """
        _, sep, svc_name = str(key).partition("/")
        if not sep or len(svc_name) <= 13 or svc_name[12] != "_":
            return False
        return all(ch in "0123456789abcdef" for ch in svc_name[:12])

    @staticmethod
    def _write_json_atomic(path, data):
        """Write *data* as JSON to *path* via a temp file and ``os.replace``.

        Readers of the target file therefore never observe a half-written
        document. Raises whatever ``open``/``json.dump``/``os.replace`` raise.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, "w") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, path)
        except Exception:
            # Best-effort cleanup of the partial temp file; re-raise the real error.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    # ------------------------------------------------------------------
    # Startup / persistence
    # ------------------------------------------------------------------

    def _ensure_dirs(self):
        """Create the runtime directories if they do not exist yet."""
        for directory in (WORLD_DIR, STATE_DIR, EVENTS_DIR, LOGS_DIR):
            directory.mkdir(parents=True, exist_ok=True)

    def _load_inventory(self):
        """Load node topology and per-host service assignments from the repo.

        Returns a dict ``{"nodes": {name: {"roles", "connectivity"}},
        "services": {host: {service: {"role", "exposure"}}}}``. Failures are
        logged and leave the affected part empty; one unreadable file does not
        prevent the remaining files from loading.
        """
        inventory = {"nodes": {}, "services": {}}

        try:
            if INVENTORY_TOPOLOGY.exists():
                with open(INVENTORY_TOPOLOGY, "r") as f:
                    # safe_load returns None for an empty document.
                    topo = yaml.safe_load(f) or {}
                for node_name, node_info in (topo.get("nodes") or {}).items():
                    node_info = node_info or {}
                    inventory["nodes"][node_name] = {
                        "roles": node_info.get("roles", []),
                        "connectivity": node_info.get("connectivity", {}),
                    }
        except Exception as e:
            logger.error(f"Failed to load inventory: {e}")

        # Load service assignments from hosts files
        hosts_dir = REPO_ROOT / "hosts"
        try:
            host_dirs = sorted(p for p in hosts_dir.iterdir() if p.is_dir())
        except OSError as e:
            logger.error(f"Failed to load inventory: {e}")
            host_dirs = []

        for host_dir in host_dirs:
            svc_file = host_dir / "services.yaml"
            if not svc_file.exists():
                continue
            try:
                with open(svc_file, "r") as f:
                    svc_data = yaml.safe_load(f) or {}
                host_name = svc_data.get("host")
                for svc_name, svc_info in (svc_data.get("services") or {}).items():
                    svc_info = svc_info or {}
                    inventory["services"].setdefault(host_name, {})[svc_name] = {
                        "role": svc_info.get("role"),
                        "exposure": svc_info.get("exposure"),
                    }
            except Exception as e:
                logger.error(f"Failed to load inventory: {svc_file}: {e}")

        return inventory

    def _load_checkpoint(self):
        """Restore per-node checkpoints (and the saved world) from disk.

        Understands both the current ``{"node_checkpoints": {...}}`` format and
        the legacy ``{"last_processed_file": "..."}`` format, which is migrated
        to a single-entry mapping. The world state is only reloaded when a
        checkpoint file exists and could be parsed.
        """
        if not OBSERVER_STATE_FILE.exists():
            return
        try:
            with open(OBSERVER_STATE_FILE, "r") as f:
                checkpoint = json.load(f)

            if "node_checkpoints" in checkpoint:
                # New format: per-directory checkpoints.
                loaded = checkpoint["node_checkpoints"]
                if isinstance(loaded, dict):
                    # Checkpoints are compared against path strings later on,
                    # so drop anything that is not a string.
                    self.node_checkpoints = {
                        str(k): v for k, v in loaded.items() if isinstance(v, str)
                    }
                else:
                    logger.error("Failed to load checkpoint: node_checkpoints is not a mapping")
            elif "last_processed_file" in checkpoint:
                # Migrate old single-file checkpoint: extract node dir from path.
                old = checkpoint["last_processed_file"]
                if old:
                    try:
                        node_dir = Path(old).relative_to(EVENTS_DIR).parts[0]
                        self.node_checkpoints = {node_dir: old}
                        logger.info(f"Migrated old checkpoint → node_checkpoints: {self.node_checkpoints}")
                    except (TypeError, ValueError, IndexError):
                        # Bad path — start fresh
                        logger.warning(f"Ignoring unusable legacy checkpoint: {old!r}")

            self._load_world_from_disk()
        except Exception as e:
            logger.error(f"Failed to load checkpoint: {e}")

    def _load_world_from_disk(self):
        """Reload previously saved world-state files so a restart resumes faster."""
        files = {
            "nodes": WORLD_DIR / "nodes.json",
            "services": WORLD_DIR / "services.json",
            "deployments": WORLD_DIR / "deployments.json",
            "incidents": WORLD_DIR / "incidents.json",
            "summary": WORLD_DIR / "runtime-summary.json",
        }
        for key, path in files.items():
            if not path.exists():
                continue
            try:
                with open(path, "r") as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    # Every world section is a keyed mapping; anything else
                    # would break the event handlers later.
                    raise ValueError(f"expected a JSON object, got {type(loaded).__name__}")
                self.world_state[key] = loaded
            except Exception as e:
                logger.error(f"Failed to load {key} state: {e}")

    def _save_checkpoint(self):
        """Persist the per-node checkpoints; failures are logged, not raised."""
        try:
            self._write_json_atomic(
                OBSERVER_STATE_FILE, {"node_checkpoints": self.node_checkpoints}
            )
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")

    def _prune_stale_world(self):
        """Remove world-state entries for nodes absent from the topology inventory.

        Root cause this guards against: when NODE_NAME env var is unset, node_agent.py
        falls back to socket.gethostname(), which inside a Docker container returns the
        12-char hex container ID (e.g. 'be17cb6eb0f6') instead of the canonical host name
        ('vps'). The observer ingests those events and creates ghost entries that never
        expire on their own.

        Pruned, in order: unknown nodes, services of unknown nodes, hash-prefixed
        ghost service keys, incidents that belong to either of those, and
        resolved incidents older than 7 days.
        """
        known_nodes = set(self.inventory["nodes"])
        if not known_nodes:
            # Inventory failed to load — don't prune to avoid wiping valid state.
            return

        nodes = self.world_state["nodes"]
        services = self.world_state["services"]
        incidents = self.world_state["incidents"]

        for n in [n for n in nodes if n not in known_nodes]:
            logger.info(f"Pruning stale node from world state: {n}")
            del nodes[n]

        for k in [k for k in services if k.split("/")[0] not in known_nodes]:
            logger.info(f"Pruning stale service from world state: {k}")
            del services[k]

        # Prune ghost service keys whose service-name portion is a hash-prefixed
        # Docker stale-state artifact (e.g. "9e36297651e7_control-plane-observer").
        # These accumulate on every container rebuild.
        for k in [k for k in services if self._is_ghost_service_key(k)]:
            logger.info(f"Pruning ghost (hash-prefixed) service key from world state: {k}")
            del services[k]

        # Incidents of ghost nodes/services can never be resolved (their service
        # entry is gone), so they would keep the summary "degraded" forever.
        orphaned = [
            k for k, v in incidents.items()
            if v.get("node") not in known_nodes
            or self._is_ghost_service_key(f"{v.get('node')}/{v.get('service')}")
        ]
        for k in orphaned:
            logger.info(f"Pruning orphaned incident from world state: {k}")
            del incidents[k]

        # Remove resolved incidents older than 7 days. Incidents without a
        # usable resolved_at are kept.
        now = time.time()
        for k, v in list(incidents.items()):
            if v.get("status") != "resolved":
                continue
            resolved_epoch = self._timestamp_to_epoch(v.get("resolved_at"))
            if resolved_epoch is not None and now - resolved_epoch > self.RESOLVED_INCIDENT_MAX_AGE_S:
                del incidents[k]

    def _save_world(self):
        """Refresh the summary and write every world-state file to WORLD_DIR.

        The summary status is "degraded" while any incident is active and
        "nominal" otherwise. Write failures are logged per file.
        """
        summary = self.world_state["summary"]
        summary["last_update"] = datetime.now(timezone.utc).isoformat()

        active_incidents = [
            k for k, v in self.world_state["incidents"].items() if v.get("status") == "active"
        ]
        summary["active_incidents_count"] = len(active_incidents)
        summary["node_count"] = len(self.world_state["nodes"])
        summary["service_count"] = len(self.world_state["services"])
        summary["status"] = "degraded" if active_incidents else "nominal"

        files = {
            "nodes.json": self.world_state["nodes"],
            "services.json": self.world_state["services"],
            "deployments.json": self.world_state["deployments"],
            "incidents.json": self.world_state["incidents"],
            "recommendations.json": [],  # Placeholder to satisfy requirements
            "runtime-summary.json": summary,
        }
        for filename, data in files.items():
            try:
                self._write_json_atomic(WORLD_DIR / filename, data)
            except Exception as e:
                logger.error(f"Failed to save {filename}: {e}")

    # ------------------------------------------------------------------
    # Event handling
    # ------------------------------------------------------------------

    def process_event(self, event):
        """Apply one event dict to the in-memory world state.

        Reads the keys ``type``, ``node``, ``service``, ``severity``,
        ``timestamp``, ``correlation_id`` and ``payload``. Missing ``type`` or
        ``payload`` are tolerated; node and service state are only touched when
        the event names a node.
        """
        etype = event.get("type") or ""
        node = event.get("node")
        service = event.get("service")
        severity = event.get("severity")
        timestamp = event.get("timestamp")
        cid = event.get("correlation_id")
        payload = event.get("payload") or {}

        # 1. Update Node State
        if node:
            node_state = self.world_state["nodes"].setdefault(node, {
                "status": "unknown",
                "last_seen": None,
                "roles": self.inventory["nodes"].get(node, {}).get("roles", []),
            })
            node_state["last_seen"] = timestamp

            if etype == "node_online":
                node_state["status"] = "online"
            elif etype == "node_offline":
                node_state["status"] = "offline"
            elif etype == "node_health":
                # Regular heartbeat from node-agent; updates resource metrics.
                # Clears disk_pressure if disk is now healthy (< warn threshold).
                node_state["status"] = "online"
                node_state.update({
                    "disk_usage_pct": payload.get("disk_pct"),
                    "mem_usage_pct": payload.get("mem_pct"),
                    "cpu_usage_pct": payload.get("cpu_pct"),
                })
                if (payload.get("disk_pct") or 0) < self.DISK_PRESSURE_CLEAR_PCT:
                    node_state.pop("disk_pressure", None)
            elif etype == "disk_pressure":
                # Emitted when disk usage crosses 75 % (medium) or 85 % (high).
                # The supervisor reads disk_pressure to generate disk_cleanup actions.
                node_state["disk_pressure"] = severity
                node_state["disk_usage_pct"] = payload.get("usage_pct")
            elif etype == "high_memory":
                # Memory pressure observation; recorded on the node for correlation.
                # No automated action — operator decides if a container restart helps.
                node_state["memory_pressure"] = severity
                node_state["mem_usage_pct"] = payload.get("usage_pct")
            elif etype == "high_cpu":
                # CPU pressure observation; recorded for visibility.
                node_state["cpu_pressure"] = severity
                node_state["cpu_usage_pct"] = payload.get("usage_pct")

        # 2. Update Service State
        if node and service and service != "all":
            svc_key = f"{node}/{service}"
            svc_state = self.world_state["services"].setdefault(svc_key, {
                "node": node,
                "service": service,
                "status": "unknown",
                "last_check": None,
                "incident_id": None,
            })
            svc_state["last_check"] = timestamp

            if etype in ("service_recovered", "service_healthy"):
                # service_healthy is the positive confirmation from node-agent
                # that a managed container is running. It keeps services.json
                # populated so the supervisor can detect drift (absent entry =
                # never reported = unknown, not the same as confirmed missing).
                # Either event also ends any active incident for the service.
                svc_state["status"] = "healthy"
                self._resolve_incident(svc_key, timestamp)
            elif etype in ("service_unhealthy", "healthcheck_failed"):
                svc_state["status"] = "unhealthy"
                self._handle_incident(svc_key, event)

        # 3. Update Deployment State
        if etype.startswith("deployment_") and cid:
            deployment = self.world_state["deployments"].setdefault(cid, {
                "node": node,
                "service": service,
                "status": "unknown",
                "started_at": None,
                "finished_at": None,
                "events": [],
            })
            deployment["events"].append({
                "type": etype,
                "timestamp": timestamp,
                "payload": payload,
            })
            if etype == "deployment_started":
                deployment["status"] = "in_progress"
                deployment["started_at"] = timestamp
            elif etype == "deployment_completed":
                deployment["status"] = "completed"
                deployment["finished_at"] = timestamp
            elif etype == "deployment_failed":
                deployment["status"] = "failed"
                deployment["finished_at"] = timestamp
                # Deployment failure often creates an incident
                self._handle_deployment_failure(event)

    def _find_active_incident_id(self, svc_entry, node, service):
        """Return the id of the active incident for a service, or None.

        With a tracked service entry only its linked incident is considered.
        Without one (e.g. a deployment failure for service "all") the incident
        table is searched for an active incident with the same node/service.
        """
        incidents = self.world_state["incidents"]
        if svc_entry is not None:
            linked = svc_entry.get("incident_id")
            incident = incidents.get(linked) if linked else None
            if incident is not None and incident.get("status") == "active":
                return linked
            return None
        for incident_id, incident in incidents.items():
            if (incident.get("status") == "active"
                    and incident.get("node") == node
                    and incident.get("service") == service):
                return incident_id
        return None

    def _handle_incident(self, svc_key, event):
        """Record a failure: bump the active incident or open a new one.

        Correlation: repeated failures for the same service on the same node
        collapse into one incident. Works even when *svc_key* has no entry in
        the service table. Returns the id of the incident that was updated or
        created.
        """
        node = event.get("node")
        service = event.get("service")
        timestamp = event.get("timestamp")
        incidents = self.world_state["incidents"]
        svc_entry = self.world_state["services"].get(svc_key)

        existing_id = self._find_active_incident_id(svc_entry, node, service)
        if existing_id is not None:
            incident = incidents[existing_id]
            incident["last_occurrence"] = timestamp
            incident["occurrence_count"] = incident.get("occurrence_count", 1) + 1
            incident.setdefault("events", []).append(timestamp)
            return existing_id

        # Create new incident. The id embeds the processing time in whole
        # seconds, so a replayed batch can produce the same base id twice;
        # add a counter suffix rather than overwrite an earlier incident.
        base_id = f"inc-{int(time.time())}-{node}-{service}"
        incident_id = base_id
        suffix = 1
        while incident_id in incidents:
            suffix += 1
            incident_id = f"{base_id}-{suffix}"

        incidents[incident_id] = {
            "id": incident_id,
            "node": node,
            "service": service,
            "status": "active",
            "severity": event.get("severity"),
            # trigger_type records the event type that opened this incident so that
            # the supervisor can choose the appropriate remediation action
            # (e.g. container_restart for containers_not_running / mqtt_unreachable
            # vs. a full redeploy for other causes).
            "trigger_type": event.get("type"),
            "started_at": timestamp,
            "last_occurrence": timestamp,
            "occurrence_count": 1,
            "events": [timestamp],
            "correlation_id": event.get("correlation_id"),
        }
        if svc_entry is not None:
            svc_entry["incident_id"] = incident_id
        return incident_id

    def _resolve_incident(self, svc_key, timestamp):
        """Mark the service's linked active incident resolved and unlink it."""
        svc_entry = self.world_state["services"].get(svc_key)
        if svc_entry is None:
            return
        incident_id = svc_entry.get("incident_id")
        incident = self.world_state["incidents"].get(incident_id) if incident_id else None
        if incident is not None and incident.get("status") == "active":
            incident["status"] = "resolved"
            incident["resolved_at"] = timestamp
        svc_entry["incident_id"] = None

    def _handle_deployment_failure(self, event):
        """Open/bump an incident for a failed deployment and attach diagnostics.

        Stores ``payload["diagnostics_file"]`` as ``diagnostics_ref`` when
        present, otherwise ``payload["error"]`` as ``last_error``.
        """
        svc_key = f"{event.get('node')}/{event.get('service')}"
        incident_id = self._handle_incident(svc_key, event)

        # Link diagnostics if available in payload
        incident = self.world_state["incidents"].get(incident_id)
        if incident is None:
            return
        payload = event.get("payload") or {}
        if "diagnostics_file" in payload:
            incident["diagnostics_ref"] = payload["diagnostics_file"]
        elif "error" in payload:
            incident["last_error"] = payload["error"]

    # ------------------------------------------------------------------
    # Main cycle
    # ------------------------------------------------------------------

    def _collect_new_event_files(self):
        """Return ``[(node_dir, file_path), ...]`` for unprocessed event files.

        Files are grouped by the first path component below EVENTS_DIR and
        compared against that directory's own checkpoint, so late-arriving
        events from remote nodes (sorted earlier in the path) are never skipped
        just because another node's checkpoint is further ahead.
        """
        all_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True))
        new_files = []
        for file_path in all_files:
            try:
                node_dir = str(Path(file_path).relative_to(EVENTS_DIR).parts[0])
            except (IndexError, ValueError):
                node_dir = "__unknown__"
            if file_path > self.node_checkpoints.get(node_dir, ""):
                new_files.append((node_dir, file_path))
        return new_files

    def run_once(self):
        """Run one ingest cycle: heartbeat, process new events, prune, save."""
        # Update heartbeat
        heartbeat_file = STATE_DIR / "observer.heartbeat"
        try:
            heartbeat_file.touch()
        except Exception as e:
            logger.error(f"Failed to touch heartbeat file: {e}")

        new_files = self._collect_new_event_files()
        if not new_files:
            # Even if no new events, prune stale entries and refresh summary freshness.
            self._prune_stale_world()
            self._save_world()
            return

        logger.info(f"Processing {len(new_files)} new events across "
                    f"{len({n for n, _ in new_files})} node(s)")
        for node_dir, file_path in new_files:
            try:
                with open(file_path, "r") as f:
                    event = json.load(f)
                self.process_event(event)
                # Advance per-node checkpoint (only forward — no regression).
                if file_path > self.node_checkpoints.get(node_dir, ""):
                    self.node_checkpoints[node_dir] = file_path
            except Exception as e:
                # The checkpoint is not advanced for a failed file; it is only
                # skipped once a later file of the same node succeeds.
                logger.error(f"Error processing {file_path}: {e}")

        self._save_checkpoint()
        self._prune_stale_world()
        self._save_world()

    def loop(self, interval=5):
        """Run ``run_once`` forever, sleeping *interval* seconds between cycles.

        An exception in one cycle is logged with its traceback and the loop
        continues, so a single bad cycle does not stop the daemon.
        """
        logger.info("Starting observer loop")
        while True:
            try:
                self.run_once()
            except Exception:
                logger.exception("Observer cycle failed")
            time.sleep(interval)


if __name__ == "__main__":
    import sys

    observer = Observer()
    if "--run-once" in sys.argv:
        # Single ingest cycle, then exit.
        observer.run_once()
    else:
        # Default: poll for new events forever.
        observer.loop()