service_healthy is a positive health confirmation: if the service has an active incident (e.g. one opened by earlier service_unhealthy events), that incident should be resolved as soon as the service is confirmed healthy. Previously only service_recovered resolved incidents; service_healthy set the service status to "healthy" but left the incident open, so the runtime summary status stayed "degraded".

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
423 lines
19 KiB
Python
423 lines
19 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import glob
|
|
import logging
|
|
import yaml
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Constants and Paths
# Root of the runtime tree; override with the RUNTIME_PATH environment variable.
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
# Event files (*.json) are discovered recursively under this directory.
EVENTS_DIR = Path(RUNTIME_PATH) / "events"
# Holds the observer checkpoint and the observer.heartbeat file.
STATE_DIR = Path(RUNTIME_PATH) / "state"
LOGS_DIR = Path(RUNTIME_PATH) / "logs"
# Destination of the world-state JSON snapshots (nodes.json, services.json, ...).
WORLD_DIR = Path(RUNTIME_PATH) / "world"
OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"

# Three directory levels above this file; the inventory and hosts trees are
# resolved relative to it.
REPO_ROOT = Path(__file__).parent.parent.parent
INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("observer")
|
|
class Observer:
    """Fold JSON event files into a persisted "world state" snapshot.

    Event files are discovered under EVENTS_DIR, applied one by one to an
    in-memory state (nodes, services, deployments, incidents, summary) and the
    result is written to WORLD_DIR as JSON files. A per-node-directory
    checkpoint records the last file processed so each event is applied once.
    """

    # Resolved incidents older than this many seconds are dropped from the state.
    _RESOLVED_INCIDENT_MAX_AGE = 7 * 86400

    def __init__(self):
        """Load the inventory, create runtime directories and resume from disk."""
        # Per-node-directory checkpoint: {"vps": "last/file/path", "piha": "last/file/path"}
        # Replaces the old single last_processed_file which silently skipped event dirs
        # that sort alphabetically before the checkpoint (e.g. piha/ < vps/).
        self.node_checkpoints: dict = {}
        self.world_state = {
            "nodes": {},
            "services": {},
            "deployments": {},
            "incidents": {},
            "summary": {
                "last_update": datetime.now(timezone.utc).isoformat(),
                "status": "initializing",
                "active_incidents_count": 0,
            },
        }
        self.inventory = self._load_inventory()
        self._ensure_dirs()
        self._load_checkpoint()

    def _ensure_dirs(self):
        """Create the world, state, events and logs directories if missing."""
        for directory in (WORLD_DIR, STATE_DIR, EVENTS_DIR, LOGS_DIR):
            directory.mkdir(parents=True, exist_ok=True)

    def _load_inventory(self):
        """Read node topology and per-host service assignments from the repo.

        Returns:
            ``{"nodes": {name: {"roles", "connectivity"}},
            "services": {host: {service: {"role", "exposure"}}}}``.
            Any failure is logged and whatever was loaded so far is returned,
            so the result may be partial or empty.
        """
        inventory = {"nodes": {}, "services": {}}
        try:
            if INVENTORY_TOPOLOGY.exists():
                with open(INVENTORY_TOPOLOGY, "r") as f:
                    # safe_load returns None for an empty document.
                    topo = yaml.safe_load(f) or {}
                for node_name, node_info in (topo.get("nodes") or {}).items():
                    node_info = node_info or {}
                    inventory["nodes"][node_name] = {
                        "roles": node_info.get("roles", []),
                        "connectivity": node_info.get("connectivity", {}),
                    }

            # Load service assignments from hosts files
            hosts_dir = REPO_ROOT / "hosts"
            for host_dir in hosts_dir.iterdir():
                if not host_dir.is_dir():
                    continue
                svc_file = host_dir / "services.yaml"
                if not svc_file.exists():
                    continue
                with open(svc_file, "r") as f:
                    svc_data = yaml.safe_load(f) or {}
                host_name = svc_data.get("host")
                for svc_name, svc_info in (svc_data.get("services") or {}).items():
                    svc_info = svc_info or {}
                    inventory["services"].setdefault(host_name, {})[svc_name] = {
                        "role": svc_info.get("role"),
                        "exposure": svc_info.get("exposure"),
                    }
        except Exception as e:
            logger.error(f"Failed to load inventory: {e}")
        return inventory

    def _load_checkpoint(self):
        """Restore per-node checkpoints and, if that works, the saved world state.

        Nothing is loaded when the checkpoint file does not exist: the observer
        then starts from an empty state and replays every event file.
        """
        if not OBSERVER_STATE_FILE.exists():
            return
        try:
            with open(OBSERVER_STATE_FILE, "r") as f:
                checkpoint = json.load(f)

            if "node_checkpoints" in checkpoint:
                # New format: per-directory checkpoints.
                self.node_checkpoints = checkpoint["node_checkpoints"]
            elif "last_processed_file" in checkpoint:
                # Migrate old single-file checkpoint: extract node dir from path.
                old = checkpoint["last_processed_file"]
                if old:
                    try:
                        node_dir = Path(old).relative_to(EVENTS_DIR).parts[0]
                    except (ValueError, IndexError, TypeError) as e:
                        # Bad path — start fresh, but leave a trace of why.
                        logger.warning(f"Ignoring unusable legacy checkpoint {old!r}: {e}")
                    else:
                        self.node_checkpoints = {node_dir: old}
                        logger.info(f"Migrated old checkpoint → node_checkpoints: {self.node_checkpoints}")

            self._load_world_from_disk()
        except Exception as e:
            logger.error(f"Failed to load checkpoint: {e}")

    def _load_world_from_disk(self):
        """Replace each world-state section with its saved JSON file, if present."""
        # Optional: Load existing state to resume faster
        files = {
            "nodes": WORLD_DIR / "nodes.json",
            "services": WORLD_DIR / "services.json",
            "deployments": WORLD_DIR / "deployments.json",
            "incidents": WORLD_DIR / "incidents.json",
            "summary": WORLD_DIR / "runtime-summary.json",
        }
        for key, path in files.items():
            if not path.exists():
                continue
            try:
                with open(path, "r") as f:
                    self.world_state[key] = json.load(f)
            except Exception as e:
                logger.error(f"Failed to load {key} state: {e}")

    def _save_checkpoint(self):
        """Write the per-node checkpoints to OBSERVER_STATE_FILE (errors are logged)."""
        try:
            with open(OBSERVER_STATE_FILE, "w") as f:
                json.dump({"node_checkpoints": self.node_checkpoints}, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")

    @staticmethod
    def _to_epoch(value):
        """Convert an event timestamp to epoch seconds, or return None.

        Accepts numbers, numeric strings and ISO-8601 strings (a trailing "Z"
        is treated as UTC). Anything else yields None.
        """
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            return float(value)
        if not isinstance(value, str):
            return None
        text = value.strip()
        if not text:
            return None
        try:
            return float(text)
        except ValueError:
            pass
        # datetime.fromisoformat only accepts a "Z" suffix from Python 3.11 on.
        if text[-1] in "Zz":
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC — confirm
            # against what the event producers emit.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.timestamp()

    def _prune_stale_world(self):
        """Remove world-state entries for nodes absent from the topology inventory.

        Root cause this guards against: when NODE_NAME env var is unset, node_agent.py
        falls back to socket.gethostname(), which inside a Docker container returns the
        12-char hex container ID (e.g. 'be17cb6eb0f6') instead of the canonical host name
        ('vps'). The observer ingests those events and creates ghost entries that never
        expire on their own.

        Nodes, services and incidents belonging to unknown nodes are all removed;
        a ghost incident left behind would stay "active" forever because the
        service record that could resolve it is gone.

        Also ages out resolved incidents older than 7 days to keep world state lean.
        """
        known_nodes = set(self.inventory["nodes"])
        if not known_nodes:
            # Inventory failed to load — don't prune to avoid wiping valid state.
            return

        nodes = self.world_state["nodes"]
        for name in [n for n in nodes if n not in known_nodes]:
            logger.info(f"Pruning stale node from world state: {name}")
            del nodes[name]

        # Service keys have the form "<node>/<service>".
        services = self.world_state["services"]
        for key in [k for k in services if k.split("/")[0] not in known_nodes]:
            logger.info(f"Pruning stale service from world state: {key}")
            del services[key]

        incidents = self.world_state["incidents"]
        for key in [k for k, v in incidents.items() if v.get("node") not in known_nodes]:
            logger.info(f"Pruning stale incident from world state: {key}")
            del incidents[key]

        # Remove resolved incidents older than 7 days. resolved_at is whatever
        # timestamp the resolving event carried, so it is parsed defensively;
        # incidents whose resolved_at cannot be interpreted are kept.
        now = time.time()
        expired = []
        for key, incident in incidents.items():
            if incident.get("status") != "resolved":
                continue
            resolved_at = self._to_epoch(incident.get("resolved_at"))
            if resolved_at is not None and now - resolved_at > self._RESOLVED_INCIDENT_MAX_AGE:
                expired.append(key)
        for key in expired:
            del incidents[key]

    def _save_world(self):
        """Refresh the summary and write every world-state section to WORLD_DIR.

        The summary status is "degraded" while at least one incident is active
        and "nominal" otherwise. Write failures are logged per file.
        """
        summary = self.world_state["summary"]
        summary["last_update"] = datetime.now(timezone.utc).isoformat()
        active_incidents = [
            k for k, v in self.world_state["incidents"].items() if v.get("status") == "active"
        ]
        summary["active_incidents_count"] = len(active_incidents)
        summary["node_count"] = len(self.world_state["nodes"])
        summary["service_count"] = len(self.world_state["services"])
        summary["status"] = "degraded" if active_incidents else "nominal"

        files = {
            "nodes.json": self.world_state["nodes"],
            "services.json": self.world_state["services"],
            "deployments.json": self.world_state["deployments"],
            "incidents.json": self.world_state["incidents"],
            "recommendations.json": [],  # Placeholder to satisfy requirements
            "runtime-summary.json": summary,
        }
        for filename, data in files.items():
            try:
                with open(WORLD_DIR / filename, "w") as f:
                    json.dump(data, f, indent=2)
            except Exception as e:
                logger.error(f"Failed to save {filename}: {e}")

    def _ensure_service(self, node, service):
        """Return the service record for node/service, creating it if needed."""
        return self.world_state["services"].setdefault(f"{node}/{service}", {
            "node": node,
            "service": service,
            "status": "unknown",
            "last_check": None,
            "incident_id": None,
        })

    def process_event(self, event):
        """Apply one event dict to the node, service and deployment state.

        Reads the keys type, node, service, severity, timestamp, correlation_id
        and payload. A missing type or a null payload is tolerated: the node's
        last_seen is still updated and the type-specific handling is skipped.
        """
        etype = event.get("type") or ""
        node = event.get("node")
        service = event.get("service")
        severity = event.get("severity")
        timestamp = event.get("timestamp")
        cid = event.get("correlation_id")
        payload = event.get("payload") or {}

        # 1. Update Node State
        node_state = self.world_state["nodes"].setdefault(node, {
            "status": "unknown",
            "last_seen": None,
            "roles": self.inventory["nodes"].get(node, {}).get("roles", []),
        })
        node_state["last_seen"] = timestamp

        if etype == "node_online":
            node_state["status"] = "online"
        elif etype == "node_offline":
            node_state["status"] = "offline"
        elif etype == "node_health":
            # Regular heartbeat from node-agent; updates resource metrics.
            # Clears disk_pressure if disk is now healthy (< warn threshold).
            node_state["status"] = "online"
            node_state.update({
                "disk_usage_pct": payload.get("disk_pct"),
                "mem_usage_pct": payload.get("mem_pct"),
                "cpu_usage_pct": payload.get("cpu_pct"),
            })
            if (payload.get("disk_pct") or 0) < 75:
                node_state.pop("disk_pressure", None)
        elif etype == "disk_pressure":
            # Emitted when disk usage crosses 75 % (medium) or 85 % (high).
            # The supervisor reads disk_pressure to generate disk_cleanup actions.
            node_state["disk_pressure"] = severity
            node_state["disk_usage_pct"] = payload.get("usage_pct")
        elif etype == "high_memory":
            # Memory pressure observation; recorded on the node for correlation.
            # No automated action — operator decides if a container restart helps.
            node_state["memory_pressure"] = severity
            node_state["mem_usage_pct"] = payload.get("usage_pct")
        elif etype == "high_cpu":
            # CPU pressure observation; recorded for visibility.
            node_state["cpu_pressure"] = severity
            node_state["cpu_usage_pct"] = payload.get("usage_pct")

        # 2. Update Service State
        if service and service != "all":
            svc_key = f"{node}/{service}"
            svc_state = self._ensure_service(node, service)
            svc_state["last_check"] = timestamp

            if etype in ("service_recovered", "service_healthy"):
                # service_healthy is a positive confirmation from node-agent that a
                # managed container is running. This keeps services.json populated so
                # the supervisor can correctly detect drift (absent entry = never
                # reported = unknown, not the same as confirmed missing).
                # Either event also resolves any active incident — if a service that
                # had been unhealthy/crashing is now confirmed healthy, the incident
                # is over.
                svc_state["status"] = "healthy"
                self._resolve_incident(svc_key, timestamp)
            elif etype in ("service_unhealthy", "healthcheck_failed"):
                svc_state["status"] = "unhealthy"
                self._handle_incident(svc_key, event)

        # 3. Update Deployment State
        if etype.startswith("deployment_") and cid:
            deployment = self.world_state["deployments"].setdefault(cid, {
                "node": node,
                "service": service,
                "status": "unknown",
                "started_at": None,
                "finished_at": None,
                "events": [],
            })
            deployment["events"].append({
                "type": etype,
                "timestamp": timestamp,
                "payload": payload,
            })
            if etype == "deployment_started":
                deployment["status"] = "in_progress"
                deployment["started_at"] = timestamp
            elif etype == "deployment_completed":
                deployment["status"] = "completed"
                deployment["finished_at"] = timestamp
            elif etype == "deployment_failed":
                deployment["status"] = "failed"
                deployment["finished_at"] = timestamp
                # Deployment failure often creates an incident
                self._handle_deployment_failure(event)

    def _handle_incident(self, svc_key, event):
        """Record a failure for svc_key: bump its active incident or open a new one.

        The service record for svc_key must already exist.
        """
        # Correlation: collapse repeated failures for the same service on the same node
        svc_state = self.world_state["services"][svc_key]
        timestamp = event.get("timestamp")

        incident = self.world_state["incidents"].get(svc_state.get("incident_id"))
        if incident is not None and incident["status"] == "active":
            incident["last_occurrence"] = timestamp
            incident["occurrence_count"] = incident.get("occurrence_count", 1) + 1
            incident.setdefault("events", []).append(timestamp)
            return

        # Create new incident. The id embeds the wall-clock time of processing,
        # not the event timestamp.
        incident_id = f"inc-{int(time.time())}-{event.get('node')}-{event.get('service')}"
        self.world_state["incidents"][incident_id] = {
            "id": incident_id,
            "node": event.get("node"),
            "service": event.get("service"),
            "status": "active",
            "severity": event.get("severity"),
            # trigger_type records the event type that opened this incident so that
            # the supervisor can choose the appropriate remediation action
            # (e.g. container_restart for containers_not_running / mqtt_unreachable
            # vs. a full redeploy for other causes).
            "trigger_type": event.get("type"),
            "started_at": timestamp,
            "last_occurrence": timestamp,
            "occurrence_count": 1,
            "events": [timestamp],
            "correlation_id": event.get("correlation_id"),
        }
        svc_state["incident_id"] = incident_id

    def _resolve_incident(self, svc_key, timestamp):
        """Mark the service's active incident resolved and detach it from the service."""
        svc_state = self.world_state["services"][svc_key]
        incident = self.world_state["incidents"].get(svc_state.get("incident_id"))
        if incident is not None and incident["status"] == "active":
            incident["status"] = "resolved"
            incident["resolved_at"] = timestamp
            svc_state["incident_id"] = None

    def _handle_deployment_failure(self, event):
        """Open or update an incident for a failed deployment and attach diagnostics.

        Events that do not name a concrete service (missing or "all") have no
        service record to hang an incident on, so no incident is created for
        them; the deployment record itself still carries the failure.
        """
        # Specific logic for deployment failures
        node = event.get("node")
        service = event.get("service")
        if not service or service == "all":
            return

        svc_key = f"{node}/{service}"
        self._ensure_service(node, service)
        self._handle_incident(svc_key, event)

        # Link diagnostics if available in payload
        incident_id = self.world_state["services"][svc_key].get("incident_id")
        incident = self.world_state["incidents"].get(incident_id)
        if incident is None:
            return
        payload = event.get("payload") or {}
        if "diagnostics_file" in payload:
            incident["diagnostics_ref"] = payload["diagnostics_file"]
        elif "error" in payload:
            incident["last_error"] = payload["error"]

    def run_once(self):
        """Process every event file newer than its node checkpoint, then persist.

        Also touches the heartbeat file. When there is nothing new, the world
        state is still pruned and re-saved so the summary timestamp stays fresh.
        """
        # Update heartbeat
        heartbeat_file = STATE_DIR / "observer.heartbeat"
        try:
            heartbeat_file.touch()
        except Exception as e:
            logger.error(f"Failed to touch heartbeat file: {e}")

        # Collect all event files grouped by node directory.
        # Per-node checkpoints are compared within each directory independently,
        # so late-arriving events from remote nodes (sorted earlier in the path)
        # are never skipped just because another node's checkpoint is further ahead.
        all_files = sorted(glob.glob(str(EVENTS_DIR / "**" / "*.json"), recursive=True))

        new_files = []
        for file_path in all_files:
            try:
                node_dir = str(Path(file_path).relative_to(EVENTS_DIR).parts[0])
            except (IndexError, ValueError):
                node_dir = "__unknown__"
            # Paths are compared as strings, so ordering is lexicographic.
            if file_path > self.node_checkpoints.get(node_dir, ""):
                new_files.append((node_dir, file_path))

        if not new_files:
            # Even if no new events, prune stale entries and refresh summary freshness.
            self._prune_stale_world()
            self._save_world()
            return

        logger.info(f"Processing {len(new_files)} new events across "
                    f"{len({n for n, _ in new_files})} node(s)")
        for node_dir, file_path in new_files:
            try:
                with open(file_path, "r") as f:
                    event = json.load(f)
                self.process_event(event)
                # Advance per-node checkpoint (only forward — no regression).
                # A file that fails above does not advance it, so it is retried
                # on the next pass unless a later file of the same node succeeds.
                if file_path > self.node_checkpoints.get(node_dir, ""):
                    self.node_checkpoints[node_dir] = file_path
            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")

        self._save_checkpoint()
        self._prune_stale_world()
        self._save_world()

    def loop(self, interval=5):
        """Call run_once forever, sleeping `interval` seconds between passes."""
        logger.info("Starting observer loop")
        while True:
            self.run_once()
            time.sleep(interval)
|
|
if __name__ == "__main__":
    # Script entry point: "--run-once" performs a single pass and exits;
    # without it the observer polls forever.
    import sys

    observer = Observer()
    if "--run-once" in sys.argv:
        observer.run_once()
    else:
        observer.loop()