homelab-codex-ws/scripts/observer/observer.py

309 lines
13 KiB
Python
Raw Normal View History

import os
import json
import time
import glob
import logging
import yaml
from datetime import datetime, timezone
from pathlib import Path
# Runtime layout. The root directory can be overridden through the
# RUNTIME_PATH environment variable; every runtime directory hangs off it.
RUNTIME_PATH = os.environ.get("RUNTIME_PATH", "/opt/homelab")
EVENTS_DIR = Path(RUNTIME_PATH, "events")
STATE_DIR = Path(RUNTIME_PATH, "state")
LOGS_DIR = Path(RUNTIME_PATH, "logs")
WORLD_DIR = Path(RUNTIME_PATH, "world")

# Checkpoint file recording the last event file the observer processed.
OBSERVER_STATE_FILE = STATE_DIR.joinpath("observer_checkpoint.json")

# Repository checkout: three directory levels above this file.
REPO_ROOT = Path(__file__).parent.parent.parent
INVENTORY_TOPOLOGY = REPO_ROOT.joinpath("inventory", "topology.yaml")

# Root logging configuration plus the module's named logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("observer")
class Observer:
    """Fold runtime event files into a persisted "world state" snapshot.

    Event JSON files matching ``EVENTS_DIR/*/*/*.json`` are applied in sorted
    path order to an in-memory model with five sections: ``nodes``,
    ``services``, ``deployments``, ``incidents`` and ``summary``. After each
    batch every section is written to its own file under ``WORLD_DIR`` and the
    path of the last successfully processed event file is stored in
    ``OBSERVER_STATE_FILE``.
    """

    # Event types that mark a service unhealthy and open or extend an incident.
    _UNHEALTHY_EVENTS = ("service_unhealthy", "healthcheck_failed")

    # world_state section -> file name under WORLD_DIR.
    _WORLD_FILES = {
        "nodes": "nodes.json",
        "services": "services.json",
        "deployments": "deployments.json",
        "incidents": "incidents.json",
        "summary": "runtime-summary.json",
    }

    def __init__(self):
        """Load the inventory, create runtime directories and restore state."""
        self.last_processed_file = None
        self.world_state = {
            "nodes": {},
            "services": {},
            "deployments": {},
            "incidents": {},
            "summary": {
                "last_update": None,
                "status": "initializing",
                "active_incidents_count": 0,
            },
        }
        self.inventory = self._load_inventory()
        self._ensure_dirs()
        self._load_checkpoint()

    def _ensure_dirs(self):
        """Create the world, state, events and logs directories if missing."""
        for directory in (WORLD_DIR, STATE_DIR, EVENTS_DIR, LOGS_DIR):
            directory.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Inventory
    # ------------------------------------------------------------------
    def _load_inventory(self):
        """Read the static inventory from the repository checkout.

        Returns:
            dict: ``{"nodes": {name: {"roles", "connectivity"}},
            "services": {host: {service: {"role", "exposure"}}}}``.
            Anything that cannot be read is logged and left out; this method
            does not raise.
        """
        inventory = {"nodes": {}, "services": {}}
        self._load_topology_nodes(inventory["nodes"])
        self._load_host_services(inventory["services"])
        return inventory

    def _load_topology_nodes(self, nodes):
        """Fill *nodes* from ``INVENTORY_TOPOLOGY``; a missing file is skipped."""
        try:
            if not INVENTORY_TOPOLOGY.exists():
                return
            with open(INVENTORY_TOPOLOGY, "r", encoding="utf-8") as f:
                # An empty YAML document parses to None, not to a mapping.
                topo = yaml.safe_load(f) or {}
            for node_name, node_info in (topo.get("nodes") or {}).items():
                node_info = node_info or {}
                nodes[node_name] = {
                    "roles": node_info.get("roles", []),
                    "connectivity": node_info.get("connectivity", {}),
                }
        except Exception as e:
            logger.error("Failed to load inventory: %s", e)

    def _load_host_services(self, services):
        """Fill *services* from every ``hosts/<dir>/services.yaml`` file.

        Each file is handled on its own so that one malformed file does not
        discard the service assignments of the remaining hosts.
        """
        hosts_dir = REPO_ROOT / "hosts"
        try:
            host_dirs = sorted(p for p in hosts_dir.iterdir() if p.is_dir())
        except OSError as e:
            logger.error("Failed to load inventory: %s", e)
            return

        for host_dir in host_dirs:
            svc_file = host_dir / "services.yaml"
            if not svc_file.exists():
                continue
            try:
                with open(svc_file, "r", encoding="utf-8") as f:
                    svc_data = yaml.safe_load(f) or {}
                host_name = svc_data.get("host")
                for svc_name, svc_info in (svc_data.get("services") or {}).items():
                    svc_info = svc_info or {}
                    services.setdefault(host_name, {})[svc_name] = {
                        "role": svc_info.get("role"),
                        "exposure": svc_info.get("exposure"),
                    }
            except Exception as e:
                logger.error("Failed to load inventory: %s", e)

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def _load_checkpoint(self):
        """Restore the checkpoint and, if one exists, the saved world state."""
        if not OBSERVER_STATE_FILE.exists():
            return
        try:
            with open(OBSERVER_STATE_FILE, "r", encoding="utf-8") as f:
                checkpoint = json.load(f)
            last = checkpoint.get("last_processed_file")
            # run_once compares the checkpoint with path strings, so anything
            # else (e.g. from a damaged file) is treated as "no checkpoint".
            self.last_processed_file = last if isinstance(last, str) else None
            self._load_world_from_disk()
        except Exception as e:
            logger.error("Failed to load checkpoint: %s", e)

    def _load_world_from_disk(self):
        """Replace in-memory sections with the ones saved under ``WORLD_DIR``.

        Missing files are skipped. Unreadable files and files whose top-level
        JSON value is not an object are logged and skipped, leaving the
        in-memory default for that section in place.
        """
        for key, filename in self._WORLD_FILES.items():
            path = WORLD_DIR / filename
            if not path.exists():
                continue
            try:
                with open(path, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except Exception as e:
                logger.error("Failed to load %s state: %s", key, e)
                continue
            if not isinstance(loaded, dict):
                # Every section is used as a dict later on; a stray null or
                # list would otherwise break all subsequent event processing.
                logger.error(
                    "Failed to load %s state: %s", key, "expected a JSON object"
                )
                continue
            self.world_state[key] = loaded

    @staticmethod
    def _write_json_atomic(path, data, indent=None):
        """Write *data* as JSON to *path* through a sibling temporary file.

        The content is written to ``<name>.tmp`` next to *path* and moved into
        place with ``os.replace``, so *path* never holds a partially written
        document.

        Args:
            path: ``pathlib.Path`` of the destination file.
            data: JSON-serialisable object.
            indent: forwarded to ``json.dump``.

        Raises:
            Whatever ``open``, ``json.dump`` or ``os.replace`` raise; the
            temporary file is removed before the exception propagates.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, "w") as f:
                json.dump(data, f, indent=indent)
            os.replace(tmp_path, path)
        except Exception:
            try:
                os.remove(tmp_path)
            except OSError:
                # Nothing to clean up (the temp file was never created).
                pass
            raise

    def _save_checkpoint(self):
        """Persist ``last_processed_file``; failures are logged, not raised."""
        try:
            self._write_json_atomic(
                OBSERVER_STATE_FILE,
                {"last_processed_file": self.last_processed_file},
            )
        except Exception as e:
            logger.error("Failed to save checkpoint: %s", e)

    def _save_world(self):
        """Refresh the summary and write every section to ``WORLD_DIR``.

        The summary status is ``"degraded"`` while at least one incident has
        status ``"active"`` and ``"nominal"`` otherwise. A failure to write
        one file is logged and does not stop the others from being written.
        """
        summary = self.world_state["summary"]
        summary["last_update"] = datetime.now(timezone.utc).isoformat()
        active_count = sum(
            1
            for incident in self.world_state["incidents"].values()
            if incident.get("status") == "active"
        )
        summary["active_incidents_count"] = active_count
        summary["status"] = "degraded" if active_count else "nominal"

        for key, filename in self._WORLD_FILES.items():
            try:
                self._write_json_atomic(
                    WORLD_DIR / filename, self.world_state[key], indent=2
                )
            except Exception as e:
                logger.error("Failed to save %s: %s", filename, e)

    # ------------------------------------------------------------------
    # Event processing
    # ------------------------------------------------------------------
    def process_event(self, event):
        """Apply one event to the in-memory world state.

        Args:
            event: dict read with ``.get`` for the keys ``type``, ``node``,
                ``service``, ``severity``, ``timestamp``, ``correlation_id``
                and ``payload``.

        Raises:
            ValueError: if ``type`` is missing or not a string. The check runs
                before any state is touched, so a rejected event leaves the
                world state unchanged.
        """
        etype = event.get("type")
        if not isinstance(etype, str):
            raise ValueError(f"event has no usable 'type': {etype!r}")

        node = event.get("node")
        service = event.get("service")
        cid = event.get("correlation_id")

        # 1. Node state. Events without a node are not attributed to a
        #    placeholder key; they simply do not touch node/service state.
        if node:
            self._update_node(node, etype, event.get("timestamp"))

        # 2. Service state ("all" addresses the node, not a single service).
        if node and service and service != "all":
            self._update_service(node, service, etype, event)

        # 3. Deployment state, keyed by correlation id.
        if cid and etype.startswith("deployment_"):
            self._update_deployment(cid, etype, event)

    def _update_node(self, node, etype, timestamp):
        """Record that *node* was seen and apply online/offline transitions."""
        node_state = self.world_state["nodes"].setdefault(
            node,
            {
                "status": "unknown",
                "last_seen": None,
                "roles": self.inventory["nodes"].get(node, {}).get("roles", []),
            },
        )
        node_state["last_seen"] = timestamp
        if etype == "node_online":
            node_state["status"] = "online"
        elif etype == "node_offline":
            node_state["status"] = "offline"

    def _update_service(self, node, service, etype, event):
        """Track the health of ``node/service`` and open/resolve incidents."""
        svc_key = f"{node}/{service}"
        timestamp = event.get("timestamp")
        svc_state = self.world_state["services"].setdefault(
            svc_key,
            {
                "node": node,
                "service": service,
                "status": "unknown",
                "last_check": None,
                "incident_id": None,
            },
        )
        svc_state["last_check"] = timestamp
        if etype == "service_recovered":
            svc_state["status"] = "healthy"
            self._resolve_incident(svc_key, timestamp)
        elif etype in self._UNHEALTHY_EVENTS:
            svc_state["status"] = "unhealthy"
            self._handle_incident(svc_key, event)

    def _update_deployment(self, cid, etype, event):
        """Append the event to deployment *cid* and update its status."""
        timestamp = event.get("timestamp")
        deployment = self.world_state["deployments"].setdefault(
            cid,
            {
                "node": event.get("node"),
                "service": event.get("service"),
                "status": "unknown",
                "started_at": None,
                "finished_at": None,
                "events": [],
            },
        )
        deployment.setdefault("events", []).append({
            "type": etype,
            "timestamp": timestamp,
            "payload": event.get("payload", {}),
        })
        if etype == "deployment_started":
            deployment["status"] = "in_progress"
            deployment["started_at"] = timestamp
        elif etype == "deployment_completed":
            deployment["status"] = "completed"
            deployment["finished_at"] = timestamp
        elif etype == "deployment_failed":
            deployment["status"] = "failed"
            deployment["finished_at"] = timestamp
            # A failed deployment also opens (or extends) a service incident.
            self._handle_deployment_failure(event)

    def _handle_incident(self, svc_key, event):
        """Open a new incident for *svc_key* or extend its active one.

        Repeated failures of the same service on the same node are collapsed
        into the incident currently linked from the service entry, as long as
        that incident is still active.

        Args:
            svc_key: ``"<node>/<service>"`` key of an existing service entry.
            event: the triggering event dict.

        Raises:
            KeyError: if *svc_key* has no entry in the services section.
        """
        incidents = self.world_state["incidents"]
        svc_state = self.world_state["services"][svc_key]
        timestamp = event.get("timestamp")

        current = incidents.get(svc_state.get("incident_id"))
        if current is not None and current.get("status") == "active":
            current["last_occurrence"] = timestamp
            current["occurrence_count"] = current.get("occurrence_count", 1) + 1
            current.setdefault("events", []).append(timestamp)
            return

        # The id embeds the wall-clock second. When a batch replays
        # fail -> recover -> fail within one second the plain id would repeat
        # and overwrite the resolved incident, so disambiguate with a suffix.
        base_id = f"inc-{int(time.time())}-{event.get('node')}-{event.get('service')}"
        incident_id = base_id
        attempt = 1
        while incident_id in incidents:
            attempt += 1
            incident_id = f"{base_id}-{attempt}"

        incidents[incident_id] = {
            "id": incident_id,
            "node": event.get("node"),
            "service": event.get("service"),
            "status": "active",
            "severity": event.get("severity"),
            "started_at": timestamp,
            "last_occurrence": timestamp,
            "occurrence_count": 1,
            "events": [timestamp],
            "correlation_id": event.get("correlation_id"),
        }
        svc_state["incident_id"] = incident_id

    def _resolve_incident(self, svc_key, timestamp):
        """Mark the incident linked from *svc_key* resolved and drop the link.

        Raises:
            KeyError: if *svc_key* has no entry in the services section.
        """
        svc_state = self.world_state["services"][svc_key]
        incident = self.world_state["incidents"].get(svc_state.get("incident_id"))
        if incident is not None and incident.get("status") == "active":
            incident["status"] = "resolved"
            incident["resolved_at"] = timestamp
        # A recovered service should not keep pointing at any incident, so the
        # link is cleared even when it was stale or already resolved.
        svc_state["incident_id"] = None

    def _handle_deployment_failure(self, event):
        """Open or extend an incident for a failed deployment.

        If the payload carries ``diagnostics_file`` it is stored on the
        incident as ``diagnostics_ref``; otherwise an ``error`` value is
        stored as ``last_error``.
        """
        svc_key = f"{event.get('node')}/{event.get('service')}"
        if svc_key not in self.world_state["services"]:
            # No tracked service entry (service missing or "all"): there is
            # nothing to attach an incident to. The deployment record already
            # carries the failure, so do not raise KeyError here.
            return

        self._handle_incident(svc_key, event)

        incident_id = self.world_state["services"][svc_key].get("incident_id")
        incident = self.world_state["incidents"].get(incident_id)
        if incident is None:
            return
        # "payload": null in the event must behave like an absent payload.
        payload = event.get("payload") or {}
        if "diagnostics_file" in payload:
            incident["diagnostics_ref"] = payload["diagnostics_file"]
        elif "error" in payload:
            incident["last_error"] = payload["error"]

    # ------------------------------------------------------------------
    # Driver
    # ------------------------------------------------------------------
    def run_once(self):
        """Process every event file newer than the checkpoint, then persist.

        Files are ordered by sorted path. A file that fails to load or to
        process is logged and does not advance the checkpoint; later files in
        the same batch are still processed.
        """
        event_files = sorted(glob.glob(str(EVENTS_DIR / "*" / "*" / "*.json")))

        checkpoint = self.last_processed_file
        if checkpoint:
            # Compare by sort order rather than by position in the listing:
            # if the checkpointed file has since been deleted, replaying every
            # remaining event on top of the restored world state would count
            # them twice.
            new_files = [path for path in event_files if path > checkpoint]
        else:
            new_files = event_files

        if not new_files:
            return

        logger.info("Processing %d new events", len(new_files))
        for file_path in new_files:
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    event = json.load(f)
                self.process_event(event)
            except Exception as e:
                logger.error("Error processing %s: %s", file_path, e)
            else:
                self.last_processed_file = file_path

        # World first, checkpoint second: if the process dies in between, the
        # batch is replayed on restart instead of being silently lost.
        self._save_world()
        self._save_checkpoint()

    def loop(self, interval=5):
        """Call :meth:`run_once` forever, sleeping *interval* seconds between passes."""
        logger.info("Starting observer loop")
        while True:
            self.run_once()
            time.sleep(interval)
if __name__ == "__main__":
    # Script entry point: one pass when --run-once is given, otherwise poll forever.
    import sys

    observer = Observer()
    entry_point = observer.run_once if "--run-once" in sys.argv else observer.loop
    entry_point()